00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 """
00017 Unittest for the twisted-style reactor.
00018 """
00019
00020 from __future__ import absolute_import, division, with_statement
00021
00022 import os
00023 import signal
00024 import thread
00025 import threading
00026 import unittest
00027
00028 try:
00029 import fcntl
00030 import twisted
00031 from twisted.internet.defer import Deferred
00032 from twisted.internet.interfaces import IReadDescriptor, IWriteDescriptor
00033 from twisted.internet.protocol import Protocol
00034 from twisted.web.client import Agent
00035 from twisted.web.resource import Resource
00036 from twisted.web.server import Site
00037 from twisted.python import log
00038 from tornado.platform.twisted import TornadoReactor
00039 from zope.interface import implements
00040 except ImportError:
00041 fcntl = None
00042 twisted = None
00043 IReadDescriptor = IWriteDescriptor = None
00044
00045 def implements(f):
00046 pass
00047
00048 from tornado.httpclient import AsyncHTTPClient
00049 from tornado.ioloop import IOLoop
00050 from tornado.platform.auto import set_close_exec
00051 from tornado.testing import get_unused_port
00052 from tornado.util import import_object
00053 from tornado.web import RequestHandler, Application
00054
00055
00056 def save_signal_handlers():
00057 saved = {}
00058 for sig in [signal.SIGINT, signal.SIGTERM, signal.SIGCHLD]:
00059 saved[sig] = signal.getsignal(sig)
00060 assert "twisted" not in repr(saved), repr(saved)
00061 return saved
00062
00063
00064 def restore_signal_handlers(saved):
00065 for sig, handler in saved.iteritems():
00066 signal.signal(sig, handler)
00067
00068
00069 class ReactorTestCase(unittest.TestCase):
00070 def setUp(self):
00071 self._saved_signals = save_signal_handlers()
00072 self._io_loop = IOLoop()
00073 self._reactor = TornadoReactor(self._io_loop)
00074
00075 def tearDown(self):
00076 self._io_loop.close(all_fds=True)
00077 restore_signal_handlers(self._saved_signals)
00078
00079
00080 class ReactorWhenRunningTest(ReactorTestCase):
00081 def test_whenRunning(self):
00082 self._whenRunningCalled = False
00083 self._anotherWhenRunningCalled = False
00084 self._reactor.callWhenRunning(self.whenRunningCallback)
00085 self._reactor.run()
00086 self.assertTrue(self._whenRunningCalled)
00087 self.assertTrue(self._anotherWhenRunningCalled)
00088
00089 def whenRunningCallback(self):
00090 self._whenRunningCalled = True
00091 self._reactor.callWhenRunning(self.anotherWhenRunningCallback)
00092 self._reactor.stop()
00093
00094 def anotherWhenRunningCallback(self):
00095 self._anotherWhenRunningCalled = True
00096
00097
00098 class ReactorCallLaterTest(ReactorTestCase):
00099 def test_callLater(self):
00100 self._laterCalled = False
00101 self._now = self._reactor.seconds()
00102 self._timeout = 0.001
00103 dc = self._reactor.callLater(self._timeout, self.callLaterCallback)
00104 self.assertEqual(self._reactor.getDelayedCalls(), [dc])
00105 self._reactor.run()
00106 self.assertTrue(self._laterCalled)
00107 self.assertTrue(self._called - self._now > self._timeout)
00108 self.assertEqual(self._reactor.getDelayedCalls(), [])
00109
00110 def callLaterCallback(self):
00111 self._laterCalled = True
00112 self._called = self._reactor.seconds()
00113 self._reactor.stop()
00114
00115
00116 class ReactorTwoCallLaterTest(ReactorTestCase):
00117 def test_callLater(self):
00118 self._later1Called = False
00119 self._later2Called = False
00120 self._now = self._reactor.seconds()
00121 self._timeout1 = 0.0005
00122 dc1 = self._reactor.callLater(self._timeout1, self.callLaterCallback1)
00123 self._timeout2 = 0.001
00124 dc2 = self._reactor.callLater(self._timeout2, self.callLaterCallback2)
00125 self.assertTrue(self._reactor.getDelayedCalls() == [dc1, dc2] or
00126 self._reactor.getDelayedCalls() == [dc2, dc1])
00127 self._reactor.run()
00128 self.assertTrue(self._later1Called)
00129 self.assertTrue(self._later2Called)
00130 self.assertTrue(self._called1 - self._now > self._timeout1)
00131 self.assertTrue(self._called2 - self._now > self._timeout2)
00132 self.assertEqual(self._reactor.getDelayedCalls(), [])
00133
00134 def callLaterCallback1(self):
00135 self._later1Called = True
00136 self._called1 = self._reactor.seconds()
00137
00138 def callLaterCallback2(self):
00139 self._later2Called = True
00140 self._called2 = self._reactor.seconds()
00141 self._reactor.stop()
00142
00143
00144 class ReactorCallFromThreadTest(ReactorTestCase):
00145 def setUp(self):
00146 super(ReactorCallFromThreadTest, self).setUp()
00147 self._mainThread = thread.get_ident()
00148
00149 def tearDown(self):
00150 self._thread.join()
00151 super(ReactorCallFromThreadTest, self).tearDown()
00152
00153 def _newThreadRun(self):
00154 self.assertNotEqual(self._mainThread, thread.get_ident())
00155 if hasattr(self._thread, 'ident'):
00156 self.assertEqual(self._thread.ident, thread.get_ident())
00157 self._reactor.callFromThread(self._fnCalledFromThread)
00158
00159 def _fnCalledFromThread(self):
00160 self.assertEqual(self._mainThread, thread.get_ident())
00161 self._reactor.stop()
00162
00163 def _whenRunningCallback(self):
00164 self._thread = threading.Thread(target=self._newThreadRun)
00165 self._thread.start()
00166
00167 def testCallFromThread(self):
00168 self._reactor.callWhenRunning(self._whenRunningCallback)
00169 self._reactor.run()
00170
00171
00172 class ReactorCallInThread(ReactorTestCase):
00173 def setUp(self):
00174 super(ReactorCallInThread, self).setUp()
00175 self._mainThread = thread.get_ident()
00176
00177 def _fnCalledInThread(self, *args, **kwargs):
00178 self.assertNotEqual(thread.get_ident(), self._mainThread)
00179 self._reactor.callFromThread(lambda: self._reactor.stop())
00180
00181 def _whenRunningCallback(self):
00182 self._reactor.callInThread(self._fnCalledInThread)
00183
00184 def testCallInThread(self):
00185 self._reactor.callWhenRunning(self._whenRunningCallback)
00186 self._reactor.run()
00187
00188
00189 class Reader:
00190 implements(IReadDescriptor)
00191
00192 def __init__(self, fd, callback):
00193 self._fd = fd
00194 self._callback = callback
00195
00196 def logPrefix(self):
00197 return "Reader"
00198
00199 def close(self):
00200 self._fd.close()
00201
00202 def fileno(self):
00203 return self._fd.fileno()
00204
00205 def connectionLost(self, reason):
00206 self.close()
00207
00208 def doRead(self):
00209 self._callback(self._fd)
00210
00211
00212 class Writer:
00213 implements(IWriteDescriptor)
00214
00215 def __init__(self, fd, callback):
00216 self._fd = fd
00217 self._callback = callback
00218
00219 def logPrefix(self):
00220 return "Writer"
00221
00222 def close(self):
00223 self._fd.close()
00224
00225 def fileno(self):
00226 return self._fd.fileno()
00227
00228 def connectionLost(self, reason):
00229 self.close()
00230
00231 def doWrite(self):
00232 self._callback(self._fd)
00233
00234
00235 class ReactorReaderWriterTest(ReactorTestCase):
00236 def _set_nonblocking(self, fd):
00237 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
00238 fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
00239
00240 def setUp(self):
00241 super(ReactorReaderWriterTest, self).setUp()
00242 r, w = os.pipe()
00243 self._set_nonblocking(r)
00244 self._set_nonblocking(w)
00245 set_close_exec(r)
00246 set_close_exec(w)
00247 self._p1 = os.fdopen(r, "rb", 0)
00248 self._p2 = os.fdopen(w, "wb", 0)
00249
00250 def tearDown(self):
00251 super(ReactorReaderWriterTest, self).tearDown()
00252 self._p1.close()
00253 self._p2.close()
00254
00255 def _testReadWrite(self):
00256 """
00257 In this test the writer writes an 'x' to its fd. The reader
00258 reads it, check the value and ends the test.
00259 """
00260 self.shouldWrite = True
00261
00262 def checkReadInput(fd):
00263 self.assertEquals(fd.read(), 'x')
00264 self._reactor.stop()
00265
00266 def writeOnce(fd):
00267 if self.shouldWrite:
00268 self.shouldWrite = False
00269 fd.write('x')
00270 self._reader = Reader(self._p1, checkReadInput)
00271 self._writer = Writer(self._p2, writeOnce)
00272
00273 self._reactor.addWriter(self._writer)
00274
00275
00276
00277 self._reactor.addReader(self._reader)
00278 self._reactor.addReader(self._reader)
00279
00280 def testReadWrite(self):
00281 self._reactor.callWhenRunning(self._testReadWrite)
00282 self._reactor.run()
00283
00284 def _testNoWriter(self):
00285 """
00286 In this test we have no writer. Make sure the reader doesn't
00287 read anything.
00288 """
00289 def checkReadInput(fd):
00290 self.fail("Must not be called.")
00291
00292 def stopTest():
00293
00294
00295 self._writer.close()
00296 self._reactor.stop()
00297 self._reader = Reader(self._p1, checkReadInput)
00298
00299
00300 self._writer = Writer(self._p2, lambda fd: fd.write('x'))
00301
00302
00303 self._reactor.addWriter(self._writer)
00304 self._reactor.removeWriter(self._writer)
00305
00306
00307
00308 self._reactor.addReader(self._reader)
00309
00310
00311 self._reactor.callLater(0.001, stopTest)
00312
00313 def testNoWriter(self):
00314 self._reactor.callWhenRunning(self._testNoWriter)
00315 self._reactor.run()
00316
00317
00318
00319
00320
00321 class CompatibilityTests(unittest.TestCase):
00322 def setUp(self):
00323 self.saved_signals = save_signal_handlers()
00324 self.io_loop = IOLoop()
00325 self.reactor = TornadoReactor(self.io_loop)
00326
00327 def tearDown(self):
00328 self.reactor.disconnectAll()
00329 self.io_loop.close(all_fds=True)
00330 restore_signal_handlers(self.saved_signals)
00331
00332 def start_twisted_server(self):
00333 class HelloResource(Resource):
00334 isLeaf = True
00335
00336 def render_GET(self, request):
00337 return "Hello from twisted!"
00338 site = Site(HelloResource())
00339 self.twisted_port = get_unused_port()
00340 self.reactor.listenTCP(self.twisted_port, site, interface='127.0.0.1')
00341
00342 def start_tornado_server(self):
00343 class HelloHandler(RequestHandler):
00344 def get(self):
00345 self.write("Hello from tornado!")
00346 app = Application([('/', HelloHandler)],
00347 log_function=lambda x: None)
00348 self.tornado_port = get_unused_port()
00349 app.listen(self.tornado_port, address='127.0.0.1', io_loop=self.io_loop)
00350
00351 def run_ioloop(self):
00352 self.stop_loop = self.io_loop.stop
00353 self.io_loop.start()
00354 self.reactor.fireSystemEvent('shutdown')
00355
00356 def run_reactor(self):
00357 self.stop_loop = self.reactor.stop
00358 self.stop = self.reactor.stop
00359 self.reactor.run()
00360
00361 def tornado_fetch(self, url, runner):
00362 responses = []
00363 client = AsyncHTTPClient(self.io_loop)
00364
00365 def callback(response):
00366 responses.append(response)
00367 self.stop_loop()
00368 client.fetch(url, callback=callback)
00369 runner()
00370 self.assertEqual(len(responses), 1)
00371 responses[0].rethrow()
00372 return responses[0]
00373
00374 def twisted_fetch(self, url, runner):
00375
00376 chunks = []
00377 client = Agent(self.reactor)
00378 d = client.request('GET', url)
00379
00380 class Accumulator(Protocol):
00381 def __init__(self, finished):
00382 self.finished = finished
00383
00384 def dataReceived(self, data):
00385 chunks.append(data)
00386
00387 def connectionLost(self, reason):
00388 self.finished.callback(None)
00389
00390 def callback(response):
00391 finished = Deferred()
00392 response.deliverBody(Accumulator(finished))
00393 return finished
00394 d.addCallback(callback)
00395
00396 def shutdown(ignored):
00397 self.stop_loop()
00398 d.addBoth(shutdown)
00399 runner()
00400 self.assertTrue(chunks)
00401 return ''.join(chunks)
00402
00403 def testTwistedServerTornadoClientIOLoop(self):
00404 self.start_twisted_server()
00405 response = self.tornado_fetch(
00406 'http://localhost:%d' % self.twisted_port, self.run_ioloop)
00407 self.assertEqual(response.body, 'Hello from twisted!')
00408
00409 def testTwistedServerTornadoClientReactor(self):
00410 self.start_twisted_server()
00411 response = self.tornado_fetch(
00412 'http://localhost:%d' % self.twisted_port, self.run_reactor)
00413 self.assertEqual(response.body, 'Hello from twisted!')
00414
00415 def testTornadoServerTwistedClientIOLoop(self):
00416 self.start_tornado_server()
00417 response = self.twisted_fetch(
00418 'http://localhost:%d' % self.tornado_port, self.run_ioloop)
00419 self.assertEqual(response, 'Hello from tornado!')
00420
00421 def testTornadoServerTwistedClientReactor(self):
00422 self.start_tornado_server()
00423 response = self.twisted_fetch(
00424 'http://localhost:%d' % self.tornado_port, self.run_reactor)
00425 self.assertEqual(response, 'Hello from tornado!')
00426
00427
00428 if twisted is None:
00429 del ReactorWhenRunningTest
00430 del ReactorCallLaterTest
00431 del ReactorTwoCallLaterTest
00432 del ReactorCallFromThreadTest
00433 del ReactorCallInThread
00434 del ReactorReaderWriterTest
00435 del CompatibilityTests
00436 else:
00437
00438
00439
00440
00441
00442
00443
00444
00445 twisted_tests = {
00446 'twisted.internet.test.test_core.ObjectModelIntegrationTest': [],
00447 'twisted.internet.test.test_core.SystemEventTestsBuilder': [
00448 'test_iterate',
00449 ],
00450 'twisted.internet.test.test_fdset.ReactorFDSetTestsBuilder': [
00451 "test_lostFileDescriptor",
00452 ],
00453 'twisted.internet.test.test_process.ProcessTestsBuilder': [
00454
00455 'test_systemCallUninterruptedByChildExit',
00456
00457 'test_shebang',
00458 ],
00459
00460
00461
00462
00463 'twisted.internet.test.test_tcp.TCPClientTestsBuilder': [
00464 'test_badContext',
00465 ],
00466 'twisted.internet.test.test_tcp.TCPPortTestsBuilder': [
00467
00468 'test_buildProtocolIPv6AddressScopeID',
00469 'test_portGetHostOnIPv6ScopeID',
00470 'test_serverGetHostOnIPv6ScopeID',
00471 'test_serverGetPeerOnIPv6ScopeID',
00472 ],
00473 'twisted.internet.test.test_tcp.TCPConnectionTestsBuilder': [],
00474 'twisted.internet.test.test_tcp.WriteSequenceTests': [],
00475 'twisted.internet.test.test_tcp.AbortConnectionTestCase': [],
00476 'twisted.internet.test.test_threads.ThreadTestsBuilder': [],
00477 'twisted.internet.test.test_time.TimeTestsBuilder': [],
00478
00479
00480 'twisted.internet.test.test_udp.UDPServerTestsBuilder': [],
00481 'twisted.internet.test.test_unix.UNIXTestsBuilder': [
00482
00483
00484 'test_connectToLinuxAbstractNamespace',
00485 'test_listenOnLinuxAbstractNamespace',
00486 ],
00487 'twisted.internet.test.test_unix.UNIXDatagramTestsBuilder': [
00488 'test_listenOnLinuxAbstractNamespace',
00489 ],
00490 'twisted.internet.test.test_unix.UNIXPortTestsBuilder': [],
00491 }
00492 for test_name, blacklist in twisted_tests.iteritems():
00493 try:
00494 test_class = import_object(test_name)
00495 except (ImportError, AttributeError):
00496 continue
00497 for test_func in blacklist:
00498 if hasattr(test_class, test_func):
00499
00500
00501 setattr(test_class, test_func, lambda self: None)
00502
00503 def make_test_subclass(test_class):
00504 class TornadoTest(test_class):
00505 _reactors = ["tornado.platform.twisted._TestReactor"]
00506
00507 def buildReactor(self):
00508 self.__saved_signals = save_signal_handlers()
00509 return test_class.buildReactor(self)
00510
00511 def unbuildReactor(self, reactor):
00512 test_class.unbuildReactor(self, reactor)
00513
00514
00515
00516
00517
00518 reactor._io_loop.close(all_fds=True)
00519 restore_signal_handlers(self.__saved_signals)
00520
00521 TornadoTest.__name__ = test_class.__name__
00522 return TornadoTest
00523 test_subclass = make_test_subclass(test_class)
00524 globals().update(test_subclass.makeTestCaseClasses())
00525
00526
00527
00528
00529
00530 log.defaultObserver.stop()
00531
00532
00533
00534 if __name__ == "__main__":
00535 unittest.main()