ioloop_test.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 
00003 
00004 from __future__ import absolute_import, division, print_function, with_statement
00005 import contextlib
00006 import datetime
00007 import functools
00008 import socket
00009 import sys
00010 import threading
00011 import time
00012 
00013 from tornado import gen
00014 from tornado.ioloop import IOLoop, TimeoutError
00015 from tornado.log import app_log
00016 from tornado.stack_context import ExceptionStackContext, StackContext, wrap, NullContext
00017 from tornado.testing import AsyncTestCase, bind_unused_port, ExpectLog
00018 from tornado.test.util import unittest, skipIfNonUnix, skipOnTravis
00019 
00020 try:
00021     from concurrent import futures
00022 except ImportError:
00023     futures = None
00024 
00025 
00026 class TestIOLoop(AsyncTestCase):
00027     @skipOnTravis
00028     def test_add_callback_wakeup(self):
00029         # Make sure that add_callback from inside a running IOLoop
00030         # wakes up the IOLoop immediately instead of waiting for a timeout.
00031         def callback():
00032             self.called = True
00033             self.stop()
00034 
00035         def schedule_callback():
00036             self.called = False
00037             self.io_loop.add_callback(callback)
00038             # Store away the time so we can check if we woke up immediately
00039             self.start_time = time.time()
00040         self.io_loop.add_timeout(self.io_loop.time(), schedule_callback)
00041         self.wait()
00042         self.assertAlmostEqual(time.time(), self.start_time, places=2)
00043         self.assertTrue(self.called)
00044 
00045     @skipOnTravis
00046     def test_add_callback_wakeup_other_thread(self):
00047         def target():
00048             # sleep a bit to let the ioloop go into its poll loop
00049             time.sleep(0.01)
00050             self.stop_time = time.time()
00051             self.io_loop.add_callback(self.stop)
00052         thread = threading.Thread(target=target)
00053         self.io_loop.add_callback(thread.start)
00054         self.wait()
00055         delta = time.time() - self.stop_time
00056         self.assertLess(delta, 0.1)
00057         thread.join()
00058 
00059     def test_add_timeout_timedelta(self):
00060         self.io_loop.add_timeout(datetime.timedelta(microseconds=1), self.stop)
00061         self.wait()
00062 
00063     def test_multiple_add(self):
00064         sock, port = bind_unused_port()
00065         try:
00066             self.io_loop.add_handler(sock.fileno(), lambda fd, events: None,
00067                                      IOLoop.READ)
00068             # Attempting to add the same handler twice fails
00069             # (with a platform-dependent exception)
00070             self.assertRaises(Exception, self.io_loop.add_handler,
00071                               sock.fileno(), lambda fd, events: None,
00072                               IOLoop.READ)
00073         finally:
00074             self.io_loop.remove_handler(sock.fileno())
00075             sock.close()
00076 
00077     def test_remove_without_add(self):
00078         # remove_handler should not throw an exception if called on an fd
00079         # was never added.
00080         sock, port = bind_unused_port()
00081         try:
00082             self.io_loop.remove_handler(sock.fileno())
00083         finally:
00084             sock.close()
00085 
00086     def test_add_callback_from_signal(self):
00087         # cheat a little bit and just run this normally, since we can't
00088         # easily simulate the races that happen with real signal handlers
00089         self.io_loop.add_callback_from_signal(self.stop)
00090         self.wait()
00091 
00092     def test_add_callback_from_signal_other_thread(self):
00093         # Very crude test, just to make sure that we cover this case.
00094         # This also happens to be the first test where we run an IOLoop in
00095         # a non-main thread.
00096         other_ioloop = IOLoop()
00097         thread = threading.Thread(target=other_ioloop.start)
00098         thread.start()
00099         other_ioloop.add_callback_from_signal(other_ioloop.stop)
00100         thread.join()
00101         other_ioloop.close()
00102 
00103     def test_add_callback_while_closing(self):
00104         # Issue #635: add_callback() should raise a clean exception
00105         # if called while another thread is closing the IOLoop.
00106         closing = threading.Event()
00107 
00108         def target():
00109             other_ioloop.add_callback(other_ioloop.stop)
00110             other_ioloop.start()
00111             closing.set()
00112             other_ioloop.close(all_fds=True)
00113         other_ioloop = IOLoop()
00114         thread = threading.Thread(target=target)
00115         thread.start()
00116         closing.wait()
00117         for i in range(1000):
00118             try:
00119                 other_ioloop.add_callback(lambda: None)
00120             except RuntimeError as e:
00121                 self.assertEqual("IOLoop is closing", str(e))
00122                 break
00123 
00124     def test_handle_callback_exception(self):
00125         # IOLoop.handle_callback_exception can be overridden to catch
00126         # exceptions in callbacks.
00127         def handle_callback_exception(callback):
00128             self.assertIs(sys.exc_info()[0], ZeroDivisionError)
00129             self.stop()
00130         self.io_loop.handle_callback_exception = handle_callback_exception
00131         with NullContext():
00132             # remove the test StackContext that would see this uncaught
00133             # exception as a test failure.
00134             self.io_loop.add_callback(lambda: 1 / 0)
00135         self.wait()
00136 
00137     @skipIfNonUnix  # just because socketpair is so convenient
00138     def test_read_while_writeable(self):
00139         # Ensure that write events don't come in while we're waiting for
00140         # a read and haven't asked for writeability. (the reverse is
00141         # difficult to test for)
00142         client, server = socket.socketpair()
00143         try:
00144             def handler(fd, events):
00145                 self.assertEqual(events, IOLoop.READ)
00146                 self.stop()
00147             self.io_loop.add_handler(client.fileno(), handler, IOLoop.READ)
00148             self.io_loop.add_timeout(self.io_loop.time() + 0.01,
00149                                      functools.partial(server.send, b'asdf'))
00150             self.wait()
00151             self.io_loop.remove_handler(client.fileno())
00152         finally:
00153             client.close()
00154             server.close()
00155 
00156     def test_remove_timeout_after_fire(self):
00157         # It is not an error to call remove_timeout after it has run.
00158         handle = self.io_loop.add_timeout(self.io_loop.time(), self.stop)
00159         self.wait()
00160         self.io_loop.remove_timeout(handle)
00161 
00162     def test_remove_timeout_cleanup(self):
00163         # Add and remove enough callbacks to trigger cleanup.
00164         # Not a very thorough test, but it ensures that the cleanup code
00165         # gets executed and doesn't blow up.  This test is only really useful
00166         # on PollIOLoop subclasses, but it should run silently on any
00167         # implementation.
00168         for i in range(2000):
00169             timeout = self.io_loop.add_timeout(self.io_loop.time() + 3600,
00170                                                lambda: None)
00171             self.io_loop.remove_timeout(timeout)
00172         # HACK: wait two IOLoop iterations for the GC to happen.
00173         self.io_loop.add_callback(lambda: self.io_loop.add_callback(self.stop))
00174         self.wait()
00175 
00176     def test_remove_timeout_from_timeout(self):
00177         calls = [False, False]
00178 
00179         # Schedule several callbacks and wait for them all to come due at once.
00180         # t2 should be cancelled by t1, even though it is already scheduled to
00181         # be run before the ioloop even looks at it.
00182         now = self.io_loop.time()
00183         def t1():
00184             calls[0] = True
00185             self.io_loop.remove_timeout(t2_handle)
00186         self.io_loop.add_timeout(now + 0.01, t1)
00187         def t2():
00188             calls[1] = True
00189         t2_handle = self.io_loop.add_timeout(now + 0.02, t2)
00190         self.io_loop.add_timeout(now + 0.03, self.stop)
00191         time.sleep(0.03)
00192         self.wait()
00193         self.assertEqual(calls, [True, False])
00194 
00195     def test_timeout_with_arguments(self):
00196         # This tests that all the timeout methods pass through *args correctly.
00197         results = []
00198         self.io_loop.add_timeout(self.io_loop.time(), results.append, 1)
00199         self.io_loop.add_timeout(datetime.timedelta(seconds=0),
00200                                  results.append, 2)
00201         self.io_loop.call_at(self.io_loop.time(), results.append, 3)
00202         self.io_loop.call_later(0, results.append, 4)
00203         self.io_loop.call_later(0, self.stop)
00204         self.wait()
00205         self.assertEqual(results, [1, 2, 3, 4])
00206 
00207     def test_add_timeout_return(self):
00208         # All the timeout methods return non-None handles that can be
00209         # passed to remove_timeout.
00210         handle = self.io_loop.add_timeout(self.io_loop.time(), lambda: None)
00211         self.assertFalse(handle is None)
00212         self.io_loop.remove_timeout(handle)
00213 
00214     def test_call_at_return(self):
00215         handle = self.io_loop.call_at(self.io_loop.time(), lambda: None)
00216         self.assertFalse(handle is None)
00217         self.io_loop.remove_timeout(handle)
00218 
00219     def test_call_later_return(self):
00220         handle = self.io_loop.call_later(0, lambda: None)
00221         self.assertFalse(handle is None)
00222         self.io_loop.remove_timeout(handle)
00223 
00224     def test_close_file_object(self):
00225         """When a file object is used instead of a numeric file descriptor,
00226         the object should be closed (by IOLoop.close(all_fds=True),
00227         not just the fd.
00228         """
00229         # Use a socket since they are supported by IOLoop on all platforms.
00230         # Unfortunately, sockets don't support the .closed attribute for
00231         # inspecting their close status, so we must use a wrapper.
00232         class SocketWrapper(object):
00233             def __init__(self, sockobj):
00234                 self.sockobj = sockobj
00235                 self.closed = False
00236 
00237             def fileno(self):
00238                 return self.sockobj.fileno()
00239 
00240             def close(self):
00241                 self.closed = True
00242                 self.sockobj.close()
00243         sockobj, port = bind_unused_port()
00244         socket_wrapper = SocketWrapper(sockobj)
00245         io_loop = IOLoop()
00246         io_loop.add_handler(socket_wrapper, lambda fd, events: None,
00247                             IOLoop.READ)
00248         io_loop.close(all_fds=True)
00249         self.assertTrue(socket_wrapper.closed)
00250 
00251     def test_handler_callback_file_object(self):
00252         """The handler callback receives the same fd object it passed in."""
00253         server_sock, port = bind_unused_port()
00254         fds = []
00255         def handle_connection(fd, events):
00256             fds.append(fd)
00257             conn, addr = server_sock.accept()
00258             conn.close()
00259             self.stop()
00260         self.io_loop.add_handler(server_sock, handle_connection, IOLoop.READ)
00261         with contextlib.closing(socket.socket()) as client_sock:
00262             client_sock.connect(('127.0.0.1', port))
00263             self.wait()
00264         self.io_loop.remove_handler(server_sock)
00265         self.io_loop.add_handler(server_sock.fileno(), handle_connection,
00266                                  IOLoop.READ)
00267         with contextlib.closing(socket.socket()) as client_sock:
00268             client_sock.connect(('127.0.0.1', port))
00269             self.wait()
00270         self.assertIs(fds[0], server_sock)
00271         self.assertEqual(fds[1], server_sock.fileno())
00272         self.io_loop.remove_handler(server_sock.fileno())
00273         server_sock.close()
00274 
00275     def test_mixed_fd_fileobj(self):
00276         server_sock, port = bind_unused_port()
00277         def f(fd, events):
00278             pass
00279         self.io_loop.add_handler(server_sock, f, IOLoop.READ)
00280         with self.assertRaises(Exception):
00281             # The exact error is unspecified - some implementations use
00282             # IOError, others use ValueError.
00283             self.io_loop.add_handler(server_sock.fileno(), f, IOLoop.READ)
00284         self.io_loop.remove_handler(server_sock.fileno())
00285         server_sock.close()
00286 
00287     def test_reentrant(self):
00288         """Calling start() twice should raise an error, not deadlock."""
00289         returned_from_start = [False]
00290         got_exception = [False]
00291         def callback():
00292             try:
00293                 self.io_loop.start()
00294                 returned_from_start[0] = True
00295             except Exception:
00296                 got_exception[0] = True
00297             self.stop()
00298         self.io_loop.add_callback(callback)
00299         self.wait()
00300         self.assertTrue(got_exception[0])
00301         self.assertFalse(returned_from_start[0])
00302 
00303     def test_exception_logging(self):
00304         """Uncaught exceptions get logged by the IOLoop."""
00305         # Use a NullContext to keep the exception from being caught by
00306         # AsyncTestCase.
00307         with NullContext():
00308             self.io_loop.add_callback(lambda: 1/0)
00309             self.io_loop.add_callback(self.stop)
00310             with ExpectLog(app_log, "Exception in callback"):
00311                 self.wait()
00312 
00313     def test_exception_logging_future(self):
00314         """The IOLoop examines exceptions from Futures and logs them."""
00315         with NullContext():
00316             @gen.coroutine
00317             def callback():
00318                 self.io_loop.add_callback(self.stop)
00319                 1/0
00320             self.io_loop.add_callback(callback)
00321             with ExpectLog(app_log, "Exception in callback"):
00322                 self.wait()
00323 
00324     def test_spawn_callback(self):
00325         # An added callback runs in the test's stack_context, so will be
00326         # re-arised in wait().
00327         self.io_loop.add_callback(lambda: 1/0)
00328         with self.assertRaises(ZeroDivisionError):
00329             self.wait()
00330         # A spawned callback is run directly on the IOLoop, so it will be
00331         # logged without stopping the test.
00332         self.io_loop.spawn_callback(lambda: 1/0)
00333         self.io_loop.add_callback(self.stop)
00334         with ExpectLog(app_log, "Exception in callback"):
00335             self.wait()
00336 
00337 
00338 # Deliberately not a subclass of AsyncTestCase so the IOLoop isn't
00339 # automatically set as current.
00340 class TestIOLoopCurrent(unittest.TestCase):
00341     def setUp(self):
00342         self.io_loop = IOLoop()
00343 
00344     def tearDown(self):
00345         self.io_loop.close()
00346 
00347     def test_current(self):
00348         def f():
00349             self.current_io_loop = IOLoop.current()
00350             self.io_loop.stop()
00351         self.io_loop.add_callback(f)
00352         self.io_loop.start()
00353         self.assertIs(self.current_io_loop, self.io_loop)
00354 
00355 
00356 class TestIOLoopAddCallback(AsyncTestCase):
00357     def setUp(self):
00358         super(TestIOLoopAddCallback, self).setUp()
00359         self.active_contexts = []
00360 
00361     def add_callback(self, callback, *args, **kwargs):
00362         self.io_loop.add_callback(callback, *args, **kwargs)
00363 
00364     @contextlib.contextmanager
00365     def context(self, name):
00366         self.active_contexts.append(name)
00367         yield
00368         self.assertEqual(self.active_contexts.pop(), name)
00369 
00370     def test_pre_wrap(self):
00371         # A pre-wrapped callback is run in the context in which it was
00372         # wrapped, not when it was added to the IOLoop.
00373         def f1():
00374             self.assertIn('c1', self.active_contexts)
00375             self.assertNotIn('c2', self.active_contexts)
00376             self.stop()
00377 
00378         with StackContext(functools.partial(self.context, 'c1')):
00379             wrapped = wrap(f1)
00380 
00381         with StackContext(functools.partial(self.context, 'c2')):
00382             self.add_callback(wrapped)
00383 
00384         self.wait()
00385 
00386     def test_pre_wrap_with_args(self):
00387         # Same as test_pre_wrap, but the function takes arguments.
00388         # Implementation note: The function must not be wrapped in a
00389         # functools.partial until after it has been passed through
00390         # stack_context.wrap
00391         def f1(foo, bar):
00392             self.assertIn('c1', self.active_contexts)
00393             self.assertNotIn('c2', self.active_contexts)
00394             self.stop((foo, bar))
00395 
00396         with StackContext(functools.partial(self.context, 'c1')):
00397             wrapped = wrap(f1)
00398 
00399         with StackContext(functools.partial(self.context, 'c2')):
00400             self.add_callback(wrapped, 1, bar=2)
00401 
00402         result = self.wait()
00403         self.assertEqual(result, (1, 2))
00404 
00405 
00406 class TestIOLoopAddCallbackFromSignal(TestIOLoopAddCallback):
00407     # Repeat the add_callback tests using add_callback_from_signal
00408     def add_callback(self, callback, *args, **kwargs):
00409         self.io_loop.add_callback_from_signal(callback, *args, **kwargs)
00410 
00411 
00412 @unittest.skipIf(futures is None, "futures module not present")
00413 class TestIOLoopFutures(AsyncTestCase):
00414     def test_add_future_threads(self):
00415         with futures.ThreadPoolExecutor(1) as pool:
00416             self.io_loop.add_future(pool.submit(lambda: None),
00417                                     lambda future: self.stop(future))
00418             future = self.wait()
00419             self.assertTrue(future.done())
00420             self.assertTrue(future.result() is None)
00421 
00422     def test_add_future_stack_context(self):
00423         ready = threading.Event()
00424 
00425         def task():
00426             # we must wait for the ioloop callback to be scheduled before
00427             # the task completes to ensure that add_future adds the callback
00428             # asynchronously (which is the scenario in which capturing
00429             # the stack_context matters)
00430             ready.wait(1)
00431             assert ready.isSet(), "timed out"
00432             raise Exception("worker")
00433 
00434         def callback(future):
00435             self.future = future
00436             raise Exception("callback")
00437 
00438         def handle_exception(typ, value, traceback):
00439             self.exception = value
00440             self.stop()
00441             return True
00442 
00443         # stack_context propagates to the ioloop callback, but the worker
00444         # task just has its exceptions caught and saved in the Future.
00445         with futures.ThreadPoolExecutor(1) as pool:
00446             with ExceptionStackContext(handle_exception):
00447                 self.io_loop.add_future(pool.submit(task), callback)
00448             ready.set()
00449         self.wait()
00450 
00451         self.assertEqual(self.exception.args[0], "callback")
00452         self.assertEqual(self.future.exception().args[0], "worker")
00453 
00454 
00455 class TestIOLoopRunSync(unittest.TestCase):
00456     def setUp(self):
00457         self.io_loop = IOLoop()
00458 
00459     def tearDown(self):
00460         self.io_loop.close()
00461 
00462     def test_sync_result(self):
00463         self.assertEqual(self.io_loop.run_sync(lambda: 42), 42)
00464 
00465     def test_sync_exception(self):
00466         with self.assertRaises(ZeroDivisionError):
00467             self.io_loop.run_sync(lambda: 1 / 0)
00468 
00469     def test_async_result(self):
00470         @gen.coroutine
00471         def f():
00472             yield gen.Task(self.io_loop.add_callback)
00473             raise gen.Return(42)
00474         self.assertEqual(self.io_loop.run_sync(f), 42)
00475 
00476     def test_async_exception(self):
00477         @gen.coroutine
00478         def f():
00479             yield gen.Task(self.io_loop.add_callback)
00480             1 / 0
00481         with self.assertRaises(ZeroDivisionError):
00482             self.io_loop.run_sync(f)
00483 
00484     def test_current(self):
00485         def f():
00486             self.assertIs(IOLoop.current(), self.io_loop)
00487         self.io_loop.run_sync(f)
00488 
00489     def test_timeout(self):
00490         @gen.coroutine
00491         def f():
00492             yield gen.Task(self.io_loop.add_timeout, self.io_loop.time() + 1)
00493         self.assertRaises(TimeoutError, self.io_loop.run_sync, f, timeout=0.01)
00494 
00495 
00496 if __name__ == "__main__":
00497     unittest.main()


rosbridge_server
Author(s): Jonathan Mace
autogenerated on Thu Aug 27 2015 14:50:39