twisted_test.py
Go to the documentation of this file.
00001 # Author: Ovidiu Predescu
00002 # Date: July 2011
00003 #
00004 # Licensed under the Apache License, Version 2.0 (the "License"); you may
00005 # not use this file except in compliance with the License. You may obtain
00006 # a copy of the License at
00007 #
00008 #     http://www.apache.org/licenses/LICENSE-2.0
00009 #
00010 # Unless required by applicable law or agreed to in writing, software
00011 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
00012 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
00013 # License for the specific language governing permissions and limitations
00014 # under the License.
00015 
00016 """
00017 Unittest for the twisted-style reactor.
00018 """
00019 
00020 from __future__ import absolute_import, division, print_function, with_statement
00021 
00022 import os
00023 import shutil
00024 import signal
00025 import tempfile
00026 import threading
00027 
00028 try:
00029     import fcntl
00030     from twisted.internet.defer import Deferred
00031     from twisted.internet.interfaces import IReadDescriptor, IWriteDescriptor
00032     from twisted.internet.protocol import Protocol
00033     from twisted.python import log
00034     from tornado.platform.twisted import TornadoReactor, TwistedIOLoop
00035     from zope.interface import implementer
00036     have_twisted = True
00037 except ImportError:
00038     have_twisted = False
00039 
00040 # The core of Twisted 12.3.0 is available on python 3, but twisted.web is not
00041 # so test for it separately.
00042 try:
00043     from twisted.web.client import Agent
00044     from twisted.web.resource import Resource
00045     from twisted.web.server import Site
00046     have_twisted_web = True
00047 except ImportError:
00048     have_twisted_web = False
00049 
00050 try:
00051     import thread  # py2
00052 except ImportError:
00053     import _thread as thread  # py3
00054 
00055 from tornado.httpclient import AsyncHTTPClient
00056 from tornado.httpserver import HTTPServer
00057 from tornado.ioloop import IOLoop
00058 from tornado.platform.auto import set_close_exec
00059 from tornado.platform.select import SelectIOLoop
00060 from tornado.testing import bind_unused_port
00061 from tornado.test.util import unittest
00062 from tornado.util import import_object
00063 from tornado.web import RequestHandler, Application
00064 
00065 skipIfNoTwisted = unittest.skipUnless(have_twisted,
00066                                       "twisted module not present")
00067 
00068 
00069 def save_signal_handlers():
00070     saved = {}
00071     for sig in [signal.SIGINT, signal.SIGTERM, signal.SIGCHLD]:
00072         saved[sig] = signal.getsignal(sig)
00073     if "twisted" in repr(saved):
00074         if not issubclass(IOLoop.configured_class(), TwistedIOLoop):
00075             # when the global ioloop is twisted, we expect the signal
00076             # handlers to be installed.  Otherwise, it means we're not
00077             # cleaning up after twisted properly.
00078             raise Exception("twisted signal handlers already installed")
00079     return saved
00080 
00081 
00082 def restore_signal_handlers(saved):
00083     for sig, handler in saved.items():
00084         signal.signal(sig, handler)
00085 
00086 
00087 class ReactorTestCase(unittest.TestCase):
00088     def setUp(self):
00089         self._saved_signals = save_signal_handlers()
00090         self._io_loop = IOLoop()
00091         self._reactor = TornadoReactor(self._io_loop)
00092 
00093     def tearDown(self):
00094         self._io_loop.close(all_fds=True)
00095         restore_signal_handlers(self._saved_signals)
00096 
00097 
00098 @skipIfNoTwisted
00099 class ReactorWhenRunningTest(ReactorTestCase):
00100     def test_whenRunning(self):
00101         self._whenRunningCalled = False
00102         self._anotherWhenRunningCalled = False
00103         self._reactor.callWhenRunning(self.whenRunningCallback)
00104         self._reactor.run()
00105         self.assertTrue(self._whenRunningCalled)
00106         self.assertTrue(self._anotherWhenRunningCalled)
00107 
00108     def whenRunningCallback(self):
00109         self._whenRunningCalled = True
00110         self._reactor.callWhenRunning(self.anotherWhenRunningCallback)
00111         self._reactor.stop()
00112 
00113     def anotherWhenRunningCallback(self):
00114         self._anotherWhenRunningCalled = True
00115 
00116 
00117 @skipIfNoTwisted
00118 class ReactorCallLaterTest(ReactorTestCase):
00119     def test_callLater(self):
00120         self._laterCalled = False
00121         self._now = self._reactor.seconds()
00122         self._timeout = 0.001
00123         dc = self._reactor.callLater(self._timeout, self.callLaterCallback)
00124         self.assertEqual(self._reactor.getDelayedCalls(), [dc])
00125         self._reactor.run()
00126         self.assertTrue(self._laterCalled)
00127         self.assertTrue(self._called - self._now > self._timeout)
00128         self.assertEqual(self._reactor.getDelayedCalls(), [])
00129 
00130     def callLaterCallback(self):
00131         self._laterCalled = True
00132         self._called = self._reactor.seconds()
00133         self._reactor.stop()
00134 
00135 
00136 @skipIfNoTwisted
00137 class ReactorTwoCallLaterTest(ReactorTestCase):
00138     def test_callLater(self):
00139         self._later1Called = False
00140         self._later2Called = False
00141         self._now = self._reactor.seconds()
00142         self._timeout1 = 0.0005
00143         dc1 = self._reactor.callLater(self._timeout1, self.callLaterCallback1)
00144         self._timeout2 = 0.001
00145         dc2 = self._reactor.callLater(self._timeout2, self.callLaterCallback2)
00146         self.assertTrue(self._reactor.getDelayedCalls() == [dc1, dc2] or
00147                         self._reactor.getDelayedCalls() == [dc2, dc1])
00148         self._reactor.run()
00149         self.assertTrue(self._later1Called)
00150         self.assertTrue(self._later2Called)
00151         self.assertTrue(self._called1 - self._now > self._timeout1)
00152         self.assertTrue(self._called2 - self._now > self._timeout2)
00153         self.assertEqual(self._reactor.getDelayedCalls(), [])
00154 
00155     def callLaterCallback1(self):
00156         self._later1Called = True
00157         self._called1 = self._reactor.seconds()
00158 
00159     def callLaterCallback2(self):
00160         self._later2Called = True
00161         self._called2 = self._reactor.seconds()
00162         self._reactor.stop()
00163 
00164 
00165 @skipIfNoTwisted
00166 class ReactorCallFromThreadTest(ReactorTestCase):
00167     def setUp(self):
00168         super(ReactorCallFromThreadTest, self).setUp()
00169         self._mainThread = thread.get_ident()
00170 
00171     def tearDown(self):
00172         self._thread.join()
00173         super(ReactorCallFromThreadTest, self).tearDown()
00174 
00175     def _newThreadRun(self):
00176         self.assertNotEqual(self._mainThread, thread.get_ident())
00177         if hasattr(self._thread, 'ident'):  # new in python 2.6
00178             self.assertEqual(self._thread.ident, thread.get_ident())
00179         self._reactor.callFromThread(self._fnCalledFromThread)
00180 
00181     def _fnCalledFromThread(self):
00182         self.assertEqual(self._mainThread, thread.get_ident())
00183         self._reactor.stop()
00184 
00185     def _whenRunningCallback(self):
00186         self._thread = threading.Thread(target=self._newThreadRun)
00187         self._thread.start()
00188 
00189     def testCallFromThread(self):
00190         self._reactor.callWhenRunning(self._whenRunningCallback)
00191         self._reactor.run()
00192 
00193 
00194 @skipIfNoTwisted
00195 class ReactorCallInThread(ReactorTestCase):
00196     def setUp(self):
00197         super(ReactorCallInThread, self).setUp()
00198         self._mainThread = thread.get_ident()
00199 
00200     def _fnCalledInThread(self, *args, **kwargs):
00201         self.assertNotEqual(thread.get_ident(), self._mainThread)
00202         self._reactor.callFromThread(lambda: self._reactor.stop())
00203 
00204     def _whenRunningCallback(self):
00205         self._reactor.callInThread(self._fnCalledInThread)
00206 
00207     def testCallInThread(self):
00208         self._reactor.callWhenRunning(self._whenRunningCallback)
00209         self._reactor.run()
00210 
00211 
00212 class Reader(object):
00213     def __init__(self, fd, callback):
00214         self._fd = fd
00215         self._callback = callback
00216 
00217     def logPrefix(self):
00218         return "Reader"
00219 
00220     def close(self):
00221         self._fd.close()
00222 
00223     def fileno(self):
00224         return self._fd.fileno()
00225 
00226     def readConnectionLost(self, reason):
00227         self.close()
00228 
00229     def connectionLost(self, reason):
00230         self.close()
00231 
00232     def doRead(self):
00233         self._callback(self._fd)
00234 if have_twisted:
00235     Reader = implementer(IReadDescriptor)(Reader)
00236 
00237 
00238 class Writer(object):
00239     def __init__(self, fd, callback):
00240         self._fd = fd
00241         self._callback = callback
00242 
00243     def logPrefix(self):
00244         return "Writer"
00245 
00246     def close(self):
00247         self._fd.close()
00248 
00249     def fileno(self):
00250         return self._fd.fileno()
00251 
00252     def connectionLost(self, reason):
00253         self.close()
00254 
00255     def doWrite(self):
00256         self._callback(self._fd)
00257 if have_twisted:
00258     Writer = implementer(IWriteDescriptor)(Writer)
00259 
00260 
00261 @skipIfNoTwisted
00262 class ReactorReaderWriterTest(ReactorTestCase):
00263     def _set_nonblocking(self, fd):
00264         flags = fcntl.fcntl(fd, fcntl.F_GETFL)
00265         fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
00266 
00267     def setUp(self):
00268         super(ReactorReaderWriterTest, self).setUp()
00269         r, w = os.pipe()
00270         self._set_nonblocking(r)
00271         self._set_nonblocking(w)
00272         set_close_exec(r)
00273         set_close_exec(w)
00274         self._p1 = os.fdopen(r, "rb", 0)
00275         self._p2 = os.fdopen(w, "wb", 0)
00276 
00277     def tearDown(self):
00278         super(ReactorReaderWriterTest, self).tearDown()
00279         self._p1.close()
00280         self._p2.close()
00281 
00282     def _testReadWrite(self):
00283         """
00284         In this test the writer writes an 'x' to its fd. The reader
00285         reads it, check the value and ends the test.
00286         """
00287         self.shouldWrite = True
00288 
00289         def checkReadInput(fd):
00290             self.assertEquals(fd.read(1), b'x')
00291             self._reactor.stop()
00292 
00293         def writeOnce(fd):
00294             if self.shouldWrite:
00295                 self.shouldWrite = False
00296                 fd.write(b'x')
00297         self._reader = Reader(self._p1, checkReadInput)
00298         self._writer = Writer(self._p2, writeOnce)
00299 
00300         self._reactor.addWriter(self._writer)
00301 
00302         # Test that adding the reader twice adds it only once to
00303         # IOLoop.
00304         self._reactor.addReader(self._reader)
00305         self._reactor.addReader(self._reader)
00306 
00307     def testReadWrite(self):
00308         self._reactor.callWhenRunning(self._testReadWrite)
00309         self._reactor.run()
00310 
00311     def _testNoWriter(self):
00312         """
00313         In this test we have no writer. Make sure the reader doesn't
00314         read anything.
00315         """
00316         def checkReadInput(fd):
00317             self.fail("Must not be called.")
00318 
00319         def stopTest():
00320             # Close the writer here since the IOLoop doesn't know
00321             # about it.
00322             self._writer.close()
00323             self._reactor.stop()
00324         self._reader = Reader(self._p1, checkReadInput)
00325 
00326         # We create a writer, but it should never be invoked.
00327         self._writer = Writer(self._p2, lambda fd: fd.write('x'))
00328 
00329         # Test that adding and removing the writer leaves us with no writer.
00330         self._reactor.addWriter(self._writer)
00331         self._reactor.removeWriter(self._writer)
00332 
00333         # Test that adding and removing the reader doesn't cause
00334         # unintended effects.
00335         self._reactor.addReader(self._reader)
00336 
00337         # Wake up after a moment and stop the test
00338         self._reactor.callLater(0.001, stopTest)
00339 
00340     def testNoWriter(self):
00341         self._reactor.callWhenRunning(self._testNoWriter)
00342         self._reactor.run()
00343 
00344 # Test various combinations of twisted and tornado http servers,
00345 # http clients, and event loop interfaces.
00346 
00347 
00348 @skipIfNoTwisted
00349 @unittest.skipIf(not have_twisted_web, 'twisted web not present')
00350 class CompatibilityTests(unittest.TestCase):
00351     def setUp(self):
00352         self.saved_signals = save_signal_handlers()
00353         self.io_loop = IOLoop()
00354         self.io_loop.make_current()
00355         self.reactor = TornadoReactor(self.io_loop)
00356 
00357     def tearDown(self):
00358         self.reactor.disconnectAll()
00359         self.io_loop.clear_current()
00360         self.io_loop.close(all_fds=True)
00361         restore_signal_handlers(self.saved_signals)
00362 
00363     def start_twisted_server(self):
00364         class HelloResource(Resource):
00365             isLeaf = True
00366 
00367             def render_GET(self, request):
00368                 return "Hello from twisted!"
00369         site = Site(HelloResource())
00370         port = self.reactor.listenTCP(0, site, interface='127.0.0.1')
00371         self.twisted_port = port.getHost().port
00372 
00373     def start_tornado_server(self):
00374         class HelloHandler(RequestHandler):
00375             def get(self):
00376                 self.write("Hello from tornado!")
00377         app = Application([('/', HelloHandler)],
00378                           log_function=lambda x: None)
00379         server = HTTPServer(app, io_loop=self.io_loop)
00380         sock, self.tornado_port = bind_unused_port()
00381         server.add_sockets([sock])
00382 
00383     def run_ioloop(self):
00384         self.stop_loop = self.io_loop.stop
00385         self.io_loop.start()
00386         self.reactor.fireSystemEvent('shutdown')
00387 
00388     def run_reactor(self):
00389         self.stop_loop = self.reactor.stop
00390         self.stop = self.reactor.stop
00391         self.reactor.run()
00392 
00393     def tornado_fetch(self, url, runner):
00394         responses = []
00395         client = AsyncHTTPClient(self.io_loop)
00396 
00397         def callback(response):
00398             responses.append(response)
00399             self.stop_loop()
00400         client.fetch(url, callback=callback)
00401         runner()
00402         self.assertEqual(len(responses), 1)
00403         responses[0].rethrow()
00404         return responses[0]
00405 
00406     def twisted_fetch(self, url, runner):
00407         # http://twistedmatrix.com/documents/current/web/howto/client.html
00408         chunks = []
00409         client = Agent(self.reactor)
00410         d = client.request('GET', url)
00411 
00412         class Accumulator(Protocol):
00413             def __init__(self, finished):
00414                 self.finished = finished
00415 
00416             def dataReceived(self, data):
00417                 chunks.append(data)
00418 
00419             def connectionLost(self, reason):
00420                 self.finished.callback(None)
00421 
00422         def callback(response):
00423             finished = Deferred()
00424             response.deliverBody(Accumulator(finished))
00425             return finished
00426         d.addCallback(callback)
00427 
00428         def shutdown(ignored):
00429             self.stop_loop()
00430         d.addBoth(shutdown)
00431         runner()
00432         self.assertTrue(chunks)
00433         return ''.join(chunks)
00434 
00435     def testTwistedServerTornadoClientIOLoop(self):
00436         self.start_twisted_server()
00437         response = self.tornado_fetch(
00438             'http://localhost:%d' % self.twisted_port, self.run_ioloop)
00439         self.assertEqual(response.body, 'Hello from twisted!')
00440 
00441     def testTwistedServerTornadoClientReactor(self):
00442         self.start_twisted_server()
00443         response = self.tornado_fetch(
00444             'http://localhost:%d' % self.twisted_port, self.run_reactor)
00445         self.assertEqual(response.body, 'Hello from twisted!')
00446 
00447     def testTornadoServerTwistedClientIOLoop(self):
00448         self.start_tornado_server()
00449         response = self.twisted_fetch(
00450             'http://localhost:%d' % self.tornado_port, self.run_ioloop)
00451         self.assertEqual(response, 'Hello from tornado!')
00452 
00453     def testTornadoServerTwistedClientReactor(self):
00454         self.start_tornado_server()
00455         response = self.twisted_fetch(
00456             'http://localhost:%d' % self.tornado_port, self.run_reactor)
00457         self.assertEqual(response, 'Hello from tornado!')
00458 
00459 
00460 if have_twisted:
00461     # Import and run as much of twisted's test suite as possible.
00462     # This is unfortunately rather dependent on implementation details,
00463     # but there doesn't appear to be a clean all-in-one conformance test
00464     # suite for reactors.
00465     #
00466     # This is a list of all test suites using the ReactorBuilder
00467     # available in Twisted 11.0.0 and 11.1.0 (and a blacklist of
00468     # specific test methods to be disabled).
00469     twisted_tests = {
00470         'twisted.internet.test.test_core.ObjectModelIntegrationTest': [],
00471         'twisted.internet.test.test_core.SystemEventTestsBuilder': [
00472             'test_iterate',  # deliberately not supported
00473             # Fails on TwistedIOLoop and AsyncIOLoop.
00474             'test_runAfterCrash',
00475         ],
00476         'twisted.internet.test.test_fdset.ReactorFDSetTestsBuilder': [
00477             "test_lostFileDescriptor",  # incompatible with epoll and kqueue
00478         ],
00479         'twisted.internet.test.test_process.ProcessTestsBuilder': [
00480             # Only work as root.  Twisted's "skip" functionality works
00481             # with py27+, but not unittest2 on py26.
00482             'test_changeGID',
00483             'test_changeUID',
00484         ],
00485         # Process tests appear to work on OSX 10.7, but not 10.6
00486         #'twisted.internet.test.test_process.PTYProcessTestsBuilder': [
00487         #    'test_systemCallUninterruptedByChildExit',
00488         #    ],
00489         'twisted.internet.test.test_tcp.TCPClientTestsBuilder': [
00490             'test_badContext',  # ssl-related; see also SSLClientTestsMixin
00491         ],
00492         'twisted.internet.test.test_tcp.TCPPortTestsBuilder': [
00493             # These use link-local addresses and cause firewall prompts on mac
00494             'test_buildProtocolIPv6AddressScopeID',
00495             'test_portGetHostOnIPv6ScopeID',
00496             'test_serverGetHostOnIPv6ScopeID',
00497             'test_serverGetPeerOnIPv6ScopeID',
00498         ],
00499         'twisted.internet.test.test_tcp.TCPConnectionTestsBuilder': [],
00500         'twisted.internet.test.test_tcp.WriteSequenceTests': [],
00501         'twisted.internet.test.test_tcp.AbortConnectionTestCase': [],
00502         'twisted.internet.test.test_threads.ThreadTestsBuilder': [],
00503         'twisted.internet.test.test_time.TimeTestsBuilder': [],
00504         # Extra third-party dependencies (pyOpenSSL)
00505         #'twisted.internet.test.test_tls.SSLClientTestsMixin': [],
00506         'twisted.internet.test.test_udp.UDPServerTestsBuilder': [],
00507         'twisted.internet.test.test_unix.UNIXTestsBuilder': [
00508             # Platform-specific.  These tests would be skipped automatically
00509             # if we were running twisted's own test runner.
00510             'test_connectToLinuxAbstractNamespace',
00511             'test_listenOnLinuxAbstractNamespace',
00512             # These tests use twisted's sendmsg.c extension and sometimes
00513             # fail with what looks like uninitialized memory errors
00514             # (more common on pypy than cpython, but I've seen it on both)
00515             'test_sendFileDescriptor',
00516             'test_sendFileDescriptorTriggersPauseProducing',
00517             'test_descriptorDeliveredBeforeBytes',
00518             'test_avoidLeakingFileDescriptors',
00519         ],
00520         'twisted.internet.test.test_unix.UNIXDatagramTestsBuilder': [
00521             'test_listenOnLinuxAbstractNamespace',
00522         ],
00523         'twisted.internet.test.test_unix.UNIXPortTestsBuilder': [],
00524     }
00525     for test_name, blacklist in twisted_tests.items():
00526         try:
00527             test_class = import_object(test_name)
00528         except (ImportError, AttributeError):
00529             continue
00530         for test_func in blacklist:
00531             if hasattr(test_class, test_func):
00532                 # The test_func may be defined in a mixin, so clobber
00533                 # it instead of delattr()
00534                 setattr(test_class, test_func, lambda self: None)
00535 
00536         def make_test_subclass(test_class):
00537             class TornadoTest(test_class):
00538                 _reactors = ["tornado.platform.twisted._TestReactor"]
00539 
00540                 def setUp(self):
00541                     # Twisted's tests expect to be run from a temporary
00542                     # directory; they create files in their working directory
00543                     # and don't always clean up after themselves.
00544                     self.__curdir = os.getcwd()
00545                     self.__tempdir = tempfile.mkdtemp()
00546                     os.chdir(self.__tempdir)
00547                     super(TornadoTest, self).setUp()
00548 
00549                 def tearDown(self):
00550                     super(TornadoTest, self).tearDown()
00551                     os.chdir(self.__curdir)
00552                     shutil.rmtree(self.__tempdir)
00553 
00554                 def buildReactor(self):
00555                     self.__saved_signals = save_signal_handlers()
00556                     return test_class.buildReactor(self)
00557 
00558                 def unbuildReactor(self, reactor):
00559                     test_class.unbuildReactor(self, reactor)
00560                     # Clean up file descriptors (especially epoll/kqueue
00561                     # objects) eagerly instead of leaving them for the
00562                     # GC.  Unfortunately we can't do this in reactor.stop
00563                     # since twisted expects to be able to unregister
00564                     # connections in a post-shutdown hook.
00565                     reactor._io_loop.close(all_fds=True)
00566                     restore_signal_handlers(self.__saved_signals)
00567 
00568             TornadoTest.__name__ = test_class.__name__
00569             return TornadoTest
00570         test_subclass = make_test_subclass(test_class)
00571         globals().update(test_subclass.makeTestCaseClasses())
00572 
00573     # Since we're not using twisted's test runner, it's tricky to get
00574     # logging set up well.  Most of the time it's easiest to just
00575     # leave it turned off, but while working on these tests you may want
00576     # to uncomment one of the other lines instead.
00577     log.defaultObserver.stop()
00578     # import sys; log.startLogging(sys.stderr, setStdout=0)
00579     # log.startLoggingWithObserver(log.PythonLoggingObserver().emit, setStdout=0)
00580     # import logging; logging.getLogger('twisted').setLevel(logging.WARNING)
00581 
00582 if have_twisted:
00583     class LayeredTwistedIOLoop(TwistedIOLoop):
00584         """Layers a TwistedIOLoop on top of a TornadoReactor on a SelectIOLoop.
00585 
00586         This is of course silly, but is useful for testing purposes to make
00587         sure we're implementing both sides of the various interfaces
00588         correctly.  In some tests another TornadoReactor is layered on top
00589         of the whole stack.
00590         """
00591         def initialize(self):
00592             # When configured to use LayeredTwistedIOLoop we can't easily
00593             # get the next-best IOLoop implementation, so use the lowest common
00594             # denominator.
00595             self.real_io_loop = SelectIOLoop()
00596             reactor = TornadoReactor(io_loop=self.real_io_loop)
00597             super(LayeredTwistedIOLoop, self).initialize(reactor=reactor)
00598             self.add_callback(self.make_current)
00599 
00600         def close(self, all_fds=False):
00601             super(LayeredTwistedIOLoop, self).close(all_fds=all_fds)
00602             # HACK: This is the same thing that test_class.unbuildReactor does.
00603             for reader in self.reactor._internalReaders:
00604                 self.reactor.removeReader(reader)
00605                 reader.connectionLost(None)
00606             self.real_io_loop.close(all_fds=all_fds)
00607 
00608         def stop(self):
00609             # One of twisted's tests fails if I don't delay crash()
00610             # until the reactor has started, but if I move this to
00611             # TwistedIOLoop then the tests fail when I'm *not* running
00612             # tornado-on-twisted-on-tornado.  I'm clearly missing something
00613             # about the startup/crash semantics, but since stop and crash
00614             # are really only used in tests it doesn't really matter.
00615             self.reactor.callWhenRunning(self.reactor.crash)
00616 
00617 if __name__ == "__main__":
00618     unittest.main()


rosbridge_server
Author(s): Jonathan Mace
autogenerated on Wed Sep 13 2017 03:18:20