twisted.py
Go to the documentation of this file.
00001 # Author: Ovidiu Predescu
00002 # Date: July 2011
00003 #
00004 # Licensed under the Apache License, Version 2.0 (the "License"); you may
00005 # not use this file except in compliance with the License. You may obtain
00006 # a copy of the License at
00007 #
00008 #     http://www.apache.org/licenses/LICENSE-2.0
00009 #
00010 # Unless required by applicable law or agreed to in writing, software
00011 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
00012 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
00013 # License for the specific language governing permissions and limitations
00014 # under the License.
00015 
00016 # Note:  This module's docs are not currently extracted automatically,
00017 # so changes must be made manually to twisted.rst
00018 # TODO: refactor doc build process to use an appropriate virtualenv
00019 """A Twisted reactor built on the Tornado IOLoop.
00020 
00021 This module lets you run applications and libraries written for
00022 Twisted in a Tornado application.  To use it, simply call `install` at
00023 the beginning of the application::
00024 
00025     import tornado.platform.twisted
00026     tornado.platform.twisted.install()
00027     from twisted.internet import reactor
00028 
00029 When the app is ready to start, call `IOLoop.instance().start()`
00030 instead of `reactor.run()`.  This will allow you to use a mixture of
00031 Twisted and Tornado code in the same process.
00032 
00033 It is also possible to create a non-global reactor by calling
00034 `tornado.platform.twisted.TornadoReactor(io_loop)`.  However, if
00035 the `IOLoop` and reactor are to be short-lived (such as those used in
00036 unit tests), additional cleanup may be required.  Specifically, it is
00037 recommended to call::
00038 
00039     reactor.fireSystemEvent('shutdown')
00040     reactor.disconnectAll()
00041 
00042 before closing the `IOLoop`.
00043 
00044 This module has been tested with Twisted versions 11.0.0, 11.1.0, and 12.0.0
00045 """
00046 
00047 from __future__ import absolute_import, division, with_statement
00048 
00049 import functools
00050 import logging
00051 import time
00052 
00053 from twisted.internet.posixbase import PosixReactorBase
00054 from twisted.internet.interfaces import \
00055     IReactorFDSet, IDelayedCall, IReactorTime
00056 from twisted.python import failure, log
00057 from twisted.internet import error
00058 
00059 from zope.interface import implements
00060 
00061 import tornado
00062 import tornado.ioloop
00063 from tornado.stack_context import NullContext
00064 from tornado.ioloop import IOLoop
00065 
00066 
class TornadoDelayedCall(object):
    """DelayedCall object for Tornado.

    Wraps a timeout registered on the owning reactor's IOLoop and exposes
    it through the methods declared by Twisted's `IDelayedCall`.
    """
    # Note that zope.interface.implements is deprecated in
    # zope.interface 4.0, because it cannot work in python 3.  The
    # replacement is a class decorator, which cannot work on python
    # 2.5.  So when twisted supports python 3, we'll need to drop 2.5
    # support on this module to make it work.
    implements(IDelayedCall)

    def __init__(self, reactor, seconds, f, *args, **kw):
        """Schedule ``f(*args, **kw)`` to run ``seconds`` from now.

        ``reactor`` is the owning reactor; this class uses its
        ``seconds()``, ``_io_loop`` and ``_removeDelayedCall()`` members.
        """
        self._reactor = reactor
        self._func = functools.partial(f, *args, **kw)
        # Absolute deadline, on the reactor's clock.
        self._time = self._reactor.seconds() + seconds
        self._timeout = self._reactor._io_loop.add_timeout(self._time,
                                                           self._called)
        self._active = True

    def _called(self):
        """Timeout callback: deactivate, unregister, then run the function."""
        self._active = False
        self._reactor._removeDelayedCall(self)
        try:
            self._func()
        except Exception:
            # Ordinary errors are logged instead of propagated.
            # KeyboardInterrupt and SystemExit are not Exception
            # subclasses, so they are no longer swallowed here.
            logging.error("_called caught exception", exc_info=True)

    def getTime(self):
        """Return the absolute time at which the call is due."""
        return self._time

    def cancel(self):
        """Deactivate the call and remove its pending timeout."""
        self._active = False
        self._reactor._io_loop.remove_timeout(self._timeout)
        self._reactor._removeDelayedCall(self)

    def delay(self, seconds):
        """Push the deadline back by ``seconds`` (relative to the old one)."""
        self._reactor._io_loop.remove_timeout(self._timeout)
        self._time += seconds
        self._timeout = self._reactor._io_loop.add_timeout(self._time,
                                                           self._called)

    def reset(self, seconds):
        """Set the deadline to ``seconds`` from the reactor's current time."""
        self._reactor._io_loop.remove_timeout(self._timeout)
        self._time = self._reactor.seconds() + seconds
        self._timeout = self._reactor._io_loop.add_timeout(self._time,
                                                           self._called)

    def active(self):
        """Return True until the call has run or been cancelled."""
        return self._active
00114 
00115 
class TornadoReactor(PosixReactorBase):
    """Twisted reactor built on the Tornado IOLoop.

    Since it is intended to be used in applications where the top-level
    event loop is ``io_loop.start()`` rather than ``reactor.run()``,
    it is implemented a little differently than other Twisted reactors.
    We override `mainLoop` instead of `doIteration` and must implement
    timed call functionality on top of `IOLoop.add_timeout` rather than
    using the implementation in `PosixReactorBase`.
    """
    implements(IReactorTime, IReactorFDSet)

    def __init__(self, io_loop=None):
        """Create a reactor driven by ``io_loop``.

        When ``io_loop`` is omitted the global ``IOLoop.instance()`` is
        used.
        """
        if not io_loop:
            io_loop = tornado.ioloop.IOLoop.instance()
        self._io_loop = io_loop
        self._readers = {}  # map of reader objects to fd
        self._writers = {}  # map of writer objects to fd
        self._fds = {}  # a map of fd to a (reader, writer) tuple
        self._delayedCalls = {}
        PosixReactorBase.__init__(self)

        # IOLoop.start() bypasses some of the reactor initialization.
        # Fire off the necessary events if they weren't already triggered
        # by reactor.run().
        def start_if_necessary():
            if not self._started:
                self.fireSystemEvent('startup')
        self._io_loop.add_callback(start_if_necessary)

    # IReactorTime
    def seconds(self):
        """Return the current time as reported by ``time.time()``."""
        return time.time()

    def callLater(self, seconds, f, *args, **kw):
        """Schedule ``f(*args, **kw)`` in ``seconds``; return the call object."""
        dc = TornadoDelayedCall(self, seconds, f, *args, **kw)
        self._delayedCalls[dc] = True
        return dc

    def getDelayedCalls(self):
        """Return the tracked delayed calls that are still active."""
        return [x for x in self._delayedCalls if x._active]

    def _removeDelayedCall(self, dc):
        """Stop tracking ``dc``; a call that is not tracked is ignored."""
        if dc in self._delayedCalls:
            del self._delayedCalls[dc]

    # IReactorThreads
    def callFromThread(self, f, *args, **kw):
        """See `twisted.internet.interfaces.IReactorThreads.callFromThread`"""
        assert callable(f), "%s is not callable" % f
        p = functools.partial(f, *args, **kw)
        self._io_loop.add_callback(p)

    # We don't need the waker code from the super class, Tornado uses
    # its own waker.
    def installWaker(self):
        pass

    def wakeUp(self):
        pass

    # IReactorFDSet
    def _invoke_callback(self, fd, events):
        """IOLoop handler: deliver ``events`` on ``fd`` to its reader/writer.

        ``events`` is a bitmask of ``IOLoop.READ``, ``IOLoop.WRITE`` and
        ``IOLoop.ERROR``.
        """
        entry = self._fds.get(fd)
        if entry is None:
            # removeReader/removeWriter already deleted this fd's entry,
            # so there is nobody left to notify.  Indexing self._fds[fd]
            # here would raise KeyError.
            return
        (reader, writer) = entry
        if reader:
            err = None
            if reader.fileno() == -1:
                err = error.ConnectionLost()
            elif events & IOLoop.READ:
                err = log.callWithLogger(reader, reader.doRead)
            if err is None and events & IOLoop.ERROR:
                err = error.ConnectionLost()
            if err is not None:
                self.removeReader(reader)
                reader.readConnectionLost(failure.Failure(err))
        if writer:
            err = None
            if writer.fileno() == -1:
                err = error.ConnectionLost()
            elif events & IOLoop.WRITE:
                err = log.callWithLogger(writer, writer.doWrite)
            if err is None and events & IOLoop.ERROR:
                err = error.ConnectionLost()
            if err is not None:
                self.removeWriter(writer)
                writer.writeConnectionLost(failure.Failure(err))

    def addReader(self, reader):
        """Add a FileDescriptor for notification of data available to read."""
        if reader in self._readers:
            # Don't add the reader if it's already there
            return
        fd = reader.fileno()
        self._readers[reader] = fd
        if fd in self._fds:
            (_, writer) = self._fds[fd]
            self._fds[fd] = (reader, writer)
            if writer:
                # We already registered this fd for write events,
                # update it for read events as well.
                self._io_loop.update_handler(fd, IOLoop.READ | IOLoop.WRITE)
        else:
            # NOTE(review): NullContext presumably keeps the handler from
            # capturing the caller's stack context - confirm against
            # tornado.stack_context.
            with NullContext():
                self._fds[fd] = (reader, None)
                self._io_loop.add_handler(fd, self._invoke_callback,
                                          IOLoop.READ)

    def addWriter(self, writer):
        """Add a FileDescriptor for notification of data available to write."""
        if writer in self._writers:
            return
        fd = writer.fileno()
        self._writers[writer] = fd
        if fd in self._fds:
            (reader, _) = self._fds[fd]
            self._fds[fd] = (reader, writer)
            if reader:
                # We already registered this fd for read events,
                # update it for write events as well.
                self._io_loop.update_handler(fd, IOLoop.READ | IOLoop.WRITE)
        else:
            # NOTE(review): NullContext presumably keeps the handler from
            # capturing the caller's stack context - confirm against
            # tornado.stack_context.
            with NullContext():
                self._fds[fd] = (None, writer)
                self._io_loop.add_handler(fd, self._invoke_callback,
                                          IOLoop.WRITE)

    def removeReader(self, reader):
        """Remove a Selectable for notification of data available to read."""
        if reader in self._readers:
            fd = self._readers.pop(reader)
            (_, writer) = self._fds[fd]
            if writer:
                # We have a writer so we need to update the IOLoop for
                # write events only.
                self._fds[fd] = (None, writer)
                self._io_loop.update_handler(fd, IOLoop.WRITE)
            else:
                # Since we have no writer registered, we remove the
                # entry from _fds and unregister the handler from the
                # IOLoop
                del self._fds[fd]
                self._io_loop.remove_handler(fd)

    def removeWriter(self, writer):
        """Remove a Selectable for notification of data available to write."""
        if writer in self._writers:
            fd = self._writers.pop(writer)
            (reader, _) = self._fds[fd]
            if reader:
                # We have a reader so we need to update the IOLoop for
                # read events only.
                self._fds[fd] = (reader, None)
                self._io_loop.update_handler(fd, IOLoop.READ)
            else:
                # Since we have no reader registered, we remove the
                # entry from the _fds and unregister the handler from
                # the IOLoop.
                del self._fds[fd]
                self._io_loop.remove_handler(fd)

    def removeAll(self):
        """Remove every reader and writer via the inherited ``_removeAll``."""
        return self._removeAll(self._readers, self._writers)

    def getReaders(self):
        """Return a list of the currently registered readers."""
        # list() yields a real list on every Python version; on Python 2
        # it is equivalent to the .keys() call it replaces.
        return list(self._readers)

    def getWriters(self):
        """Return a list of the currently registered writers."""
        return list(self._writers)

    # The following functions are mainly used in twisted-style test cases;
    # it is expected that most users of the TornadoReactor will call
    # IOLoop.start() instead of Reactor.run().
    def stop(self):
        """Stop the reactor, then stop the underlying IOLoop."""
        PosixReactorBase.stop(self)
        self._io_loop.stop()

    def crash(self):
        """Crash the reactor, then stop the underlying IOLoop."""
        PosixReactorBase.crash(self)
        self._io_loop.stop()

    def doIteration(self, delay):
        """Not supported: this reactor overrides `mainLoop` instead."""
        raise NotImplementedError("doIteration")

    def mainLoop(self):
        """Run the IOLoop; fire 'shutdown' afterwards if the reactor stopped."""
        self._io_loop.start()
        if self._stopped:
            self.fireSystemEvent("shutdown")
00303 
00304 
class _TestReactor(TornadoReactor):
    """TornadoReactor variant used by the unit tests.

    It lives in this module rather than in the test.py file because of
    import-order dependencies with the Twisted reactor test builder.
    """
    def __init__(self):
        # Every test reactor runs on its own freshly created IOLoop.
        private_loop = IOLoop()
        super(_TestReactor, self).__init__(private_loop)

    def listenTCP(self, port, factory, backlog=50, interface=''):
        # An empty interface falls back to localhost, which avoids
        # firewall prompts on the mac.
        bind_to = interface or '127.0.0.1'
        parent = super(_TestReactor, self)
        return parent.listenTCP(port, factory,
                                backlog=backlog, interface=bind_to)

    def listenUDP(self, port, protocol, interface='', maxPacketSize=8192):
        # Same localhost fallback as listenTCP.
        bind_to = interface or '127.0.0.1'
        parent = super(_TestReactor, self)
        return parent.listenUDP(port, protocol,
                                interface=bind_to,
                                maxPacketSize=maxPacketSize)
00327 
00328 
def install(io_loop=None):
    """Install this package as the default Twisted reactor.

    A `TornadoReactor` is built on ``io_loop`` (or on the global
    ``IOLoop.instance()`` when none is given), passed to Twisted's
    ``installReactor`` and returned.
    """
    loop = io_loop or tornado.ioloop.IOLoop.instance()
    new_reactor = TornadoReactor(loop)
    # Imported here, after the reactor exists, exactly as before.
    from twisted.internet.main import installReactor
    installReactor(new_reactor)
    return new_reactor


roswww
Author(s): Jonathan Mace
autogenerated on Thu Jan 2 2014 11:53:30