concurrent_test.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #
00003 # Copyright 2012 Facebook
00004 #
00005 # Licensed under the Apache License, Version 2.0 (the "License"); you may
00006 # not use this file except in compliance with the License. You may obtain
00007 # a copy of the License at
00008 #
00009 #     http://www.apache.org/licenses/LICENSE-2.0
00010 #
00011 # Unless required by applicable law or agreed to in writing, software
00012 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
00013 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
00014 # License for the specific language governing permissions and limitations
00015 # under the License.
00016 from __future__ import absolute_import, division, print_function, with_statement
00017 
00018 import logging
00019 import re
00020 import socket
00021 import sys
00022 import traceback
00023 
00024 from tornado.concurrent import Future, return_future, ReturnValueIgnoredError
00025 from tornado.escape import utf8, to_unicode
00026 from tornado import gen
00027 from tornado.iostream import IOStream
00028 from tornado import stack_context
00029 from tornado.tcpserver import TCPServer
00030 from tornado.testing import AsyncTestCase, LogTrapTestCase, bind_unused_port, gen_test
00031 
00032 
00033 try:
00034     from concurrent import futures
00035 except ImportError:
00036     futures = None
00037 
00038 
00039 class ReturnFutureTest(AsyncTestCase):
00040     @return_future
00041     def sync_future(self, callback):
00042         callback(42)
00043 
00044     @return_future
00045     def async_future(self, callback):
00046         self.io_loop.add_callback(callback, 42)
00047 
00048     @return_future
00049     def immediate_failure(self, callback):
00050         1 / 0
00051 
00052     @return_future
00053     def delayed_failure(self, callback):
00054         self.io_loop.add_callback(lambda: 1 / 0)
00055 
00056     @return_future
00057     def return_value(self, callback):
00058         # Note that the result of both running the callback and returning
00059         # a value (or raising an exception) is unspecified; with current
00060         # implementations the last event prior to callback resolution wins.
00061         return 42
00062 
00063     @return_future
00064     def no_result_future(self, callback):
00065         callback()
00066 
00067     def test_immediate_failure(self):
00068         with self.assertRaises(ZeroDivisionError):
00069             # The caller sees the error just like a normal function.
00070             self.immediate_failure(callback=self.stop)
00071         # The callback is not run because the function failed synchronously.
00072         self.io_loop.add_timeout(self.io_loop.time() + 0.05, self.stop)
00073         result = self.wait()
00074         self.assertIs(result, None)
00075 
00076     def test_return_value(self):
00077         with self.assertRaises(ReturnValueIgnoredError):
00078             self.return_value(callback=self.stop)
00079 
00080     def test_callback_kw(self):
00081         future = self.sync_future(callback=self.stop)
00082         result = self.wait()
00083         self.assertEqual(result, 42)
00084         self.assertEqual(future.result(), 42)
00085 
00086     def test_callback_positional(self):
00087         # When the callback is passed in positionally, future_wrap shouldn't
00088         # add another callback in the kwargs.
00089         future = self.sync_future(self.stop)
00090         result = self.wait()
00091         self.assertEqual(result, 42)
00092         self.assertEqual(future.result(), 42)
00093 
00094     def test_no_callback(self):
00095         future = self.sync_future()
00096         self.assertEqual(future.result(), 42)
00097 
00098     def test_none_callback_kw(self):
00099         # explicitly pass None as callback
00100         future = self.sync_future(callback=None)
00101         self.assertEqual(future.result(), 42)
00102 
00103     def test_none_callback_pos(self):
00104         future = self.sync_future(None)
00105         self.assertEqual(future.result(), 42)
00106 
00107     def test_async_future(self):
00108         future = self.async_future()
00109         self.assertFalse(future.done())
00110         self.io_loop.add_future(future, self.stop)
00111         future2 = self.wait()
00112         self.assertIs(future, future2)
00113         self.assertEqual(future.result(), 42)
00114 
00115     @gen_test
00116     def test_async_future_gen(self):
00117         result = yield self.async_future()
00118         self.assertEqual(result, 42)
00119 
00120     def test_delayed_failure(self):
00121         future = self.delayed_failure()
00122         self.io_loop.add_future(future, self.stop)
00123         future2 = self.wait()
00124         self.assertIs(future, future2)
00125         with self.assertRaises(ZeroDivisionError):
00126             future.result()
00127 
00128     def test_kw_only_callback(self):
00129         @return_future
00130         def f(**kwargs):
00131             kwargs['callback'](42)
00132         future = f()
00133         self.assertEqual(future.result(), 42)
00134 
00135     def test_error_in_callback(self):
00136         self.sync_future(callback=lambda future: 1 / 0)
00137         # The exception gets caught by our StackContext and will be re-raised
00138         # when we wait.
00139         self.assertRaises(ZeroDivisionError, self.wait)
00140 
00141     def test_no_result_future(self):
00142         future = self.no_result_future(self.stop)
00143         result = self.wait()
00144         self.assertIs(result, None)
00145         # result of this future is undefined, but not an error
00146         future.result()
00147 
00148     def test_no_result_future_callback(self):
00149         future = self.no_result_future(callback=lambda: self.stop())
00150         result = self.wait()
00151         self.assertIs(result, None)
00152         future.result()
00153 
00154     @gen_test
00155     def test_future_traceback(self):
00156         @return_future
00157         @gen.engine
00158         def f(callback):
00159             yield gen.Task(self.io_loop.add_callback)
00160             try:
00161                 1 / 0
00162             except ZeroDivisionError:
00163                 self.expected_frame = traceback.extract_tb(
00164                     sys.exc_info()[2], limit=1)[0]
00165                 raise
00166         try:
00167             yield f()
00168             self.fail("didn't get expected exception")
00169         except ZeroDivisionError:
00170             tb = traceback.extract_tb(sys.exc_info()[2])
00171             self.assertIn(self.expected_frame, tb)
00172 
00173 # The following series of classes demonstrate and test various styles
00174 # of use, with and without generators and futures.
00175 
00176 
00177 class CapServer(TCPServer):
00178     def handle_stream(self, stream, address):
00179         logging.info("handle_stream")
00180         self.stream = stream
00181         self.stream.read_until(b"\n", self.handle_read)
00182 
00183     def handle_read(self, data):
00184         logging.info("handle_read")
00185         data = to_unicode(data)
00186         if data == data.upper():
00187             self.stream.write(b"error\talready capitalized\n")
00188         else:
00189             # data already has \n
00190             self.stream.write(utf8("ok\t%s" % data.upper()))
00191         self.stream.close()
00192 
00193 
00194 class CapError(Exception):
00195     pass
00196 
00197 
00198 class BaseCapClient(object):
00199     def __init__(self, port, io_loop):
00200         self.port = port
00201         self.io_loop = io_loop
00202 
00203     def process_response(self, data):
00204         status, message = re.match('(.*)\t(.*)\n', to_unicode(data)).groups()
00205         if status == 'ok':
00206             return message
00207         else:
00208             raise CapError(message)
00209 
00210 
00211 class ManualCapClient(BaseCapClient):
00212     def capitalize(self, request_data, callback=None):
00213         logging.info("capitalize")
00214         self.request_data = request_data
00215         self.stream = IOStream(socket.socket(), io_loop=self.io_loop)
00216         self.stream.connect(('127.0.0.1', self.port),
00217                             callback=self.handle_connect)
00218         self.future = Future()
00219         if callback is not None:
00220             self.future.add_done_callback(
00221                 stack_context.wrap(lambda future: callback(future.result())))
00222         return self.future
00223 
00224     def handle_connect(self):
00225         logging.info("handle_connect")
00226         self.stream.write(utf8(self.request_data + "\n"))
00227         self.stream.read_until(b'\n', callback=self.handle_read)
00228 
00229     def handle_read(self, data):
00230         logging.info("handle_read")
00231         self.stream.close()
00232         try:
00233             self.future.set_result(self.process_response(data))
00234         except CapError as e:
00235             self.future.set_exception(e)
00236 
00237 
00238 class DecoratorCapClient(BaseCapClient):
00239     @return_future
00240     def capitalize(self, request_data, callback):
00241         logging.info("capitalize")
00242         self.request_data = request_data
00243         self.stream = IOStream(socket.socket(), io_loop=self.io_loop)
00244         self.stream.connect(('127.0.0.1', self.port),
00245                             callback=self.handle_connect)
00246         self.callback = callback
00247 
00248     def handle_connect(self):
00249         logging.info("handle_connect")
00250         self.stream.write(utf8(self.request_data + "\n"))
00251         self.stream.read_until(b'\n', callback=self.handle_read)
00252 
00253     def handle_read(self, data):
00254         logging.info("handle_read")
00255         self.stream.close()
00256         self.callback(self.process_response(data))
00257 
00258 
00259 class GeneratorCapClient(BaseCapClient):
00260     @return_future
00261     @gen.engine
00262     def capitalize(self, request_data, callback):
00263         logging.info('capitalize')
00264         stream = IOStream(socket.socket(), io_loop=self.io_loop)
00265         logging.info('connecting')
00266         yield gen.Task(stream.connect, ('127.0.0.1', self.port))
00267         stream.write(utf8(request_data + '\n'))
00268         logging.info('reading')
00269         data = yield gen.Task(stream.read_until, b'\n')
00270         logging.info('returning')
00271         stream.close()
00272         callback(self.process_response(data))
00273 
00274 
00275 class ClientTestMixin(object):
00276     def setUp(self):
00277         super(ClientTestMixin, self).setUp()
00278         self.server = CapServer(io_loop=self.io_loop)
00279         sock, port = bind_unused_port()
00280         self.server.add_sockets([sock])
00281         self.client = self.client_class(io_loop=self.io_loop, port=port)
00282 
00283     def tearDown(self):
00284         self.server.stop()
00285         super(ClientTestMixin, self).tearDown()
00286 
00287     def test_callback(self):
00288         self.client.capitalize("hello", callback=self.stop)
00289         result = self.wait()
00290         self.assertEqual(result, "HELLO")
00291 
00292     def test_callback_error(self):
00293         self.client.capitalize("HELLO", callback=self.stop)
00294         self.assertRaisesRegexp(CapError, "already capitalized", self.wait)
00295 
00296     def test_future(self):
00297         future = self.client.capitalize("hello")
00298         self.io_loop.add_future(future, self.stop)
00299         self.wait()
00300         self.assertEqual(future.result(), "HELLO")
00301 
00302     def test_future_error(self):
00303         future = self.client.capitalize("HELLO")
00304         self.io_loop.add_future(future, self.stop)
00305         self.wait()
00306         self.assertRaisesRegexp(CapError, "already capitalized", future.result)
00307 
00308     def test_generator(self):
00309         @gen.engine
00310         def f():
00311             result = yield self.client.capitalize("hello")
00312             self.assertEqual(result, "HELLO")
00313             self.stop()
00314         f()
00315         self.wait()
00316 
00317     def test_generator_error(self):
00318         @gen.engine
00319         def f():
00320             with self.assertRaisesRegexp(CapError, "already capitalized"):
00321                 yield self.client.capitalize("HELLO")
00322             self.stop()
00323         f()
00324         self.wait()
00325 
00326 
00327 class ManualClientTest(ClientTestMixin, AsyncTestCase, LogTrapTestCase):
00328     client_class = ManualCapClient
00329 
00330 
00331 class DecoratorClientTest(ClientTestMixin, AsyncTestCase, LogTrapTestCase):
00332     client_class = DecoratorCapClient
00333 
00334 
00335 class GeneratorClientTest(ClientTestMixin, AsyncTestCase, LogTrapTestCase):
00336     client_class = GeneratorCapClient


rosbridge_tools
Author(s): Jonathan Mace
autogenerated on Sat Dec 27 2014 11:25:59