00001
00002
00003
00004 from __future__ import absolute_import, division, print_function, with_statement
00005 import contextlib
00006 import datetime
00007 import functools
00008 import socket
00009 import sys
00010 import threading
00011 import time
00012
00013 from tornado import gen
00014 from tornado.ioloop import IOLoop, TimeoutError
00015 from tornado.log import app_log
00016 from tornado.stack_context import ExceptionStackContext, StackContext, wrap, NullContext
00017 from tornado.testing import AsyncTestCase, bind_unused_port, ExpectLog
00018 from tornado.test.util import unittest, skipIfNonUnix, skipOnTravis
00019
00020 try:
00021 from concurrent import futures
00022 except ImportError:
00023 futures = None
00024
00025
00026 class TestIOLoop(AsyncTestCase):
00027 @skipOnTravis
00028 def test_add_callback_wakeup(self):
00029
00030
00031 def callback():
00032 self.called = True
00033 self.stop()
00034
00035 def schedule_callback():
00036 self.called = False
00037 self.io_loop.add_callback(callback)
00038
00039 self.start_time = time.time()
00040 self.io_loop.add_timeout(self.io_loop.time(), schedule_callback)
00041 self.wait()
00042 self.assertAlmostEqual(time.time(), self.start_time, places=2)
00043 self.assertTrue(self.called)
00044
00045 @skipOnTravis
00046 def test_add_callback_wakeup_other_thread(self):
00047 def target():
00048
00049 time.sleep(0.01)
00050 self.stop_time = time.time()
00051 self.io_loop.add_callback(self.stop)
00052 thread = threading.Thread(target=target)
00053 self.io_loop.add_callback(thread.start)
00054 self.wait()
00055 delta = time.time() - self.stop_time
00056 self.assertLess(delta, 0.1)
00057 thread.join()
00058
00059 def test_add_timeout_timedelta(self):
00060 self.io_loop.add_timeout(datetime.timedelta(microseconds=1), self.stop)
00061 self.wait()
00062
00063 def test_multiple_add(self):
00064 sock, port = bind_unused_port()
00065 try:
00066 self.io_loop.add_handler(sock.fileno(), lambda fd, events: None,
00067 IOLoop.READ)
00068
00069
00070 self.assertRaises(Exception, self.io_loop.add_handler,
00071 sock.fileno(), lambda fd, events: None,
00072 IOLoop.READ)
00073 finally:
00074 self.io_loop.remove_handler(sock.fileno())
00075 sock.close()
00076
00077 def test_remove_without_add(self):
00078
00079
00080 sock, port = bind_unused_port()
00081 try:
00082 self.io_loop.remove_handler(sock.fileno())
00083 finally:
00084 sock.close()
00085
00086 def test_add_callback_from_signal(self):
00087
00088
00089 self.io_loop.add_callback_from_signal(self.stop)
00090 self.wait()
00091
00092 def test_add_callback_from_signal_other_thread(self):
00093
00094
00095
00096 other_ioloop = IOLoop()
00097 thread = threading.Thread(target=other_ioloop.start)
00098 thread.start()
00099 other_ioloop.add_callback_from_signal(other_ioloop.stop)
00100 thread.join()
00101 other_ioloop.close()
00102
00103 def test_add_callback_while_closing(self):
00104
00105
00106 closing = threading.Event()
00107
00108 def target():
00109 other_ioloop.add_callback(other_ioloop.stop)
00110 other_ioloop.start()
00111 closing.set()
00112 other_ioloop.close(all_fds=True)
00113 other_ioloop = IOLoop()
00114 thread = threading.Thread(target=target)
00115 thread.start()
00116 closing.wait()
00117 for i in range(1000):
00118 try:
00119 other_ioloop.add_callback(lambda: None)
00120 except RuntimeError as e:
00121 self.assertEqual("IOLoop is closing", str(e))
00122 break
00123
00124 def test_handle_callback_exception(self):
00125
00126
00127 def handle_callback_exception(callback):
00128 self.assertIs(sys.exc_info()[0], ZeroDivisionError)
00129 self.stop()
00130 self.io_loop.handle_callback_exception = handle_callback_exception
00131 with NullContext():
00132
00133
00134 self.io_loop.add_callback(lambda: 1 / 0)
00135 self.wait()
00136
00137 @skipIfNonUnix
00138 def test_read_while_writeable(self):
00139
00140
00141
00142 client, server = socket.socketpair()
00143 try:
00144 def handler(fd, events):
00145 self.assertEqual(events, IOLoop.READ)
00146 self.stop()
00147 self.io_loop.add_handler(client.fileno(), handler, IOLoop.READ)
00148 self.io_loop.add_timeout(self.io_loop.time() + 0.01,
00149 functools.partial(server.send, b'asdf'))
00150 self.wait()
00151 self.io_loop.remove_handler(client.fileno())
00152 finally:
00153 client.close()
00154 server.close()
00155
00156 def test_remove_timeout_after_fire(self):
00157
00158 handle = self.io_loop.add_timeout(self.io_loop.time(), self.stop)
00159 self.wait()
00160 self.io_loop.remove_timeout(handle)
00161
00162 def test_remove_timeout_cleanup(self):
00163
00164
00165
00166
00167
00168 for i in range(2000):
00169 timeout = self.io_loop.add_timeout(self.io_loop.time() + 3600,
00170 lambda: None)
00171 self.io_loop.remove_timeout(timeout)
00172
00173 self.io_loop.add_callback(lambda: self.io_loop.add_callback(self.stop))
00174 self.wait()
00175
00176 def test_remove_timeout_from_timeout(self):
00177 calls = [False, False]
00178
00179
00180
00181
00182 now = self.io_loop.time()
00183 def t1():
00184 calls[0] = True
00185 self.io_loop.remove_timeout(t2_handle)
00186 self.io_loop.add_timeout(now + 0.01, t1)
00187 def t2():
00188 calls[1] = True
00189 t2_handle = self.io_loop.add_timeout(now + 0.02, t2)
00190 self.io_loop.add_timeout(now + 0.03, self.stop)
00191 time.sleep(0.03)
00192 self.wait()
00193 self.assertEqual(calls, [True, False])
00194
00195 def test_timeout_with_arguments(self):
00196
00197 results = []
00198 self.io_loop.add_timeout(self.io_loop.time(), results.append, 1)
00199 self.io_loop.add_timeout(datetime.timedelta(seconds=0),
00200 results.append, 2)
00201 self.io_loop.call_at(self.io_loop.time(), results.append, 3)
00202 self.io_loop.call_later(0, results.append, 4)
00203 self.io_loop.call_later(0, self.stop)
00204 self.wait()
00205 self.assertEqual(results, [1, 2, 3, 4])
00206
00207 def test_add_timeout_return(self):
00208
00209
00210 handle = self.io_loop.add_timeout(self.io_loop.time(), lambda: None)
00211 self.assertFalse(handle is None)
00212 self.io_loop.remove_timeout(handle)
00213
00214 def test_call_at_return(self):
00215 handle = self.io_loop.call_at(self.io_loop.time(), lambda: None)
00216 self.assertFalse(handle is None)
00217 self.io_loop.remove_timeout(handle)
00218
00219 def test_call_later_return(self):
00220 handle = self.io_loop.call_later(0, lambda: None)
00221 self.assertFalse(handle is None)
00222 self.io_loop.remove_timeout(handle)
00223
00224 def test_close_file_object(self):
00225 """When a file object is used instead of a numeric file descriptor,
00226 the object should be closed (by IOLoop.close(all_fds=True),
00227 not just the fd.
00228 """
00229
00230
00231
00232 class SocketWrapper(object):
00233 def __init__(self, sockobj):
00234 self.sockobj = sockobj
00235 self.closed = False
00236
00237 def fileno(self):
00238 return self.sockobj.fileno()
00239
00240 def close(self):
00241 self.closed = True
00242 self.sockobj.close()
00243 sockobj, port = bind_unused_port()
00244 socket_wrapper = SocketWrapper(sockobj)
00245 io_loop = IOLoop()
00246 io_loop.add_handler(socket_wrapper, lambda fd, events: None,
00247 IOLoop.READ)
00248 io_loop.close(all_fds=True)
00249 self.assertTrue(socket_wrapper.closed)
00250
00251 def test_handler_callback_file_object(self):
00252 """The handler callback receives the same fd object it passed in."""
00253 server_sock, port = bind_unused_port()
00254 fds = []
00255 def handle_connection(fd, events):
00256 fds.append(fd)
00257 conn, addr = server_sock.accept()
00258 conn.close()
00259 self.stop()
00260 self.io_loop.add_handler(server_sock, handle_connection, IOLoop.READ)
00261 with contextlib.closing(socket.socket()) as client_sock:
00262 client_sock.connect(('127.0.0.1', port))
00263 self.wait()
00264 self.io_loop.remove_handler(server_sock)
00265 self.io_loop.add_handler(server_sock.fileno(), handle_connection,
00266 IOLoop.READ)
00267 with contextlib.closing(socket.socket()) as client_sock:
00268 client_sock.connect(('127.0.0.1', port))
00269 self.wait()
00270 self.assertIs(fds[0], server_sock)
00271 self.assertEqual(fds[1], server_sock.fileno())
00272 self.io_loop.remove_handler(server_sock.fileno())
00273 server_sock.close()
00274
00275 def test_mixed_fd_fileobj(self):
00276 server_sock, port = bind_unused_port()
00277 def f(fd, events):
00278 pass
00279 self.io_loop.add_handler(server_sock, f, IOLoop.READ)
00280 with self.assertRaises(Exception):
00281
00282
00283 self.io_loop.add_handler(server_sock.fileno(), f, IOLoop.READ)
00284 self.io_loop.remove_handler(server_sock.fileno())
00285 server_sock.close()
00286
00287 def test_reentrant(self):
00288 """Calling start() twice should raise an error, not deadlock."""
00289 returned_from_start = [False]
00290 got_exception = [False]
00291 def callback():
00292 try:
00293 self.io_loop.start()
00294 returned_from_start[0] = True
00295 except Exception:
00296 got_exception[0] = True
00297 self.stop()
00298 self.io_loop.add_callback(callback)
00299 self.wait()
00300 self.assertTrue(got_exception[0])
00301 self.assertFalse(returned_from_start[0])
00302
00303 def test_exception_logging(self):
00304 """Uncaught exceptions get logged by the IOLoop."""
00305
00306
00307 with NullContext():
00308 self.io_loop.add_callback(lambda: 1/0)
00309 self.io_loop.add_callback(self.stop)
00310 with ExpectLog(app_log, "Exception in callback"):
00311 self.wait()
00312
00313 def test_exception_logging_future(self):
00314 """The IOLoop examines exceptions from Futures and logs them."""
00315 with NullContext():
00316 @gen.coroutine
00317 def callback():
00318 self.io_loop.add_callback(self.stop)
00319 1/0
00320 self.io_loop.add_callback(callback)
00321 with ExpectLog(app_log, "Exception in callback"):
00322 self.wait()
00323
00324 def test_spawn_callback(self):
00325
00326
00327 self.io_loop.add_callback(lambda: 1/0)
00328 with self.assertRaises(ZeroDivisionError):
00329 self.wait()
00330
00331
00332 self.io_loop.spawn_callback(lambda: 1/0)
00333 self.io_loop.add_callback(self.stop)
00334 with ExpectLog(app_log, "Exception in callback"):
00335 self.wait()
00336
00337
00338
00339
00340 class TestIOLoopCurrent(unittest.TestCase):
00341 def setUp(self):
00342 self.io_loop = IOLoop()
00343
00344 def tearDown(self):
00345 self.io_loop.close()
00346
00347 def test_current(self):
00348 def f():
00349 self.current_io_loop = IOLoop.current()
00350 self.io_loop.stop()
00351 self.io_loop.add_callback(f)
00352 self.io_loop.start()
00353 self.assertIs(self.current_io_loop, self.io_loop)
00354
00355
00356 class TestIOLoopAddCallback(AsyncTestCase):
00357 def setUp(self):
00358 super(TestIOLoopAddCallback, self).setUp()
00359 self.active_contexts = []
00360
00361 def add_callback(self, callback, *args, **kwargs):
00362 self.io_loop.add_callback(callback, *args, **kwargs)
00363
00364 @contextlib.contextmanager
00365 def context(self, name):
00366 self.active_contexts.append(name)
00367 yield
00368 self.assertEqual(self.active_contexts.pop(), name)
00369
00370 def test_pre_wrap(self):
00371
00372
00373 def f1():
00374 self.assertIn('c1', self.active_contexts)
00375 self.assertNotIn('c2', self.active_contexts)
00376 self.stop()
00377
00378 with StackContext(functools.partial(self.context, 'c1')):
00379 wrapped = wrap(f1)
00380
00381 with StackContext(functools.partial(self.context, 'c2')):
00382 self.add_callback(wrapped)
00383
00384 self.wait()
00385
00386 def test_pre_wrap_with_args(self):
00387
00388
00389
00390
00391 def f1(foo, bar):
00392 self.assertIn('c1', self.active_contexts)
00393 self.assertNotIn('c2', self.active_contexts)
00394 self.stop((foo, bar))
00395
00396 with StackContext(functools.partial(self.context, 'c1')):
00397 wrapped = wrap(f1)
00398
00399 with StackContext(functools.partial(self.context, 'c2')):
00400 self.add_callback(wrapped, 1, bar=2)
00401
00402 result = self.wait()
00403 self.assertEqual(result, (1, 2))
00404
00405
00406 class TestIOLoopAddCallbackFromSignal(TestIOLoopAddCallback):
00407
00408 def add_callback(self, callback, *args, **kwargs):
00409 self.io_loop.add_callback_from_signal(callback, *args, **kwargs)
00410
00411
00412 @unittest.skipIf(futures is None, "futures module not present")
00413 class TestIOLoopFutures(AsyncTestCase):
00414 def test_add_future_threads(self):
00415 with futures.ThreadPoolExecutor(1) as pool:
00416 self.io_loop.add_future(pool.submit(lambda: None),
00417 lambda future: self.stop(future))
00418 future = self.wait()
00419 self.assertTrue(future.done())
00420 self.assertTrue(future.result() is None)
00421
00422 def test_add_future_stack_context(self):
00423 ready = threading.Event()
00424
00425 def task():
00426
00427
00428
00429
00430 ready.wait(1)
00431 assert ready.isSet(), "timed out"
00432 raise Exception("worker")
00433
00434 def callback(future):
00435 self.future = future
00436 raise Exception("callback")
00437
00438 def handle_exception(typ, value, traceback):
00439 self.exception = value
00440 self.stop()
00441 return True
00442
00443
00444
00445 with futures.ThreadPoolExecutor(1) as pool:
00446 with ExceptionStackContext(handle_exception):
00447 self.io_loop.add_future(pool.submit(task), callback)
00448 ready.set()
00449 self.wait()
00450
00451 self.assertEqual(self.exception.args[0], "callback")
00452 self.assertEqual(self.future.exception().args[0], "worker")
00453
00454
00455 class TestIOLoopRunSync(unittest.TestCase):
00456 def setUp(self):
00457 self.io_loop = IOLoop()
00458
00459 def tearDown(self):
00460 self.io_loop.close()
00461
00462 def test_sync_result(self):
00463 self.assertEqual(self.io_loop.run_sync(lambda: 42), 42)
00464
00465 def test_sync_exception(self):
00466 with self.assertRaises(ZeroDivisionError):
00467 self.io_loop.run_sync(lambda: 1 / 0)
00468
00469 def test_async_result(self):
00470 @gen.coroutine
00471 def f():
00472 yield gen.Task(self.io_loop.add_callback)
00473 raise gen.Return(42)
00474 self.assertEqual(self.io_loop.run_sync(f), 42)
00475
00476 def test_async_exception(self):
00477 @gen.coroutine
00478 def f():
00479 yield gen.Task(self.io_loop.add_callback)
00480 1 / 0
00481 with self.assertRaises(ZeroDivisionError):
00482 self.io_loop.run_sync(f)
00483
00484 def test_current(self):
00485 def f():
00486 self.assertIs(IOLoop.current(), self.io_loop)
00487 self.io_loop.run_sync(f)
00488
00489 def test_timeout(self):
00490 @gen.coroutine
00491 def f():
00492 yield gen.Task(self.io_loop.add_timeout, self.io_loop.time() + 1)
00493 self.assertRaises(TimeoutError, self.io_loop.run_sync, f, timeout=0.01)
00494
00495
00496 if __name__ == "__main__":
00497 unittest.main()