00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 """A Twisted reactor built on the Tornado IOLoop.
00020
00021 This module lets you run applications and libraries written for
00022 Twisted in a Tornado application. To use it, simply call `install` at
00023 the beginning of the application::
00024
00025 import tornado.platform.twisted
00026 tornado.platform.twisted.install()
00027 from twisted.internet import reactor
00028
00029 When the app is ready to start, call `IOLoop.instance().start()`
00030 instead of `reactor.run()`. This will allow you to use a mixture of
00031 Twisted and Tornado code in the same process.
00032
00033 It is also possible to create a non-global reactor by calling
00034 `tornado.platform.twisted.TornadoReactor(io_loop)`. However, if
00035 the `IOLoop` and reactor are to be short-lived (such as those used in
00036 unit tests), additional cleanup may be required. Specifically, it is
00037 recommended to call::
00038
00039 reactor.fireSystemEvent('shutdown')
00040 reactor.disconnectAll()
00041
00042 before closing the `IOLoop`.
00043
00044 This module has been tested with Twisted versions 11.0.0, 11.1.0, and 12.0.0
00045 """
00046
00047 from __future__ import absolute_import, division, with_statement
00048
00049 import functools
00050 import logging
00051 import time
00052
00053 from twisted.internet.posixbase import PosixReactorBase
00054 from twisted.internet.interfaces import \
00055 IReactorFDSet, IDelayedCall, IReactorTime
00056 from twisted.python import failure, log
00057 from twisted.internet import error
00058
00059 from zope.interface import implements
00060
00061 import tornado
00062 import tornado.ioloop
00063 from tornado.stack_context import NullContext
00064 from tornado.ioloop import IOLoop
00065
00066
00067 class TornadoDelayedCall(object):
00068 """DelayedCall object for Tornado."""
00069
00070
00071
00072
00073
00074 implements(IDelayedCall)
00075
00076 def __init__(self, reactor, seconds, f, *args, **kw):
00077 self._reactor = reactor
00078 self._func = functools.partial(f, *args, **kw)
00079 self._time = self._reactor.seconds() + seconds
00080 self._timeout = self._reactor._io_loop.add_timeout(self._time,
00081 self._called)
00082 self._active = True
00083
00084 def _called(self):
00085 self._active = False
00086 self._reactor._removeDelayedCall(self)
00087 try:
00088 self._func()
00089 except:
00090 logging.error("_called caught exception", exc_info=True)
00091
00092 def getTime(self):
00093 return self._time
00094
00095 def cancel(self):
00096 self._active = False
00097 self._reactor._io_loop.remove_timeout(self._timeout)
00098 self._reactor._removeDelayedCall(self)
00099
00100 def delay(self, seconds):
00101 self._reactor._io_loop.remove_timeout(self._timeout)
00102 self._time += seconds
00103 self._timeout = self._reactor._io_loop.add_timeout(self._time,
00104 self._called)
00105
00106 def reset(self, seconds):
00107 self._reactor._io_loop.remove_timeout(self._timeout)
00108 self._time = self._reactor.seconds() + seconds
00109 self._timeout = self._reactor._io_loop.add_timeout(self._time,
00110 self._called)
00111
00112 def active(self):
00113 return self._active
00114
00115
00116 class TornadoReactor(PosixReactorBase):
00117 """Twisted reactor built on the Tornado IOLoop.
00118
00119 Since it is intented to be used in applications where the top-level
00120 event loop is ``io_loop.start()`` rather than ``reactor.run()``,
00121 it is implemented a little differently than other Twisted reactors.
00122 We override `mainLoop` instead of `doIteration` and must implement
00123 timed call functionality on top of `IOLoop.add_timeout` rather than
00124 using the implementation in `PosixReactorBase`.
00125 """
00126 implements(IReactorTime, IReactorFDSet)
00127
00128 def __init__(self, io_loop=None):
00129 if not io_loop:
00130 io_loop = tornado.ioloop.IOLoop.instance()
00131 self._io_loop = io_loop
00132 self._readers = {}
00133 self._writers = {}
00134 self._fds = {}
00135 self._delayedCalls = {}
00136 PosixReactorBase.__init__(self)
00137
00138
00139
00140
00141 def start_if_necessary():
00142 if not self._started:
00143 self.fireSystemEvent('startup')
00144 self._io_loop.add_callback(start_if_necessary)
00145
00146
00147 def seconds(self):
00148 return time.time()
00149
00150 def callLater(self, seconds, f, *args, **kw):
00151 dc = TornadoDelayedCall(self, seconds, f, *args, **kw)
00152 self._delayedCalls[dc] = True
00153 return dc
00154
00155 def getDelayedCalls(self):
00156 return [x for x in self._delayedCalls if x._active]
00157
00158 def _removeDelayedCall(self, dc):
00159 if dc in self._delayedCalls:
00160 del self._delayedCalls[dc]
00161
00162
00163 def callFromThread(self, f, *args, **kw):
00164 """See `twisted.internet.interfaces.IReactorThreads.callFromThread`"""
00165 assert callable(f), "%s is not callable" % f
00166 p = functools.partial(f, *args, **kw)
00167 self._io_loop.add_callback(p)
00168
00169
00170
00171 def installWaker(self):
00172 pass
00173
00174 def wakeUp(self):
00175 pass
00176
00177
00178 def _invoke_callback(self, fd, events):
00179 (reader, writer) = self._fds[fd]
00180 if reader:
00181 err = None
00182 if reader.fileno() == -1:
00183 err = error.ConnectionLost()
00184 elif events & IOLoop.READ:
00185 err = log.callWithLogger(reader, reader.doRead)
00186 if err is None and events & IOLoop.ERROR:
00187 err = error.ConnectionLost()
00188 if err is not None:
00189 self.removeReader(reader)
00190 reader.readConnectionLost(failure.Failure(err))
00191 if writer:
00192 err = None
00193 if writer.fileno() == -1:
00194 err = error.ConnectionLost()
00195 elif events & IOLoop.WRITE:
00196 err = log.callWithLogger(writer, writer.doWrite)
00197 if err is None and events & IOLoop.ERROR:
00198 err = error.ConnectionLost()
00199 if err is not None:
00200 self.removeWriter(writer)
00201 writer.writeConnectionLost(failure.Failure(err))
00202
00203 def addReader(self, reader):
00204 """Add a FileDescriptor for notification of data available to read."""
00205 if reader in self._readers:
00206
00207 return
00208 fd = reader.fileno()
00209 self._readers[reader] = fd
00210 if fd in self._fds:
00211 (_, writer) = self._fds[fd]
00212 self._fds[fd] = (reader, writer)
00213 if writer:
00214
00215
00216 self._io_loop.update_handler(fd, IOLoop.READ | IOLoop.WRITE)
00217 else:
00218 with NullContext():
00219 self._fds[fd] = (reader, None)
00220 self._io_loop.add_handler(fd, self._invoke_callback,
00221 IOLoop.READ)
00222
00223 def addWriter(self, writer):
00224 """Add a FileDescriptor for notification of data available to write."""
00225 if writer in self._writers:
00226 return
00227 fd = writer.fileno()
00228 self._writers[writer] = fd
00229 if fd in self._fds:
00230 (reader, _) = self._fds[fd]
00231 self._fds[fd] = (reader, writer)
00232 if reader:
00233
00234
00235 self._io_loop.update_handler(fd, IOLoop.READ | IOLoop.WRITE)
00236 else:
00237 with NullContext():
00238 self._fds[fd] = (None, writer)
00239 self._io_loop.add_handler(fd, self._invoke_callback,
00240 IOLoop.WRITE)
00241
00242 def removeReader(self, reader):
00243 """Remove a Selectable for notification of data available to read."""
00244 if reader in self._readers:
00245 fd = self._readers.pop(reader)
00246 (_, writer) = self._fds[fd]
00247 if writer:
00248
00249
00250 self._fds[fd] = (None, writer)
00251 self._io_loop.update_handler(fd, IOLoop.WRITE)
00252 else:
00253
00254
00255
00256 del self._fds[fd]
00257 self._io_loop.remove_handler(fd)
00258
00259 def removeWriter(self, writer):
00260 """Remove a Selectable for notification of data available to write."""
00261 if writer in self._writers:
00262 fd = self._writers.pop(writer)
00263 (reader, _) = self._fds[fd]
00264 if reader:
00265
00266
00267 self._fds[fd] = (reader, None)
00268 self._io_loop.update_handler(fd, IOLoop.READ)
00269 else:
00270
00271
00272
00273 del self._fds[fd]
00274 self._io_loop.remove_handler(fd)
00275
00276 def removeAll(self):
00277 return self._removeAll(self._readers, self._writers)
00278
00279 def getReaders(self):
00280 return self._readers.keys()
00281
00282 def getWriters(self):
00283 return self._writers.keys()
00284
00285
00286
00287
00288 def stop(self):
00289 PosixReactorBase.stop(self)
00290 self._io_loop.stop()
00291
00292 def crash(self):
00293 PosixReactorBase.crash(self)
00294 self._io_loop.stop()
00295
00296 def doIteration(self, delay):
00297 raise NotImplementedError("doIteration")
00298
00299 def mainLoop(self):
00300 self._io_loop.start()
00301 if self._stopped:
00302 self.fireSystemEvent("shutdown")
00303
00304
00305 class _TestReactor(TornadoReactor):
00306 """Subclass of TornadoReactor for use in unittests.
00307
00308 This can't go in the test.py file because of import-order dependencies
00309 with the Twisted reactor test builder.
00310 """
00311 def __init__(self):
00312
00313 super(_TestReactor, self).__init__(IOLoop())
00314
00315 def listenTCP(self, port, factory, backlog=50, interface=''):
00316
00317 if not interface:
00318 interface = '127.0.0.1'
00319 return super(_TestReactor, self).listenTCP(
00320 port, factory, backlog=backlog, interface=interface)
00321
00322 def listenUDP(self, port, protocol, interface='', maxPacketSize=8192):
00323 if not interface:
00324 interface = '127.0.0.1'
00325 return super(_TestReactor, self).listenUDP(
00326 port, protocol, interface=interface, maxPacketSize=maxPacketSize)
00327
00328
00329 def install(io_loop=None):
00330 """Install this package as the default Twisted reactor."""
00331 if not io_loop:
00332 io_loop = tornado.ioloop.IOLoop.instance()
00333 reactor = TornadoReactor(io_loop)
00334 from twisted.internet.main import installReactor
00335 installReactor(reactor)
00336 return reactor