twisted.py
Go to the documentation of this file.
00001 # Author: Ovidiu Predescu
00002 # Date: July 2011
00003 #
00004 # Licensed under the Apache License, Version 2.0 (the "License"); you may
00005 # not use this file except in compliance with the License. You may obtain
00006 # a copy of the License at
00007 #
00008 #     http://www.apache.org/licenses/LICENSE-2.0
00009 #
00010 # Unless required by applicable law or agreed to in writing, software
00011 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
00012 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
00013 # License for the specific language governing permissions and limitations
00014 # under the License.
00015 
00016 # Note:  This module's docs are not currently extracted automatically,
00017 # so changes must be made manually to twisted.rst
00018 # TODO: refactor doc build process to use an appropriate virtualenv
00019 """Bridges between the Twisted reactor and Tornado IOLoop.
00020 
00021 This module lets you run applications and libraries written for
00022 Twisted in a Tornado application.  It can be used in two modes,
00023 depending on which library's underlying event loop you want to use.
00024 
00025 This module has been tested with Twisted versions 11.0.0 and newer.
00026 
00027 Twisted on Tornado
00028 ------------------
00029 
00030 `TornadoReactor` implements the Twisted reactor interface on top of
00031 the Tornado IOLoop.  To use it, simply call `install` at the beginning
00032 of the application::
00033 
00034     import tornado.platform.twisted
00035     tornado.platform.twisted.install()
00036     from twisted.internet import reactor
00037 
00038 When the app is ready to start, call `IOLoop.instance().start()`
00039 instead of `reactor.run()`.
00040 
00041 It is also possible to create a non-global reactor by calling
00042 `tornado.platform.twisted.TornadoReactor(io_loop)`.  However, if
00043 the `IOLoop` and reactor are to be short-lived (such as those used in
00044 unit tests), additional cleanup may be required.  Specifically, it is
00045 recommended to call::
00046 
00047     reactor.fireSystemEvent('shutdown')
00048     reactor.disconnectAll()
00049 
00050 before closing the `IOLoop`.
00051 
00052 Tornado on Twisted
00053 ------------------
00054 
00055 `TwistedIOLoop` implements the Tornado IOLoop interface on top of the Twisted
00056 reactor.  Recommended usage::
00057 
00058     from tornado.platform.twisted import TwistedIOLoop
00059     from twisted.internet import reactor
00060     TwistedIOLoop().install()
00061     # Set up your tornado application as usual using `IOLoop.instance`
00062     reactor.run()
00063 
00064 `TwistedIOLoop` always uses the global Twisted reactor.
00065 """
00066 
00067 from __future__ import absolute_import, division, print_function, with_statement
00068 
00069 import datetime
00070 import functools
00071 import numbers
00072 import socket
00073 
00074 import twisted.internet.abstract
00075 from twisted.internet.posixbase import PosixReactorBase
00076 from twisted.internet.interfaces import \
00077     IReactorFDSet, IDelayedCall, IReactorTime, IReadDescriptor, IWriteDescriptor
00078 from twisted.python import failure, log
00079 from twisted.internet import error
00080 import twisted.names.cache
00081 import twisted.names.client
00082 import twisted.names.hosts
00083 import twisted.names.resolve
00084 
00085 from zope.interface import implementer
00086 
00087 from tornado.escape import utf8
00088 from tornado import gen
00089 import tornado.ioloop
00090 from tornado.log import app_log
00091 from tornado.netutil import Resolver
00092 from tornado.stack_context import NullContext, wrap
00093 from tornado.ioloop import IOLoop
00094 from tornado.util import timedelta_to_seconds
00095 
00096 
00097 @implementer(IDelayedCall)
00098 class TornadoDelayedCall(object):
00099     """DelayedCall object for Tornado."""
00100     def __init__(self, reactor, seconds, f, *args, **kw):
00101         self._reactor = reactor
00102         self._func = functools.partial(f, *args, **kw)
00103         self._time = self._reactor.seconds() + seconds
00104         self._timeout = self._reactor._io_loop.add_timeout(self._time,
00105                                                            self._called)
00106         self._active = True
00107 
00108     def _called(self):
00109         self._active = False
00110         self._reactor._removeDelayedCall(self)
00111         try:
00112             self._func()
00113         except:
00114             app_log.error("_called caught exception", exc_info=True)
00115 
00116     def getTime(self):
00117         return self._time
00118 
00119     def cancel(self):
00120         self._active = False
00121         self._reactor._io_loop.remove_timeout(self._timeout)
00122         self._reactor._removeDelayedCall(self)
00123 
00124     def delay(self, seconds):
00125         self._reactor._io_loop.remove_timeout(self._timeout)
00126         self._time += seconds
00127         self._timeout = self._reactor._io_loop.add_timeout(self._time,
00128                                                            self._called)
00129 
00130     def reset(self, seconds):
00131         self._reactor._io_loop.remove_timeout(self._timeout)
00132         self._time = self._reactor.seconds() + seconds
00133         self._timeout = self._reactor._io_loop.add_timeout(self._time,
00134                                                            self._called)
00135 
00136     def active(self):
00137         return self._active
00138 
00139 
00140 @implementer(IReactorTime, IReactorFDSet)
00141 class TornadoReactor(PosixReactorBase):
00142     """Twisted reactor built on the Tornado IOLoop.
00143 
00144     Since it is intented to be used in applications where the top-level
00145     event loop is ``io_loop.start()`` rather than ``reactor.run()``,
00146     it is implemented a little differently than other Twisted reactors.
00147     We override `mainLoop` instead of `doIteration` and must implement
00148     timed call functionality on top of `IOLoop.add_timeout` rather than
00149     using the implementation in `PosixReactorBase`.
00150     """
00151     def __init__(self, io_loop=None):
00152         if not io_loop:
00153             io_loop = tornado.ioloop.IOLoop.current()
00154         self._io_loop = io_loop
00155         self._readers = {}  # map of reader objects to fd
00156         self._writers = {}  # map of writer objects to fd
00157         self._fds = {}  # a map of fd to a (reader, writer) tuple
00158         self._delayedCalls = {}
00159         PosixReactorBase.__init__(self)
00160         self.addSystemEventTrigger('during', 'shutdown', self.crash)
00161 
00162         # IOLoop.start() bypasses some of the reactor initialization.
00163         # Fire off the necessary events if they weren't already triggered
00164         # by reactor.run().
00165         def start_if_necessary():
00166             if not self._started:
00167                 self.fireSystemEvent('startup')
00168         self._io_loop.add_callback(start_if_necessary)
00169 
00170     # IReactorTime
00171     def seconds(self):
00172         return self._io_loop.time()
00173 
00174     def callLater(self, seconds, f, *args, **kw):
00175         dc = TornadoDelayedCall(self, seconds, f, *args, **kw)
00176         self._delayedCalls[dc] = True
00177         return dc
00178 
00179     def getDelayedCalls(self):
00180         return [x for x in self._delayedCalls if x._active]
00181 
00182     def _removeDelayedCall(self, dc):
00183         if dc in self._delayedCalls:
00184             del self._delayedCalls[dc]
00185 
00186     # IReactorThreads
00187     def callFromThread(self, f, *args, **kw):
00188         """See `twisted.internet.interfaces.IReactorThreads.callFromThread`"""
00189         assert callable(f), "%s is not callable" % f
00190         with NullContext():
00191             # This NullContext is mainly for an edge case when running
00192             # TwistedIOLoop on top of a TornadoReactor.
00193             # TwistedIOLoop.add_callback uses reactor.callFromThread and
00194             # should not pick up additional StackContexts along the way.
00195             self._io_loop.add_callback(f, *args, **kw)
00196 
00197     # We don't need the waker code from the super class, Tornado uses
00198     # its own waker.
00199     def installWaker(self):
00200         pass
00201 
00202     def wakeUp(self):
00203         pass
00204 
00205     # IReactorFDSet
00206     def _invoke_callback(self, fd, events):
00207         if fd not in self._fds:
00208             return
00209         (reader, writer) = self._fds[fd]
00210         if reader:
00211             err = None
00212             if reader.fileno() == -1:
00213                 err = error.ConnectionLost()
00214             elif events & IOLoop.READ:
00215                 err = log.callWithLogger(reader, reader.doRead)
00216             if err is None and events & IOLoop.ERROR:
00217                 err = error.ConnectionLost()
00218             if err is not None:
00219                 self.removeReader(reader)
00220                 reader.readConnectionLost(failure.Failure(err))
00221         if writer:
00222             err = None
00223             if writer.fileno() == -1:
00224                 err = error.ConnectionLost()
00225             elif events & IOLoop.WRITE:
00226                 err = log.callWithLogger(writer, writer.doWrite)
00227             if err is None and events & IOLoop.ERROR:
00228                 err = error.ConnectionLost()
00229             if err is not None:
00230                 self.removeWriter(writer)
00231                 writer.writeConnectionLost(failure.Failure(err))
00232 
00233     def addReader(self, reader):
00234         """Add a FileDescriptor for notification of data available to read."""
00235         if reader in self._readers:
00236             # Don't add the reader if it's already there
00237             return
00238         fd = reader.fileno()
00239         self._readers[reader] = fd
00240         if fd in self._fds:
00241             (_, writer) = self._fds[fd]
00242             self._fds[fd] = (reader, writer)
00243             if writer:
00244                 # We already registered this fd for write events,
00245                 # update it for read events as well.
00246                 self._io_loop.update_handler(fd, IOLoop.READ | IOLoop.WRITE)
00247         else:
00248             with NullContext():
00249                 self._fds[fd] = (reader, None)
00250                 self._io_loop.add_handler(fd, self._invoke_callback,
00251                                           IOLoop.READ)
00252 
00253     def addWriter(self, writer):
00254         """Add a FileDescriptor for notification of data available to write."""
00255         if writer in self._writers:
00256             return
00257         fd = writer.fileno()
00258         self._writers[writer] = fd
00259         if fd in self._fds:
00260             (reader, _) = self._fds[fd]
00261             self._fds[fd] = (reader, writer)
00262             if reader:
00263                 # We already registered this fd for read events,
00264                 # update it for write events as well.
00265                 self._io_loop.update_handler(fd, IOLoop.READ | IOLoop.WRITE)
00266         else:
00267             with NullContext():
00268                 self._fds[fd] = (None, writer)
00269                 self._io_loop.add_handler(fd, self._invoke_callback,
00270                                           IOLoop.WRITE)
00271 
00272     def removeReader(self, reader):
00273         """Remove a Selectable for notification of data available to read."""
00274         if reader in self._readers:
00275             fd = self._readers.pop(reader)
00276             (_, writer) = self._fds[fd]
00277             if writer:
00278                 # We have a writer so we need to update the IOLoop for
00279                 # write events only.
00280                 self._fds[fd] = (None, writer)
00281                 self._io_loop.update_handler(fd, IOLoop.WRITE)
00282             else:
00283                 # Since we have no writer registered, we remove the
00284                 # entry from _fds and unregister the handler from the
00285                 # IOLoop
00286                 del self._fds[fd]
00287                 self._io_loop.remove_handler(fd)
00288 
00289     def removeWriter(self, writer):
00290         """Remove a Selectable for notification of data available to write."""
00291         if writer in self._writers:
00292             fd = self._writers.pop(writer)
00293             (reader, _) = self._fds[fd]
00294             if reader:
00295                 # We have a reader so we need to update the IOLoop for
00296                 # read events only.
00297                 self._fds[fd] = (reader, None)
00298                 self._io_loop.update_handler(fd, IOLoop.READ)
00299             else:
00300                 # Since we have no reader registered, we remove the
00301                 # entry from the _fds and unregister the handler from
00302                 # the IOLoop.
00303                 del self._fds[fd]
00304                 self._io_loop.remove_handler(fd)
00305 
00306     def removeAll(self):
00307         return self._removeAll(self._readers, self._writers)
00308 
00309     def getReaders(self):
00310         return self._readers.keys()
00311 
00312     def getWriters(self):
00313         return self._writers.keys()
00314 
00315     # The following functions are mainly used in twisted-style test cases;
00316     # it is expected that most users of the TornadoReactor will call
00317     # IOLoop.start() instead of Reactor.run().
00318     def stop(self):
00319         PosixReactorBase.stop(self)
00320         fire_shutdown = functools.partial(self.fireSystemEvent, "shutdown")
00321         self._io_loop.add_callback(fire_shutdown)
00322 
00323     def crash(self):
00324         PosixReactorBase.crash(self)
00325         self._io_loop.stop()
00326 
00327     def doIteration(self, delay):
00328         raise NotImplementedError("doIteration")
00329 
00330     def mainLoop(self):
00331         self._io_loop.start()
00332 
00333 
00334 class _TestReactor(TornadoReactor):
00335     """Subclass of TornadoReactor for use in unittests.
00336 
00337     This can't go in the test.py file because of import-order dependencies
00338     with the Twisted reactor test builder.
00339     """
00340     def __init__(self):
00341         # always use a new ioloop
00342         super(_TestReactor, self).__init__(IOLoop())
00343 
00344     def listenTCP(self, port, factory, backlog=50, interface=''):
00345         # default to localhost to avoid firewall prompts on the mac
00346         if not interface:
00347             interface = '127.0.0.1'
00348         return super(_TestReactor, self).listenTCP(
00349             port, factory, backlog=backlog, interface=interface)
00350 
00351     def listenUDP(self, port, protocol, interface='', maxPacketSize=8192):
00352         if not interface:
00353             interface = '127.0.0.1'
00354         return super(_TestReactor, self).listenUDP(
00355             port, protocol, interface=interface, maxPacketSize=maxPacketSize)
00356 
00357 
00358 def install(io_loop=None):
00359     """Install this package as the default Twisted reactor."""
00360     if not io_loop:
00361         io_loop = tornado.ioloop.IOLoop.current()
00362     reactor = TornadoReactor(io_loop)
00363     from twisted.internet.main import installReactor
00364     installReactor(reactor)
00365     return reactor
00366 
00367 
00368 @implementer(IReadDescriptor, IWriteDescriptor)
00369 class _FD(object):
00370     def __init__(self, fd, fileobj, handler):
00371         self.fd = fd
00372         self.fileobj = fileobj
00373         self.handler = handler
00374         self.reading = False
00375         self.writing = False
00376         self.lost = False
00377 
00378     def fileno(self):
00379         return self.fd
00380 
00381     def doRead(self):
00382         if not self.lost:
00383             self.handler(self.fileobj, tornado.ioloop.IOLoop.READ)
00384 
00385     def doWrite(self):
00386         if not self.lost:
00387             self.handler(self.fileobj, tornado.ioloop.IOLoop.WRITE)
00388 
00389     def connectionLost(self, reason):
00390         if not self.lost:
00391             self.handler(self.fileobj, tornado.ioloop.IOLoop.ERROR)
00392             self.lost = True
00393 
00394     def logPrefix(self):
00395         return ''
00396 
00397 
00398 class TwistedIOLoop(tornado.ioloop.IOLoop):
00399     """IOLoop implementation that runs on Twisted.
00400 
00401     Uses the global Twisted reactor by default.  To create multiple
00402     `TwistedIOLoops` in the same process, you must pass a unique reactor
00403     when constructing each one.
00404 
00405     Not compatible with `tornado.process.Subprocess.set_exit_callback`
00406     because the ``SIGCHLD`` handlers used by Tornado and Twisted conflict
00407     with each other.
00408     """
00409     def initialize(self, reactor=None):
00410         if reactor is None:
00411             import twisted.internet.reactor
00412             reactor = twisted.internet.reactor
00413         self.reactor = reactor
00414         self.fds = {}
00415         self.reactor.callWhenRunning(self.make_current)
00416 
00417     def close(self, all_fds=False):
00418         fds = self.fds
00419         self.reactor.removeAll()
00420         for c in self.reactor.getDelayedCalls():
00421             c.cancel()
00422         if all_fds:
00423             for fd in fds.values():
00424                 self.close_fd(fd.fileobj)
00425 
00426     def add_handler(self, fd, handler, events):
00427         if fd in self.fds:
00428             raise ValueError('fd %s added twice' % fd)
00429         fd, fileobj = self.split_fd(fd)
00430         self.fds[fd] = _FD(fd, fileobj, wrap(handler))
00431         if events & tornado.ioloop.IOLoop.READ:
00432             self.fds[fd].reading = True
00433             self.reactor.addReader(self.fds[fd])
00434         if events & tornado.ioloop.IOLoop.WRITE:
00435             self.fds[fd].writing = True
00436             self.reactor.addWriter(self.fds[fd])
00437 
00438     def update_handler(self, fd, events):
00439         fd, fileobj = self.split_fd(fd)
00440         if events & tornado.ioloop.IOLoop.READ:
00441             if not self.fds[fd].reading:
00442                 self.fds[fd].reading = True
00443                 self.reactor.addReader(self.fds[fd])
00444         else:
00445             if self.fds[fd].reading:
00446                 self.fds[fd].reading = False
00447                 self.reactor.removeReader(self.fds[fd])
00448         if events & tornado.ioloop.IOLoop.WRITE:
00449             if not self.fds[fd].writing:
00450                 self.fds[fd].writing = True
00451                 self.reactor.addWriter(self.fds[fd])
00452         else:
00453             if self.fds[fd].writing:
00454                 self.fds[fd].writing = False
00455                 self.reactor.removeWriter(self.fds[fd])
00456 
00457     def remove_handler(self, fd):
00458         fd, fileobj = self.split_fd(fd)
00459         if fd not in self.fds:
00460             return
00461         self.fds[fd].lost = True
00462         if self.fds[fd].reading:
00463             self.reactor.removeReader(self.fds[fd])
00464         if self.fds[fd].writing:
00465             self.reactor.removeWriter(self.fds[fd])
00466         del self.fds[fd]
00467 
00468     def start(self):
00469         self._setup_logging()
00470         self.reactor.run()
00471 
00472     def stop(self):
00473         self.reactor.crash()
00474 
00475     def add_timeout(self, deadline, callback, *args, **kwargs):
00476         # This method could be simplified (since tornado 4.0) by
00477         # overriding call_at instead of add_timeout, but we leave it
00478         # for now as a test of backwards-compatibility.
00479         if isinstance(deadline, numbers.Real):
00480             delay = max(deadline - self.time(), 0)
00481         elif isinstance(deadline, datetime.timedelta):
00482             delay = timedelta_to_seconds(deadline)
00483         else:
00484             raise TypeError("Unsupported deadline %r")
00485         return self.reactor.callLater(
00486             delay, self._run_callback,
00487             functools.partial(wrap(callback), *args, **kwargs))
00488 
00489     def remove_timeout(self, timeout):
00490         if timeout.active():
00491             timeout.cancel()
00492 
00493     def add_callback(self, callback, *args, **kwargs):
00494         self.reactor.callFromThread(
00495             self._run_callback,
00496             functools.partial(wrap(callback), *args, **kwargs))
00497 
00498     def add_callback_from_signal(self, callback, *args, **kwargs):
00499         self.add_callback(callback, *args, **kwargs)
00500 
00501 
00502 class TwistedResolver(Resolver):
00503     """Twisted-based asynchronous resolver.
00504 
00505     This is a non-blocking and non-threaded resolver.  It is
00506     recommended only when threads cannot be used, since it has
00507     limitations compared to the standard ``getaddrinfo``-based
00508     `~tornado.netutil.Resolver` and
00509     `~tornado.netutil.ThreadedResolver`.  Specifically, it returns at
00510     most one result, and arguments other than ``host`` and ``family``
00511     are ignored.  It may fail to resolve when ``family`` is not
00512     ``socket.AF_UNSPEC``.
00513 
00514     Requires Twisted 12.1 or newer.
00515     """
00516     def initialize(self, io_loop=None):
00517         self.io_loop = io_loop or IOLoop.current()
00518         # partial copy of twisted.names.client.createResolver, which doesn't
00519         # allow for a reactor to be passed in.
00520         self.reactor = tornado.platform.twisted.TornadoReactor(io_loop)
00521 
00522         host_resolver = twisted.names.hosts.Resolver('/etc/hosts')
00523         cache_resolver = twisted.names.cache.CacheResolver(reactor=self.reactor)
00524         real_resolver = twisted.names.client.Resolver('/etc/resolv.conf',
00525                                                       reactor=self.reactor)
00526         self.resolver = twisted.names.resolve.ResolverChain(
00527             [host_resolver, cache_resolver, real_resolver])
00528 
00529     @gen.coroutine
00530     def resolve(self, host, port, family=0):
00531         # getHostByName doesn't accept IP addresses, so if the input
00532         # looks like an IP address just return it immediately.
00533         if twisted.internet.abstract.isIPAddress(host):
00534             resolved = host
00535             resolved_family = socket.AF_INET
00536         elif twisted.internet.abstract.isIPv6Address(host):
00537             resolved = host
00538             resolved_family = socket.AF_INET6
00539         else:
00540             deferred = self.resolver.getHostByName(utf8(host))
00541             resolved = yield gen.Task(deferred.addBoth)
00542             if isinstance(resolved, failure.Failure):
00543                 resolved.raiseException()
00544             elif twisted.internet.abstract.isIPAddress(resolved):
00545                 resolved_family = socket.AF_INET
00546             elif twisted.internet.abstract.isIPv6Address(resolved):
00547                 resolved_family = socket.AF_INET6
00548             else:
00549                 resolved_family = socket.AF_UNSPEC
00550         if family != socket.AF_UNSPEC and family != resolved_family:
00551             raise Exception('Requested socket family %d but got %d' %
00552                             (family, resolved_family))
00553         result = [
00554             (resolved_family, (resolved, port)),
00555         ]
00556         raise gen.Return(result)


rosbridge_tools
Author(s): Jonathan Mace
autogenerated on Sat Dec 27 2014 11:25:59