twisted_test.py
Go to the documentation of this file.
00001 # Author: Ovidiu Predescu
00002 # Date: July 2011
00003 #
00004 # Licensed under the Apache License, Version 2.0 (the "License"); you may
00005 # not use this file except in compliance with the License. You may obtain
00006 # a copy of the License at
00007 #
00008 #     http://www.apache.org/licenses/LICENSE-2.0
00009 #
00010 # Unless required by applicable law or agreed to in writing, software
00011 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
00012 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
00013 # License for the specific language governing permissions and limitations
00014 # under the License.
00015 
00016 """
00017 Unittest for the twisted-style reactor.
00018 """
00019 
00020 from __future__ import absolute_import, division, with_statement
00021 
00022 import os
00023 import signal
00024 import thread
00025 import threading
00026 import unittest
00027 
# Optional dependencies.  If any one of these imports fails, the whole
# group is treated as unavailable and the placeholders in the except
# branch are used instead.
try:
    import fcntl
    import twisted
    from twisted.internet.defer import Deferred
    from twisted.internet.interfaces import IReadDescriptor, IWriteDescriptor
    from twisted.internet.protocol import Protocol
    from twisted.web.client import Agent
    from twisted.web.resource import Resource
    from twisted.web.server import Site
    from twisted.python import log
    from tornado.platform.twisted import TornadoReactor
    from zope.interface import implements
except ImportError:
    # ``twisted is None`` is the flag checked at the bottom of this module
    # to remove the twisted-dependent test classes.
    fcntl = None
    twisted = None
    IReadDescriptor = IWriteDescriptor = None

    def implements(f):
        # No-op stand-in: the Reader and Writer class bodies call
        # implements(...) at class-definition time, so the name must exist
        # even when zope.interface could not be imported.
        pass
00047 
00048 from tornado.httpclient import AsyncHTTPClient
00049 from tornado.ioloop import IOLoop
00050 from tornado.platform.auto import set_close_exec
00051 from tornado.testing import get_unused_port
00052 from tornado.util import import_object
00053 from tornado.web import RequestHandler, Application
00054 
00055 
def save_signal_handlers():
    """Snapshot the current SIGINT, SIGTERM and SIGCHLD handlers.

    Returns a dict mapping each of those signal numbers to the value
    reported by ``signal.getsignal``; pass it to
    ``restore_signal_handlers`` to put the handlers back.

    Asserts that the repr of the snapshot does not mention "twisted"
    (presumably to detect a twisted handler leaked by an earlier test).
    """
    watched = (signal.SIGINT, signal.SIGTERM, signal.SIGCHLD)
    handlers = dict((signum, signal.getsignal(signum)) for signum in watched)
    snapshot = repr(handlers)
    assert "twisted" not in snapshot, snapshot
    return handlers
00062 
00063 
def restore_signal_handlers(saved):
    """Reinstall signal handlers captured by ``save_signal_handlers``.

    ``saved`` maps signal numbers to handlers as returned by
    ``signal.getsignal``.  Entries whose handler is ``None`` are skipped:
    ``getsignal`` reports ``None`` for a handler that was not installed
    from Python, and ``signal.signal`` rejects ``None`` with a TypeError,
    so such a handler cannot be restored from here.
    """
    # items() instead of iteritems() so the helper runs on Python 2 and 3.
    for sig, handler in saved.items():
        if handler is None:
            continue
        signal.signal(sig, handler)
00067 
00068 
class ReactorTestCase(unittest.TestCase):
    """Base fixture giving each test a fresh IOLoop and TornadoReactor.

    Signal handlers are snapshotted before the reactor is created and
    put back after the loop has been closed.
    """

    def setUp(self):
        self._saved_signals = save_signal_handlers()
        loop = IOLoop()
        self._io_loop = loop
        self._reactor = TornadoReactor(loop)

    def tearDown(self):
        # Close the loop (and every fd it knows about) before restoring
        # the signal handlers saved in setUp.
        loop = self._io_loop
        loop.close(all_fds=True)
        restore_signal_handlers(self._saved_signals)
00078 
00079 
class ReactorWhenRunningTest(ReactorTestCase):
    """Exercises callWhenRunning, including a callback registered from
    inside another callWhenRunning callback."""

    def test_whenRunning(self):
        self._whenRunningCalled = False
        self._anotherWhenRunningCalled = False
        reactor = self._reactor
        reactor.callWhenRunning(self.whenRunningCallback)
        reactor.run()
        # Both the first callback and the one it registered must have run
        # by the time run() returns.
        self.assertTrue(self._whenRunningCalled)
        self.assertTrue(self._anotherWhenRunningCalled)

    def whenRunningCallback(self):
        """Record the call, register the second callback, then stop."""
        reactor = self._reactor
        self._whenRunningCalled = True
        reactor.callWhenRunning(self.anotherWhenRunningCallback)
        reactor.stop()

    def anotherWhenRunningCallback(self):
        """Record that the nested callback was invoked."""
        self._anotherWhenRunningCalled = True
00096 
00097 
class ReactorCallLaterTest(ReactorTestCase):
    """Exercises a single callLater: it is listed by getDelayedCalls
    until it fires, and fires only after the timeout has elapsed."""

    def test_callLater(self):
        reactor = self._reactor
        self._laterCalled = False
        self._now = reactor.seconds()
        self._timeout = 0.001
        delayed = reactor.callLater(self._timeout, self.callLaterCallback)
        self.assertEqual(reactor.getDelayedCalls(), [delayed])
        reactor.run()
        self.assertTrue(self._laterCalled)
        elapsed = self._called - self._now
        self.assertTrue(elapsed > self._timeout)
        # Once fired, the call must no longer be pending.
        self.assertEqual(reactor.getDelayedCalls(), [])

    def callLaterCallback(self):
        """Record that, and when, the delayed call ran; then stop."""
        reactor = self._reactor
        self._laterCalled = True
        self._called = reactor.seconds()
        reactor.stop()
00114 
00115 
class ReactorTwoCallLaterTest(ReactorTestCase):
    """Exercises two callLater calls with different timeouts; the one
    with the longer timeout stops the reactor."""

    def test_callLater(self):
        reactor = self._reactor
        self._later1Called = False
        self._later2Called = False
        self._now = reactor.seconds()
        self._timeout1 = 0.0005
        first = reactor.callLater(self._timeout1, self.callLaterCallback1)
        self._timeout2 = 0.001
        second = reactor.callLater(self._timeout2, self.callLaterCallback2)
        # The order in which pending calls are reported is not checked.
        self.assertTrue(reactor.getDelayedCalls() == [first, second] or
                        reactor.getDelayedCalls() == [second, first])
        reactor.run()
        self.assertTrue(self._later1Called)
        self.assertTrue(self._later2Called)
        elapsed1 = self._called1 - self._now
        self.assertTrue(elapsed1 > self._timeout1)
        elapsed2 = self._called2 - self._now
        self.assertTrue(elapsed2 > self._timeout2)
        self.assertEqual(reactor.getDelayedCalls(), [])

    def callLaterCallback1(self):
        """Record that, and when, the first delayed call ran."""
        self._later1Called = True
        self._called1 = self._reactor.seconds()

    def callLaterCallback2(self):
        """Record that, and when, the second delayed call ran; then stop."""
        reactor = self._reactor
        self._later2Called = True
        self._called2 = reactor.seconds()
        reactor.stop()
00142 
00143 
class ReactorCallFromThreadTest(ReactorTestCase):
    """Exercises callFromThread: a function handed over from a worker
    thread must be executed on the thread that ran setUp."""

    def setUp(self):
        super(ReactorCallFromThreadTest, self).setUp()
        self._mainThread = thread.get_ident()

    def tearDown(self):
        # Wait for the worker before the base class closes the loop.
        self._thread.join()
        super(ReactorCallFromThreadTest, self).tearDown()

    def _newThreadRun(self):
        """Worker-thread body: check where we are, then call back."""
        worker_ident = thread.get_ident()
        self.assertNotEqual(self._mainThread, worker_ident)
        if hasattr(self._thread, 'ident'):  # new in python 2.6
            self.assertEqual(self._thread.ident, thread.get_ident())
        self._reactor.callFromThread(self._fnCalledFromThread)

    def _fnCalledFromThread(self):
        """Scheduled from the worker; must run on the main thread."""
        current_ident = thread.get_ident()
        self.assertEqual(self._mainThread, current_ident)
        self._reactor.stop()

    def _whenRunningCallback(self):
        """Start the worker thread once the reactor is running."""
        worker = threading.Thread(target=self._newThreadRun)
        self._thread = worker
        worker.start()

    def testCallFromThread(self):
        reactor = self._reactor
        reactor.callWhenRunning(self._whenRunningCallback)
        reactor.run()
00170 
00171 
class ReactorCallInThread(ReactorTestCase):
    """Exercises callInThread: the function must run on a thread other
    than the one that ran setUp."""

    def setUp(self):
        super(ReactorCallInThread, self).setUp()
        self._mainThread = thread.get_ident()

    def _fnCalledInThread(self, *args, **kwargs):
        """Check we are off the main thread, then ask the reactor to stop."""
        self.assertNotEqual(thread.get_ident(), self._mainThread)

        def stop_reactor():
            self._reactor.stop()
        # stop() is handed back via callFromThread rather than being
        # called directly from this thread.
        self._reactor.callFromThread(stop_reactor)

    def _whenRunningCallback(self):
        """Dispatch the check once the reactor is running."""
        self._reactor.callInThread(self._fnCalledInThread)

    def testCallInThread(self):
        reactor = self._reactor
        reactor.callWhenRunning(self._whenRunningCallback)
        reactor.run()
00187 
00188 
class Reader:
    """Small read descriptor wrapping a file-like object.

    ``fd`` must provide ``fileno()`` and ``close()``; ``callback`` is
    called with ``fd`` from ``doRead`` (which the reactor is expected to
    invoke when the descriptor is readable -- see IReadDescriptor).
    """
    implements(IReadDescriptor)

    def __init__(self, fd, callback):
        self._fd, self._callback = fd, callback

    def fileno(self):
        """Return the file descriptor number of the wrapped object."""
        wrapped = self._fd
        return wrapped.fileno()

    def logPrefix(self):
        """Return the fixed label identifying this descriptor."""
        return "Reader"

    def doRead(self):
        """Hand the wrapped file object to the callback."""
        callback = self._callback
        callback(self._fd)

    def close(self):
        """Close the wrapped file object."""
        wrapped = self._fd
        wrapped.close()

    def connectionLost(self, reason):
        """Close the descriptor; ``reason`` is ignored."""
        self.close()
00210 
00211 
class Writer:
    """Small write descriptor wrapping a file-like object.

    ``fd`` must provide ``fileno()`` and ``close()``; ``callback`` is
    called with ``fd`` from ``doWrite`` (which the reactor is expected to
    invoke when the descriptor is writable -- see IWriteDescriptor).
    """
    implements(IWriteDescriptor)

    def __init__(self, fd, callback):
        self._fd, self._callback = fd, callback

    def fileno(self):
        """Return the file descriptor number of the wrapped object."""
        wrapped = self._fd
        return wrapped.fileno()

    def logPrefix(self):
        """Return the fixed label identifying this descriptor."""
        return "Writer"

    def doWrite(self):
        """Hand the wrapped file object to the callback."""
        callback = self._callback
        callback(self._fd)

    def close(self):
        """Close the wrapped file object."""
        wrapped = self._fd
        wrapped.close()

    def connectionLost(self, reason):
        """Close the descriptor; ``reason`` is ignored."""
        self.close()
00233 
00234 
class ReactorReaderWriterTest(ReactorTestCase):
    """Exercises addReader/addWriter/removeWriter using an OS pipe.

    setUp creates a non-blocking, close-on-exec pipe; ``self._p1`` is the
    unbuffered read end and ``self._p2`` the unbuffered write end.
    """

    def _set_nonblocking(self, fd):
        """Set O_NONBLOCK on the raw file descriptor ``fd``."""
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    def setUp(self):
        super(ReactorReaderWriterTest, self).setUp()
        r, w = os.pipe()
        self._set_nonblocking(r)
        self._set_nonblocking(w)
        set_close_exec(r)
        set_close_exec(w)
        # Buffer size 0: reads and writes go straight to the pipe.
        self._p1 = os.fdopen(r, "rb", 0)
        self._p2 = os.fdopen(w, "wb", 0)

    def tearDown(self):
        super(ReactorReaderWriterTest, self).tearDown()
        self._p1.close()
        self._p2.close()

    def _testReadWrite(self):
        """
        In this test the writer writes an 'x' to its fd. The reader
        reads it, checks the value and ends the test.
        """
        self.shouldWrite = True

        def checkReadInput(fd):
            self.assertEqual(fd.read(), 'x')
            self._reactor.stop()

        def writeOnce(fd):
            # Write only on the first invocation; later calls are no-ops.
            if self.shouldWrite:
                self.shouldWrite = False
                fd.write('x')
        self._reader = Reader(self._p1, checkReadInput)
        self._writer = Writer(self._p2, writeOnce)

        self._reactor.addWriter(self._writer)

        # Test that adding the reader twice adds it only once to
        # IOLoop.
        self._reactor.addReader(self._reader)
        self._reactor.addReader(self._reader)

    def testReadWrite(self):
        self._reactor.callWhenRunning(self._testReadWrite)
        self._reactor.run()

    def _testNoWriter(self):
        """
        In this test we have no writer. Make sure the reader doesn't
        read anything.
        """
        def checkReadInput(fd):
            self.fail("Must not be called.")

        def stopTest():
            # Close the writer here since the IOLoop doesn't know
            # about it.
            self._writer.close()
            self._reactor.stop()
        self._reader = Reader(self._p1, checkReadInput)

        # We create a writer, but it should never be invoked.
        self._writer = Writer(self._p2, lambda fd: fd.write('x'))

        # Test that adding and removing the writer leaves us with no writer.
        self._reactor.addWriter(self._writer)
        self._reactor.removeWriter(self._writer)

        # Register the reader; since nothing is ever written to the
        # pipe, its callback must never be invoked.
        self._reactor.addReader(self._reader)

        # Wake up after a moment and stop the test
        self._reactor.callLater(0.001, stopTest)

    def testNoWriter(self):
        self._reactor.callWhenRunning(self._testNoWriter)
        self._reactor.run()
00316 
00317 # Test various combinations of twisted and tornado http servers,
00318 # http clients, and event loop interfaces.
00319 
00320 
class CompatibilityTests(unittest.TestCase):
    """Cross-checks twisted and tornado HTTP servers and clients.

    Each test starts one kind of server, fetches from it with the other
    framework's client, and drives everything either through the
    IOLoop (``run_ioloop``) or through the reactor (``run_reactor``).
    """

    def setUp(self):
        self.saved_signals = save_signal_handlers()
        loop = IOLoop()
        self.io_loop = loop
        self.reactor = TornadoReactor(loop)

    def tearDown(self):
        self.reactor.disconnectAll()
        self.io_loop.close(all_fds=True)
        restore_signal_handlers(self.saved_signals)

    def start_twisted_server(self):
        """Serve a fixed greeting from a twisted Site on 127.0.0.1."""
        class HelloResource(Resource):
            isLeaf = True

            def render_GET(self, request):
                return "Hello from twisted!"
        hello_site = Site(HelloResource())
        self.twisted_port = get_unused_port()
        self.reactor.listenTCP(
            self.twisted_port, hello_site, interface='127.0.0.1')

    def start_tornado_server(self):
        """Serve a fixed greeting from a tornado Application on 127.0.0.1."""
        class HelloHandler(RequestHandler):
            def get(self):
                self.write("Hello from tornado!")
        routes = [('/', HelloHandler)]
        # log_function discards the per-request log record.
        app = Application(routes, log_function=lambda x: None)
        self.tornado_port = get_unused_port()
        app.listen(self.tornado_port, address='127.0.0.1',
                   io_loop=self.io_loop)

    def run_ioloop(self):
        """Runner that drives the test through IOLoop.start()."""
        self.stop_loop = self.io_loop.stop
        self.io_loop.start()
        # After the loop returns, fire the reactor's shutdown event by hand.
        self.reactor.fireSystemEvent('shutdown')

    def run_reactor(self):
        """Runner that drives the test through reactor.run()."""
        stop = self.reactor.stop
        self.stop_loop = stop
        self.stop = self.reactor.stop
        self.reactor.run()

    def tornado_fetch(self, url, runner):
        """Fetch ``url`` with AsyncHTTPClient and return the response.

        ``runner`` is called to run the event loop; the response callback
        stops it again through ``self.stop_loop``.
        """
        received = []
        http_client = AsyncHTTPClient(self.io_loop)

        def on_response(response):
            received.append(response)
            self.stop_loop()
        http_client.fetch(url, callback=on_response)
        runner()
        self.assertEqual(len(received), 1)
        response = received[0]
        response.rethrow()
        return received[0]

    def twisted_fetch(self, url, runner):
        """Fetch ``url`` with twisted's Agent and return the body string.

        ``runner`` is called to run the event loop; it is stopped through
        ``self.stop_loop`` once the request has succeeded or failed.
        """
        # http://twistedmatrix.com/documents/current/web/howto/client.html
        body_parts = []
        agent = Agent(self.reactor)
        request_done = agent.request('GET', url)

        class BodyCollector(Protocol):
            def __init__(self, finished):
                self.finished = finished

            def dataReceived(self, data):
                body_parts.append(data)

            def connectionLost(self, reason):
                self.finished.callback(None)

        def deliver_body(response):
            # Returning the Deferred makes the chain wait for the body.
            body_complete = Deferred()
            response.deliverBody(BodyCollector(body_complete))
            return body_complete
        request_done.addCallback(deliver_body)

        def stop_when_done(ignored):
            self.stop_loop()
        request_done.addBoth(stop_when_done)
        runner()
        self.assertTrue(body_parts)
        return ''.join(body_parts)

    def testTwistedServerTornadoClientIOLoop(self):
        self.start_twisted_server()
        url = 'http://localhost:%d' % self.twisted_port
        response = self.tornado_fetch(url, self.run_ioloop)
        self.assertEqual(response.body, 'Hello from twisted!')

    def testTwistedServerTornadoClientReactor(self):
        self.start_twisted_server()
        url = 'http://localhost:%d' % self.twisted_port
        response = self.tornado_fetch(url, self.run_reactor)
        self.assertEqual(response.body, 'Hello from twisted!')

    def testTornadoServerTwistedClientIOLoop(self):
        self.start_tornado_server()
        url = 'http://localhost:%d' % self.tornado_port
        response = self.twisted_fetch(url, self.run_ioloop)
        self.assertEqual(response, 'Hello from tornado!')

    def testTornadoServerTwistedClientReactor(self):
        self.start_tornado_server()
        url = 'http://localhost:%d' % self.tornado_port
        response = self.twisted_fetch(url, self.run_reactor)
        self.assertEqual(response, 'Hello from tornado!')
00426 
00427 
# When the optional imports at the top of the file failed, remove every
# test class that depends on them (presumably so that test loaders
# scanning this module do not pick them up).
if twisted is None:
    del ReactorWhenRunningTest
    del ReactorCallLaterTest
    del ReactorTwoCallLaterTest
    del ReactorCallFromThreadTest
    del ReactorCallInThread
    del ReactorReaderWriterTest
    del CompatibilityTests
else:
    # Import and run as much of twisted's test suite as possible.
    # This is unfortunately rather dependent on implementation details,
    # but there doesn't appear to be a clean all-in-one conformance test
    # suite for reactors.
    #
    # This is a list of all test suites using the ReactorBuilder
    # available in Twisted 11.0.0 and 11.1.0 (and a blacklist of
    # specific test methods to be disabled).
    #
    # Keys are dotted paths to twisted test-builder classes; values are
    # the names of test methods to disable on that class.
    twisted_tests = {
        'twisted.internet.test.test_core.ObjectModelIntegrationTest': [],
        'twisted.internet.test.test_core.SystemEventTestsBuilder': [
            'test_iterate',  # deliberately not supported
            ],
        'twisted.internet.test.test_fdset.ReactorFDSetTestsBuilder': [
            "test_lostFileDescriptor",  # incompatible with epoll and kqueue
            ],
        'twisted.internet.test.test_process.ProcessTestsBuilder': [
            # Doesn't work on python 2.5
            'test_systemCallUninterruptedByChildExit',
            # Doesn't clean up its temp files
            'test_shebang',
            ],
        # Process tests appear to work on OSX 10.7, but not 10.6
        #'twisted.internet.test.test_process.PTYProcessTestsBuilder': [
        #    'test_systemCallUninterruptedByChildExit',
        #    ],
        'twisted.internet.test.test_tcp.TCPClientTestsBuilder': [
            'test_badContext',  # ssl-related; see also SSLClientTestsMixin
            ],
        'twisted.internet.test.test_tcp.TCPPortTestsBuilder': [
            # These use link-local addresses and cause firewall prompts on mac
            'test_buildProtocolIPv6AddressScopeID',
            'test_portGetHostOnIPv6ScopeID',
            'test_serverGetHostOnIPv6ScopeID',
            'test_serverGetPeerOnIPv6ScopeID',
            ],
        'twisted.internet.test.test_tcp.TCPConnectionTestsBuilder': [],
        'twisted.internet.test.test_tcp.WriteSequenceTests': [],
        'twisted.internet.test.test_tcp.AbortConnectionTestCase': [],
        'twisted.internet.test.test_threads.ThreadTestsBuilder': [],
        'twisted.internet.test.test_time.TimeTestsBuilder': [],
        # Extra third-party dependencies (pyOpenSSL)
        #'twisted.internet.test.test_tls.SSLClientTestsMixin': [],
        'twisted.internet.test.test_udp.UDPServerTestsBuilder': [],
        'twisted.internet.test.test_unix.UNIXTestsBuilder': [
            # Platform-specific.  These tests would be skipped automatically
            # if we were running twisted's own test runner.
            'test_connectToLinuxAbstractNamespace',
            'test_listenOnLinuxAbstractNamespace',
            ],
        'twisted.internet.test.test_unix.UNIXDatagramTestsBuilder': [
            'test_listenOnLinuxAbstractNamespace',
            ],
        'twisted.internet.test.test_unix.UNIXPortTestsBuilder': [],
        }
    for test_name, blacklist in twisted_tests.iteritems():
        # Suites that cannot be imported (or are missing from the
        # installed twisted) are silently skipped.
        try:
            test_class = import_object(test_name)
        except (ImportError, AttributeError):
            continue
        for test_func in blacklist:
            if hasattr(test_class, test_func):
                # The test_func may be defined in a mixin, so clobber
                # it instead of delattr()
                setattr(test_class, test_func, lambda self: None)

        # The subclass is built inside a function taking test_class as a
        # parameter, so each TornadoTest refers to its own base class
        # rather than to the loop variable (which is rebound on every
        # iteration).
        def make_test_subclass(test_class):
            class TornadoTest(test_class):
                # Dotted path of the reactor class the twisted tests
                # should build.
                _reactors = ["tornado.platform.twisted._TestReactor"]

                def buildReactor(self):
                    # Double-underscore name is mangled to this class, so
                    # it cannot collide with attributes of test_class.
                    self.__saved_signals = save_signal_handlers()
                    return test_class.buildReactor(self)

                def unbuildReactor(self, reactor):
                    test_class.unbuildReactor(self, reactor)
                    # Clean up file descriptors (especially epoll/kqueue
                    # objects) eagerly instead of leaving them for the
                    # GC.  Unfortunately we can't do this in reactor.stop
                    # since twisted expects to be able to unregister
                    # connections in a post-shutdown hook.
                    reactor._io_loop.close(all_fds=True)
                    restore_signal_handlers(self.__saved_signals)

            # Keep the original suite's name on the generated subclass.
            TornadoTest.__name__ = test_class.__name__
            return TornadoTest
        test_subclass = make_test_subclass(test_class)
        # Publish the generated test case classes as module globals.
        globals().update(test_subclass.makeTestCaseClasses())

    # Since we're not using twisted's test runner, it's tricky to get
    # logging set up well.  Most of the time it's easiest to just
    # leave it turned off, but while working on these tests you may want
    # to uncomment one of the other lines instead.
    log.defaultObserver.stop()
    #import sys; log.startLogging(sys.stderr, setStdout=0)
    #log.startLoggingWithObserver(log.PythonLoggingObserver().emit, setStdout=0)

# Allow running this module directly as a script.
if __name__ == "__main__":
    unittest.main()


rosbridge_server
Author(s): Jonathan Mace
autogenerated on Thu Jan 2 2014 11:53:55