00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 """
00017 Unittest for the twisted-style reactor.
00018 """
00019
00020 from __future__ import absolute_import, division, print_function, with_statement
00021
00022 import os
00023 import shutil
00024 import signal
00025 import tempfile
00026 import threading
00027
00028 try:
00029 import fcntl
00030 from twisted.internet.defer import Deferred
00031 from twisted.internet.interfaces import IReadDescriptor, IWriteDescriptor
00032 from twisted.internet.protocol import Protocol
00033 from twisted.python import log
00034 from tornado.platform.twisted import TornadoReactor, TwistedIOLoop
00035 from zope.interface import implementer
00036 have_twisted = True
00037 except ImportError:
00038 have_twisted = False
00039
00040
00041
00042 try:
00043 from twisted.web.client import Agent
00044 from twisted.web.resource import Resource
00045 from twisted.web.server import Site
00046 have_twisted_web = True
00047 except ImportError:
00048 have_twisted_web = False
00049
00050 try:
00051 import thread
00052 except ImportError:
00053 import _thread as thread
00054
00055 from tornado.httpclient import AsyncHTTPClient
00056 from tornado.httpserver import HTTPServer
00057 from tornado.ioloop import IOLoop
00058 from tornado.platform.auto import set_close_exec
00059 from tornado.platform.select import SelectIOLoop
00060 from tornado.testing import bind_unused_port
00061 from tornado.test.util import unittest
00062 from tornado.util import import_object
00063 from tornado.web import RequestHandler, Application
00064
00065 skipIfNoTwisted = unittest.skipUnless(have_twisted,
00066 "twisted module not present")
00067
00068
00069 def save_signal_handlers():
00070 saved = {}
00071 for sig in [signal.SIGINT, signal.SIGTERM, signal.SIGCHLD]:
00072 saved[sig] = signal.getsignal(sig)
00073 if "twisted" in repr(saved):
00074 if not issubclass(IOLoop.configured_class(), TwistedIOLoop):
00075
00076
00077
00078 raise Exception("twisted signal handlers already installed")
00079 return saved
00080
00081
00082 def restore_signal_handlers(saved):
00083 for sig, handler in saved.items():
00084 signal.signal(sig, handler)
00085
00086
00087 class ReactorTestCase(unittest.TestCase):
00088 def setUp(self):
00089 self._saved_signals = save_signal_handlers()
00090 self._io_loop = IOLoop()
00091 self._reactor = TornadoReactor(self._io_loop)
00092
00093 def tearDown(self):
00094 self._io_loop.close(all_fds=True)
00095 restore_signal_handlers(self._saved_signals)
00096
00097
00098 @skipIfNoTwisted
00099 class ReactorWhenRunningTest(ReactorTestCase):
00100 def test_whenRunning(self):
00101 self._whenRunningCalled = False
00102 self._anotherWhenRunningCalled = False
00103 self._reactor.callWhenRunning(self.whenRunningCallback)
00104 self._reactor.run()
00105 self.assertTrue(self._whenRunningCalled)
00106 self.assertTrue(self._anotherWhenRunningCalled)
00107
00108 def whenRunningCallback(self):
00109 self._whenRunningCalled = True
00110 self._reactor.callWhenRunning(self.anotherWhenRunningCallback)
00111 self._reactor.stop()
00112
00113 def anotherWhenRunningCallback(self):
00114 self._anotherWhenRunningCalled = True
00115
00116
00117 @skipIfNoTwisted
00118 class ReactorCallLaterTest(ReactorTestCase):
00119 def test_callLater(self):
00120 self._laterCalled = False
00121 self._now = self._reactor.seconds()
00122 self._timeout = 0.001
00123 dc = self._reactor.callLater(self._timeout, self.callLaterCallback)
00124 self.assertEqual(self._reactor.getDelayedCalls(), [dc])
00125 self._reactor.run()
00126 self.assertTrue(self._laterCalled)
00127 self.assertTrue(self._called - self._now > self._timeout)
00128 self.assertEqual(self._reactor.getDelayedCalls(), [])
00129
00130 def callLaterCallback(self):
00131 self._laterCalled = True
00132 self._called = self._reactor.seconds()
00133 self._reactor.stop()
00134
00135
00136 @skipIfNoTwisted
00137 class ReactorTwoCallLaterTest(ReactorTestCase):
00138 def test_callLater(self):
00139 self._later1Called = False
00140 self._later2Called = False
00141 self._now = self._reactor.seconds()
00142 self._timeout1 = 0.0005
00143 dc1 = self._reactor.callLater(self._timeout1, self.callLaterCallback1)
00144 self._timeout2 = 0.001
00145 dc2 = self._reactor.callLater(self._timeout2, self.callLaterCallback2)
00146 self.assertTrue(self._reactor.getDelayedCalls() == [dc1, dc2] or
00147 self._reactor.getDelayedCalls() == [dc2, dc1])
00148 self._reactor.run()
00149 self.assertTrue(self._later1Called)
00150 self.assertTrue(self._later2Called)
00151 self.assertTrue(self._called1 - self._now > self._timeout1)
00152 self.assertTrue(self._called2 - self._now > self._timeout2)
00153 self.assertEqual(self._reactor.getDelayedCalls(), [])
00154
00155 def callLaterCallback1(self):
00156 self._later1Called = True
00157 self._called1 = self._reactor.seconds()
00158
00159 def callLaterCallback2(self):
00160 self._later2Called = True
00161 self._called2 = self._reactor.seconds()
00162 self._reactor.stop()
00163
00164
00165 @skipIfNoTwisted
00166 class ReactorCallFromThreadTest(ReactorTestCase):
00167 def setUp(self):
00168 super(ReactorCallFromThreadTest, self).setUp()
00169 self._mainThread = thread.get_ident()
00170
00171 def tearDown(self):
00172 self._thread.join()
00173 super(ReactorCallFromThreadTest, self).tearDown()
00174
00175 def _newThreadRun(self):
00176 self.assertNotEqual(self._mainThread, thread.get_ident())
00177 if hasattr(self._thread, 'ident'):
00178 self.assertEqual(self._thread.ident, thread.get_ident())
00179 self._reactor.callFromThread(self._fnCalledFromThread)
00180
00181 def _fnCalledFromThread(self):
00182 self.assertEqual(self._mainThread, thread.get_ident())
00183 self._reactor.stop()
00184
00185 def _whenRunningCallback(self):
00186 self._thread = threading.Thread(target=self._newThreadRun)
00187 self._thread.start()
00188
00189 def testCallFromThread(self):
00190 self._reactor.callWhenRunning(self._whenRunningCallback)
00191 self._reactor.run()
00192
00193
00194 @skipIfNoTwisted
00195 class ReactorCallInThread(ReactorTestCase):
00196 def setUp(self):
00197 super(ReactorCallInThread, self).setUp()
00198 self._mainThread = thread.get_ident()
00199
00200 def _fnCalledInThread(self, *args, **kwargs):
00201 self.assertNotEqual(thread.get_ident(), self._mainThread)
00202 self._reactor.callFromThread(lambda: self._reactor.stop())
00203
00204 def _whenRunningCallback(self):
00205 self._reactor.callInThread(self._fnCalledInThread)
00206
00207 def testCallInThread(self):
00208 self._reactor.callWhenRunning(self._whenRunningCallback)
00209 self._reactor.run()
00210
00211
00212 class Reader(object):
00213 def __init__(self, fd, callback):
00214 self._fd = fd
00215 self._callback = callback
00216
00217 def logPrefix(self):
00218 return "Reader"
00219
00220 def close(self):
00221 self._fd.close()
00222
00223 def fileno(self):
00224 return self._fd.fileno()
00225
00226 def readConnectionLost(self, reason):
00227 self.close()
00228
00229 def connectionLost(self, reason):
00230 self.close()
00231
00232 def doRead(self):
00233 self._callback(self._fd)
00234 if have_twisted:
00235 Reader = implementer(IReadDescriptor)(Reader)
00236
00237
00238 class Writer(object):
00239 def __init__(self, fd, callback):
00240 self._fd = fd
00241 self._callback = callback
00242
00243 def logPrefix(self):
00244 return "Writer"
00245
00246 def close(self):
00247 self._fd.close()
00248
00249 def fileno(self):
00250 return self._fd.fileno()
00251
00252 def connectionLost(self, reason):
00253 self.close()
00254
00255 def doWrite(self):
00256 self._callback(self._fd)
00257 if have_twisted:
00258 Writer = implementer(IWriteDescriptor)(Writer)
00259
00260
00261 @skipIfNoTwisted
00262 class ReactorReaderWriterTest(ReactorTestCase):
00263 def _set_nonblocking(self, fd):
00264 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
00265 fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
00266
00267 def setUp(self):
00268 super(ReactorReaderWriterTest, self).setUp()
00269 r, w = os.pipe()
00270 self._set_nonblocking(r)
00271 self._set_nonblocking(w)
00272 set_close_exec(r)
00273 set_close_exec(w)
00274 self._p1 = os.fdopen(r, "rb", 0)
00275 self._p2 = os.fdopen(w, "wb", 0)
00276
00277 def tearDown(self):
00278 super(ReactorReaderWriterTest, self).tearDown()
00279 self._p1.close()
00280 self._p2.close()
00281
00282 def _testReadWrite(self):
00283 """
00284 In this test the writer writes an 'x' to its fd. The reader
00285 reads it, check the value and ends the test.
00286 """
00287 self.shouldWrite = True
00288
00289 def checkReadInput(fd):
00290 self.assertEquals(fd.read(1), b'x')
00291 self._reactor.stop()
00292
00293 def writeOnce(fd):
00294 if self.shouldWrite:
00295 self.shouldWrite = False
00296 fd.write(b'x')
00297 self._reader = Reader(self._p1, checkReadInput)
00298 self._writer = Writer(self._p2, writeOnce)
00299
00300 self._reactor.addWriter(self._writer)
00301
00302
00303
00304 self._reactor.addReader(self._reader)
00305 self._reactor.addReader(self._reader)
00306
00307 def testReadWrite(self):
00308 self._reactor.callWhenRunning(self._testReadWrite)
00309 self._reactor.run()
00310
00311 def _testNoWriter(self):
00312 """
00313 In this test we have no writer. Make sure the reader doesn't
00314 read anything.
00315 """
00316 def checkReadInput(fd):
00317 self.fail("Must not be called.")
00318
00319 def stopTest():
00320
00321
00322 self._writer.close()
00323 self._reactor.stop()
00324 self._reader = Reader(self._p1, checkReadInput)
00325
00326
00327 self._writer = Writer(self._p2, lambda fd: fd.write('x'))
00328
00329
00330 self._reactor.addWriter(self._writer)
00331 self._reactor.removeWriter(self._writer)
00332
00333
00334
00335 self._reactor.addReader(self._reader)
00336
00337
00338 self._reactor.callLater(0.001, stopTest)
00339
00340 def testNoWriter(self):
00341 self._reactor.callWhenRunning(self._testNoWriter)
00342 self._reactor.run()
00343
00344
00345
00346
00347
00348 @skipIfNoTwisted
00349 @unittest.skipIf(not have_twisted_web, 'twisted web not present')
00350 class CompatibilityTests(unittest.TestCase):
00351 def setUp(self):
00352 self.saved_signals = save_signal_handlers()
00353 self.io_loop = IOLoop()
00354 self.io_loop.make_current()
00355 self.reactor = TornadoReactor(self.io_loop)
00356
00357 def tearDown(self):
00358 self.reactor.disconnectAll()
00359 self.io_loop.clear_current()
00360 self.io_loop.close(all_fds=True)
00361 restore_signal_handlers(self.saved_signals)
00362
00363 def start_twisted_server(self):
00364 class HelloResource(Resource):
00365 isLeaf = True
00366
00367 def render_GET(self, request):
00368 return "Hello from twisted!"
00369 site = Site(HelloResource())
00370 port = self.reactor.listenTCP(0, site, interface='127.0.0.1')
00371 self.twisted_port = port.getHost().port
00372
00373 def start_tornado_server(self):
00374 class HelloHandler(RequestHandler):
00375 def get(self):
00376 self.write("Hello from tornado!")
00377 app = Application([('/', HelloHandler)],
00378 log_function=lambda x: None)
00379 server = HTTPServer(app, io_loop=self.io_loop)
00380 sock, self.tornado_port = bind_unused_port()
00381 server.add_sockets([sock])
00382
00383 def run_ioloop(self):
00384 self.stop_loop = self.io_loop.stop
00385 self.io_loop.start()
00386 self.reactor.fireSystemEvent('shutdown')
00387
00388 def run_reactor(self):
00389 self.stop_loop = self.reactor.stop
00390 self.stop = self.reactor.stop
00391 self.reactor.run()
00392
00393 def tornado_fetch(self, url, runner):
00394 responses = []
00395 client = AsyncHTTPClient(self.io_loop)
00396
00397 def callback(response):
00398 responses.append(response)
00399 self.stop_loop()
00400 client.fetch(url, callback=callback)
00401 runner()
00402 self.assertEqual(len(responses), 1)
00403 responses[0].rethrow()
00404 return responses[0]
00405
00406 def twisted_fetch(self, url, runner):
00407
00408 chunks = []
00409 client = Agent(self.reactor)
00410 d = client.request('GET', url)
00411
00412 class Accumulator(Protocol):
00413 def __init__(self, finished):
00414 self.finished = finished
00415
00416 def dataReceived(self, data):
00417 chunks.append(data)
00418
00419 def connectionLost(self, reason):
00420 self.finished.callback(None)
00421
00422 def callback(response):
00423 finished = Deferred()
00424 response.deliverBody(Accumulator(finished))
00425 return finished
00426 d.addCallback(callback)
00427
00428 def shutdown(ignored):
00429 self.stop_loop()
00430 d.addBoth(shutdown)
00431 runner()
00432 self.assertTrue(chunks)
00433 return ''.join(chunks)
00434
00435 def testTwistedServerTornadoClientIOLoop(self):
00436 self.start_twisted_server()
00437 response = self.tornado_fetch(
00438 'http://localhost:%d' % self.twisted_port, self.run_ioloop)
00439 self.assertEqual(response.body, 'Hello from twisted!')
00440
00441 def testTwistedServerTornadoClientReactor(self):
00442 self.start_twisted_server()
00443 response = self.tornado_fetch(
00444 'http://localhost:%d' % self.twisted_port, self.run_reactor)
00445 self.assertEqual(response.body, 'Hello from twisted!')
00446
00447 def testTornadoServerTwistedClientIOLoop(self):
00448 self.start_tornado_server()
00449 response = self.twisted_fetch(
00450 'http://localhost:%d' % self.tornado_port, self.run_ioloop)
00451 self.assertEqual(response, 'Hello from tornado!')
00452
00453 def testTornadoServerTwistedClientReactor(self):
00454 self.start_tornado_server()
00455 response = self.twisted_fetch(
00456 'http://localhost:%d' % self.tornado_port, self.run_reactor)
00457 self.assertEqual(response, 'Hello from tornado!')
00458
00459
00460 if have_twisted:
00461
00462
00463
00464
00465
00466
00467
00468
00469 twisted_tests = {
00470 'twisted.internet.test.test_core.ObjectModelIntegrationTest': [],
00471 'twisted.internet.test.test_core.SystemEventTestsBuilder': [
00472 'test_iterate',
00473
00474 'test_runAfterCrash',
00475 ],
00476 'twisted.internet.test.test_fdset.ReactorFDSetTestsBuilder': [
00477 "test_lostFileDescriptor",
00478 ],
00479 'twisted.internet.test.test_process.ProcessTestsBuilder': [
00480
00481
00482 'test_changeGID',
00483 'test_changeUID',
00484 ],
00485
00486
00487
00488
00489 'twisted.internet.test.test_tcp.TCPClientTestsBuilder': [
00490 'test_badContext',
00491 ],
00492 'twisted.internet.test.test_tcp.TCPPortTestsBuilder': [
00493
00494 'test_buildProtocolIPv6AddressScopeID',
00495 'test_portGetHostOnIPv6ScopeID',
00496 'test_serverGetHostOnIPv6ScopeID',
00497 'test_serverGetPeerOnIPv6ScopeID',
00498 ],
00499 'twisted.internet.test.test_tcp.TCPConnectionTestsBuilder': [],
00500 'twisted.internet.test.test_tcp.WriteSequenceTests': [],
00501 'twisted.internet.test.test_tcp.AbortConnectionTestCase': [],
00502 'twisted.internet.test.test_threads.ThreadTestsBuilder': [],
00503 'twisted.internet.test.test_time.TimeTestsBuilder': [],
00504
00505
00506 'twisted.internet.test.test_udp.UDPServerTestsBuilder': [],
00507 'twisted.internet.test.test_unix.UNIXTestsBuilder': [
00508
00509
00510 'test_connectToLinuxAbstractNamespace',
00511 'test_listenOnLinuxAbstractNamespace',
00512
00513
00514
00515 'test_sendFileDescriptor',
00516 'test_sendFileDescriptorTriggersPauseProducing',
00517 'test_descriptorDeliveredBeforeBytes',
00518 'test_avoidLeakingFileDescriptors',
00519 ],
00520 'twisted.internet.test.test_unix.UNIXDatagramTestsBuilder': [
00521 'test_listenOnLinuxAbstractNamespace',
00522 ],
00523 'twisted.internet.test.test_unix.UNIXPortTestsBuilder': [],
00524 }
00525 for test_name, blacklist in twisted_tests.items():
00526 try:
00527 test_class = import_object(test_name)
00528 except (ImportError, AttributeError):
00529 continue
00530 for test_func in blacklist:
00531 if hasattr(test_class, test_func):
00532
00533
00534 setattr(test_class, test_func, lambda self: None)
00535
00536 def make_test_subclass(test_class):
00537 class TornadoTest(test_class):
00538 _reactors = ["tornado.platform.twisted._TestReactor"]
00539
00540 def setUp(self):
00541
00542
00543
00544 self.__curdir = os.getcwd()
00545 self.__tempdir = tempfile.mkdtemp()
00546 os.chdir(self.__tempdir)
00547 super(TornadoTest, self).setUp()
00548
00549 def tearDown(self):
00550 super(TornadoTest, self).tearDown()
00551 os.chdir(self.__curdir)
00552 shutil.rmtree(self.__tempdir)
00553
00554 def buildReactor(self):
00555 self.__saved_signals = save_signal_handlers()
00556 return test_class.buildReactor(self)
00557
00558 def unbuildReactor(self, reactor):
00559 test_class.unbuildReactor(self, reactor)
00560
00561
00562
00563
00564
00565 reactor._io_loop.close(all_fds=True)
00566 restore_signal_handlers(self.__saved_signals)
00567
00568 TornadoTest.__name__ = test_class.__name__
00569 return TornadoTest
00570 test_subclass = make_test_subclass(test_class)
00571 globals().update(test_subclass.makeTestCaseClasses())
00572
00573
00574
00575
00576
00577 log.defaultObserver.stop()
00578
00579
00580
00581
00582 if have_twisted:
00583 class LayeredTwistedIOLoop(TwistedIOLoop):
00584 """Layers a TwistedIOLoop on top of a TornadoReactor on a SelectIOLoop.
00585
00586 This is of course silly, but is useful for testing purposes to make
00587 sure we're implementing both sides of the various interfaces
00588 correctly. In some tests another TornadoReactor is layered on top
00589 of the whole stack.
00590 """
00591 def initialize(self):
00592
00593
00594
00595 self.real_io_loop = SelectIOLoop()
00596 reactor = TornadoReactor(io_loop=self.real_io_loop)
00597 super(LayeredTwistedIOLoop, self).initialize(reactor=reactor)
00598 self.add_callback(self.make_current)
00599
00600 def close(self, all_fds=False):
00601 super(LayeredTwistedIOLoop, self).close(all_fds=all_fds)
00602
00603 for reader in self.reactor._internalReaders:
00604 self.reactor.removeReader(reader)
00605 reader.connectionLost(None)
00606 self.real_io_loop.close(all_fds=all_fds)
00607
00608 def stop(self):
00609
00610
00611
00612
00613
00614
00615 self.reactor.callWhenRunning(self.reactor.crash)
00616
00617 if __name__ == "__main__":
00618 unittest.main()