00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 """Bridges between the Twisted reactor and Tornado IOLoop.
00020
00021 This module lets you run applications and libraries written for
00022 Twisted in a Tornado application. It can be used in two modes,
00023 depending on which library's underlying event loop you want to use.
00024
00025 This module has been tested with Twisted versions 11.0.0 and newer.
00026
00027 Twisted on Tornado
00028 ------------------
00029
00030 `TornadoReactor` implements the Twisted reactor interface on top of
00031 the Tornado IOLoop. To use it, simply call `install` at the beginning
00032 of the application::
00033
    import tornado.platform.twisted
    tornado.platform.twisted.install()
    from twisted.internet import reactor
00037
00038 When the app is ready to start, call `IOLoop.instance().start()`
00039 instead of `reactor.run()`.
00040
00041 It is also possible to create a non-global reactor by calling
00042 `tornado.platform.twisted.TornadoReactor(io_loop)`. However, if
00043 the `IOLoop` and reactor are to be short-lived (such as those used in
00044 unit tests), additional cleanup may be required. Specifically, it is
00045 recommended to call::
00046
    reactor.fireSystemEvent('shutdown')
    reactor.disconnectAll()
00049
00050 before closing the `IOLoop`.
00051
00052 Tornado on Twisted
00053 ------------------
00054
00055 `TwistedIOLoop` implements the Tornado IOLoop interface on top of the Twisted
00056 reactor. Recommended usage::
00057
    from tornado.platform.twisted import TwistedIOLoop
    from twisted.internet import reactor
    TwistedIOLoop().install()
    # Set up your tornado application as usual using `IOLoop.instance`
    reactor.run()
00063
`TwistedIOLoop` uses the global Twisted reactor unless a different
reactor is passed to its constructor.
00065 """
00066
00067 from __future__ import absolute_import, division, print_function, with_statement
00068
00069 import datetime
00070 import functools
00071 import numbers
00072 import socket
00073
00074 import twisted.internet.abstract
00075 from twisted.internet.posixbase import PosixReactorBase
00076 from twisted.internet.interfaces import \
00077 IReactorFDSet, IDelayedCall, IReactorTime, IReadDescriptor, IWriteDescriptor
00078 from twisted.python import failure, log
00079 from twisted.internet import error
00080 import twisted.names.cache
00081 import twisted.names.client
00082 import twisted.names.hosts
00083 import twisted.names.resolve
00084
00085 from zope.interface import implementer
00086
00087 from tornado.escape import utf8
00088 from tornado import gen
00089 import tornado.ioloop
00090 from tornado.log import app_log
00091 from tornado.netutil import Resolver
00092 from tornado.stack_context import NullContext, wrap
00093 from tornado.ioloop import IOLoop
00094 from tornado.util import timedelta_to_seconds
00095
00096
@implementer(IDelayedCall)
class TornadoDelayedCall(object):
    """`IDelayedCall` implementation backed by an `IOLoop` timeout.

    Each instance owns one pending timeout on the reactor's IOLoop and
    removes itself from the reactor's delayed-call registry once it has
    fired or been cancelled.
    """
    def __init__(self, reactor, seconds, f, *args, **kw):
        """Schedule ``f(*args, **kw)`` to run ``seconds`` from now.

        ``reactor`` must expose ``seconds()``, ``_io_loop`` and
        ``_removeDelayedCall()``.
        """
        self._reactor = reactor
        # Bind the arguments now so the timeout can invoke a zero-arg callable.
        self._func = functools.partial(f, *args, **kw)
        self._time = self._reactor.seconds() + seconds
        loop = self._reactor._io_loop
        self._timeout = loop.add_timeout(self._time, self._called)
        self._active = True

    def _called(self):
        """Timeout callback: retire this call, then run the wrapped function."""
        self._active = False
        self._reactor._removeDelayedCall(self)
        try:
            self._func()
        except:
            # Deliberately catches everything: a failing timed call is
            # logged instead of propagating into the IOLoop.
            app_log.error("_called caught exception", exc_info=True)

    def getTime(self):
        """Return the reactor time at which the call is scheduled to run."""
        return self._time

    def cancel(self):
        """Drop the pending timeout and unregister from the reactor."""
        self._active = False
        owner = self._reactor
        owner._io_loop.remove_timeout(self._timeout)
        owner._removeDelayedCall(self)

    def delay(self, seconds):
        """Push the scheduled time back by ``seconds`` (relative to it)."""
        loop = self._reactor._io_loop
        loop.remove_timeout(self._timeout)
        self._time += seconds
        self._timeout = loop.add_timeout(self._time, self._called)

    def reset(self, seconds):
        """Reschedule the call to run ``seconds`` from the current time."""
        owner = self._reactor
        owner._io_loop.remove_timeout(self._timeout)
        self._time = owner.seconds() + seconds
        self._timeout = owner._io_loop.add_timeout(self._time, self._called)

    def active(self):
        """Return True while the call has neither run nor been cancelled."""
        return self._active
00138
00139
@implementer(IReactorTime, IReactorFDSet)
class TornadoReactor(PosixReactorBase):
    """Twisted reactor built on the Tornado IOLoop.

    Since it is intended to be used in applications where the top-level
    event loop is ``io_loop.start()`` rather than ``reactor.run()``,
    it is implemented a little differently than other Twisted reactors.
    We override `mainLoop` instead of `doIteration` and must implement
    timed call functionality on top of `IOLoop.add_timeout` rather than
    using the implementation in `PosixReactorBase`.
    """
    def __init__(self, io_loop=None):
        """Create a reactor driven by ``io_loop`` (default: the current IOLoop)."""
        if not io_loop:
            io_loop = tornado.ioloop.IOLoop.current()
        self._io_loop = io_loop
        self._readers = {}  # reader object -> fd
        self._writers = {}  # writer object -> fd
        self._fds = {}  # fd -> (reader, writer); either slot may be None
        self._delayedCalls = {}  # pending TornadoDelayedCall -> True (a set)
        PosixReactorBase.__init__(self)
        # Shutting the reactor down also stops the underlying IOLoop
        # (see `crash`).
        self.addSystemEventTrigger('during', 'shutdown', self.crash)

        # The IOLoop may be started directly, in which case nothing has
        # fired the reactor's 'startup' event yet.  Fire it from the first
        # loop iteration unless the reactor is already marked as started.
        def start_if_necessary():
            if not self._started:
                self.fireSystemEvent('startup')
        self._io_loop.add_callback(start_if_necessary)

    # IReactorTime
    def seconds(self):
        """Return the current time according to the IOLoop's clock."""
        return self._io_loop.time()

    def callLater(self, seconds, f, *args, **kw):
        """Schedule ``f(*args, **kw)`` after ``seconds``; return the call object."""
        dc = TornadoDelayedCall(self, seconds, f, *args, **kw)
        self._delayedCalls[dc] = True
        return dc

    def getDelayedCalls(self):
        """Return a list of the delayed calls that are still active."""
        return [x for x in self._delayedCalls if x._active]

    def _removeDelayedCall(self, dc):
        """Forget ``dc``; a call that is not registered is ignored."""
        self._delayedCalls.pop(dc, None)

    # IReactorThreads
    def callFromThread(self, f, *args, **kw):
        """See `twisted.internet.interfaces.IReactorThreads.callFromThread`"""
        assert callable(f), "%s is not callable" % f
        with NullContext():
            # Schedule outside of any active StackContext so the callback
            # does not inherit the context of whoever called us.
            self._io_loop.add_callback(f, *args, **kw)

    # The IOLoop's add_callback is used for cross-thread wakeups, so the
    # reactor-level waker hooks are intentionally no-ops.
    def installWaker(self):
        """No-op: no separate waker is installed."""
        pass

    def wakeUp(self):
        """No-op: see `installWaker`."""
        pass

    # IReactorFDSet
    def _invoke_callback(self, fd, events):
        """IOLoop handler: dispatch ``events`` on ``fd`` to its reader/writer.

        A descriptor whose ``fileno()`` has become ``-1``, a non-None
        result from ``doRead``/``doWrite``, or an ``IOLoop.ERROR`` event
        is treated as a lost connection: the descriptor is removed and
        notified via ``readConnectionLost``/``writeConnectionLost``.
        """
        if fd not in self._fds:
            return
        (reader, writer) = self._fds[fd]
        if reader:
            err = None
            if reader.fileno() == -1:
                err = error.ConnectionLost()
            elif events & IOLoop.READ:
                err = log.callWithLogger(reader, reader.doRead)
            if err is None and events & IOLoop.ERROR:
                err = error.ConnectionLost()
            if err is not None:
                self.removeReader(reader)
                reader.readConnectionLost(failure.Failure(err))
        if writer:
            err = None
            if writer.fileno() == -1:
                err = error.ConnectionLost()
            elif events & IOLoop.WRITE:
                err = log.callWithLogger(writer, writer.doWrite)
            if err is None and events & IOLoop.ERROR:
                err = error.ConnectionLost()
            if err is not None:
                self.removeWriter(writer)
                writer.writeConnectionLost(failure.Failure(err))

    def addReader(self, reader):
        """Add a FileDescriptor for notification of data available to read."""
        if reader in self._readers:
            # Already registered; nothing to do.
            return
        fd = reader.fileno()
        self._readers[reader] = fd
        if fd in self._fds:
            (_, writer) = self._fds[fd]
            self._fds[fd] = (reader, writer)
            if writer:
                # The fd is already watched for writes; widen the
                # registration to cover reads as well.
                self._io_loop.update_handler(fd, IOLoop.READ | IOLoop.WRITE)
        else:
            with NullContext():
                self._fds[fd] = (reader, None)
                self._io_loop.add_handler(fd, self._invoke_callback,
                                          IOLoop.READ)

    def addWriter(self, writer):
        """Add a FileDescriptor for notification of data available to write."""
        if writer in self._writers:
            # Already registered; nothing to do.
            return
        fd = writer.fileno()
        self._writers[writer] = fd
        if fd in self._fds:
            (reader, _) = self._fds[fd]
            self._fds[fd] = (reader, writer)
            if reader:
                # The fd is already watched for reads; widen the
                # registration to cover writes as well.
                self._io_loop.update_handler(fd, IOLoop.READ | IOLoop.WRITE)
        else:
            with NullContext():
                self._fds[fd] = (None, writer)
                self._io_loop.add_handler(fd, self._invoke_callback,
                                          IOLoop.WRITE)

    def removeReader(self, reader):
        """Remove a Selectable for notification of data available to read."""
        if reader in self._readers:
            fd = self._readers.pop(reader)
            (_, writer) = self._fds[fd]
            if writer:
                # A writer still uses this fd: keep the handler but
                # narrow it to write events only.
                self._fds[fd] = (None, writer)
                self._io_loop.update_handler(fd, IOLoop.WRITE)
            else:
                # Nobody is interested in this fd any more: drop the
                # handler entirely.
                del self._fds[fd]
                self._io_loop.remove_handler(fd)

    def removeWriter(self, writer):
        """Remove a Selectable for notification of data available to write."""
        if writer in self._writers:
            fd = self._writers.pop(writer)
            (reader, _) = self._fds[fd]
            if reader:
                # A reader still uses this fd: keep the handler but
                # narrow it to read events only.
                self._fds[fd] = (reader, None)
                self._io_loop.update_handler(fd, IOLoop.READ)
            else:
                # Nobody is interested in this fd any more: drop the
                # handler entirely.
                del self._fds[fd]
                self._io_loop.remove_handler(fd)

    def removeAll(self):
        """Remove all readers and writers via the base-class helper."""
        return self._removeAll(self._readers, self._writers)

    def getReaders(self):
        """Return a list of the registered readers.

        A snapshot list is returned (not a live dict view) so callers may
        add or remove readers while iterating over the result.
        """
        return list(self._readers)

    def getWriters(self):
        """Return a list of the registered writers.

        A snapshot list is returned (not a live dict view) so callers may
        add or remove writers while iterating over the result.
        """
        return list(self._writers)

    # The following methods are overridden because the IOLoop, not the
    # reactor, owns the main loop.
    def stop(self):
        """Stop the reactor; the 'shutdown' event fires on the next loop pass."""
        PosixReactorBase.stop(self)
        fire_shutdown = functools.partial(self.fireSystemEvent, "shutdown")
        self._io_loop.add_callback(fire_shutdown)

    def crash(self):
        """Crash the reactor and stop the underlying IOLoop."""
        PosixReactorBase.crash(self)
        self._io_loop.stop()

    def doIteration(self, delay):
        """Unsupported: iteration is driven by the IOLoop (see `mainLoop`)."""
        raise NotImplementedError("doIteration")

    def mainLoop(self):
        """Run the reactor by starting the underlying IOLoop."""
        self._io_loop.start()
00332
00333
class _TestReactor(TornadoReactor):
    """TornadoReactor variant used by the unit tests.

    It lives in this module rather than in the test module because of
    import-order dependencies with the Twisted reactor test builder.
    """
    def __init__(self):
        # Every test reactor gets a brand-new IOLoop instead of sharing
        # the current one.
        private_loop = IOLoop()
        super(_TestReactor, self).__init__(private_loop)

    def listenTCP(self, port, factory, backlog=50, interface=''):
        """Like the inherited ``listenTCP`` but binds to loopback by default."""
        bind_to = interface or '127.0.0.1'
        parent = super(_TestReactor, self)
        return parent.listenTCP(
            port, factory, backlog=backlog, interface=bind_to)

    def listenUDP(self, port, protocol, interface='', maxPacketSize=8192):
        """Like the inherited ``listenUDP`` but binds to loopback by default."""
        bind_to = interface or '127.0.0.1'
        parent = super(_TestReactor, self)
        return parent.listenUDP(
            port, protocol, interface=bind_to, maxPacketSize=maxPacketSize)
00356
00357
def install(io_loop=None):
    """Install a `TornadoReactor` as the default Twisted reactor.

    ``io_loop`` defaults to the current `IOLoop`.  Returns the newly
    created reactor.
    """
    loop = io_loop or tornado.ioloop.IOLoop.current()
    tornado_reactor = TornadoReactor(loop)
    # Imported only after the reactor has been constructed, matching the
    # original ordering of side effects.
    from twisted.internet.main import installReactor
    installReactor(tornado_reactor)
    return tornado_reactor
00366
00367
@implementer(IReadDescriptor, IWriteDescriptor)
class _FD(object):
    """Twisted descriptor that forwards events to a Tornado-style handler.

    The handler is called as ``handler(fileobj, events)`` with one of the
    `IOLoop` event constants.  Once ``lost`` is set no further events are
    delivered.
    """
    def __init__(self, fd, fileobj, handler):
        self.fd = fd
        self.fileobj = fileobj
        self.handler = handler
        # Bookkeeping for TwistedIOLoop: which directions are registered
        # with the reactor.
        self.reading = False
        self.writing = False
        self.lost = False

    def fileno(self):
        """Return the descriptor this object was created with."""
        return self.fd

    def doRead(self):
        """Deliver a READ event unless the descriptor has been lost."""
        if self.lost:
            return
        self.handler(self.fileobj, tornado.ioloop.IOLoop.READ)

    def doWrite(self):
        """Deliver a WRITE event unless the descriptor has been lost."""
        if self.lost:
            return
        self.handler(self.fileobj, tornado.ioloop.IOLoop.WRITE)

    def connectionLost(self, reason):
        """Deliver a single ERROR event, then mark the descriptor as lost."""
        if self.lost:
            return
        self.handler(self.fileobj, tornado.ioloop.IOLoop.ERROR)
        # Only set after the handler returns, as in the original.
        self.lost = True

    def logPrefix(self):
        """Return the (empty) prefix used for this descriptor's log lines."""
        return ''
00396
00397
class TwistedIOLoop(tornado.ioloop.IOLoop):
    """IOLoop implementation that runs on Twisted.

    Uses the global Twisted reactor by default.  To create multiple
    `TwistedIOLoops` in the same process, you must pass a unique reactor
    when constructing each one.

    Not compatible with `tornado.process.Subprocess.set_exit_callback`
    because the ``SIGCHLD`` handlers used by Tornado and Twisted conflict
    with each other.
    """
    def initialize(self, reactor=None):
        """Bind this loop to ``reactor`` (default: the global reactor)."""
        if reactor is None:
            import twisted.internet.reactor
            reactor = twisted.internet.reactor
        self.reactor = reactor
        self.fds = {}  # integer fd -> _FD wrapper
        # Become the current IOLoop as soon as the reactor is running.
        self.reactor.callWhenRunning(self.make_current)

    def close(self, all_fds=False):
        """Detach from the reactor.

        Removes every reader and writer from the reactor and cancels its
        pending delayed calls.  When ``all_fds`` is true the file objects
        registered through this loop are closed as well.
        """
        fds = self.fds
        self.reactor.removeAll()
        for c in self.reactor.getDelayedCalls():
            c.cancel()
        if all_fds:
            for fd in fds.values():
                self.close_fd(fd.fileobj)

    def add_handler(self, fd, handler, events):
        """Register ``handler`` for ``events`` on ``fd``.

        ``fd`` may be an integer descriptor or a file-like object.
        Raises `ValueError` if the descriptor is already registered.
        """
        # Normalize first: ``self.fds`` is keyed by the integer
        # descriptor, so the duplicate check must use that key or a file
        # object registered twice would go unnoticed.
        fd, fileobj = self.split_fd(fd)
        if fd in self.fds:
            raise ValueError('fd %s added twice' % fd)
        entry = _FD(fd, fileobj, wrap(handler))
        self.fds[fd] = entry
        if events & tornado.ioloop.IOLoop.READ:
            entry.reading = True
            self.reactor.addReader(entry)
        if events & tornado.ioloop.IOLoop.WRITE:
            entry.writing = True
            self.reactor.addWriter(entry)

    def update_handler(self, fd, events):
        """Change the set of events watched on an already registered ``fd``.

        Raises `KeyError` if ``fd`` has not been registered.
        """
        fd, fileobj = self.split_fd(fd)
        entry = self.fds[fd]
        # Only touch the reactor when the desired state differs from the
        # recorded one.
        if events & tornado.ioloop.IOLoop.READ:
            if not entry.reading:
                entry.reading = True
                self.reactor.addReader(entry)
        else:
            if entry.reading:
                entry.reading = False
                self.reactor.removeReader(entry)
        if events & tornado.ioloop.IOLoop.WRITE:
            if not entry.writing:
                entry.writing = True
                self.reactor.addWriter(entry)
        else:
            if entry.writing:
                entry.writing = False
                self.reactor.removeWriter(entry)

    def remove_handler(self, fd):
        """Stop watching ``fd``; an unregistered descriptor is ignored."""
        fd, fileobj = self.split_fd(fd)
        if fd not in self.fds:
            return
        entry = self.fds[fd]
        # Mark as lost first so no further events reach the handler.
        entry.lost = True
        if entry.reading:
            self.reactor.removeReader(entry)
        if entry.writing:
            self.reactor.removeWriter(entry)
        del self.fds[fd]

    def start(self):
        """Set up logging and run the reactor."""
        self._setup_logging()
        self.reactor.run()

    def stop(self):
        """Stop the loop by crashing the reactor."""
        self.reactor.crash()

    def add_timeout(self, deadline, callback, *args, **kwargs):
        """Run ``callback(*args, **kwargs)`` at ``deadline``.

        ``deadline`` is either an absolute time on this loop's clock (a
        real number; deadlines in the past run as soon as possible) or a
        `datetime.timedelta` relative to now.  Returns the reactor's
        delayed-call object, usable with `remove_timeout`.

        Raises `TypeError` for any other kind of deadline.
        """
        if isinstance(deadline, numbers.Real):
            delay = max(deadline - self.time(), 0)
        elif isinstance(deadline, datetime.timedelta):
            delay = timedelta_to_seconds(deadline)
        else:
            # Wrap in a tuple so tuple-valued deadlines format correctly.
            raise TypeError("Unsupported deadline %r" % (deadline,))
        return self.reactor.callLater(
            delay, self._run_callback,
            functools.partial(wrap(callback), *args, **kwargs))

    def remove_timeout(self, timeout):
        """Cancel ``timeout`` if it has not already run or been cancelled."""
        if timeout.active():
            timeout.cancel()

    def add_callback(self, callback, *args, **kwargs):
        """Schedule ``callback(*args, **kwargs)`` via ``callFromThread``."""
        self.reactor.callFromThread(
            self._run_callback,
            functools.partial(wrap(callback), *args, **kwargs))

    def add_callback_from_signal(self, callback, *args, **kwargs):
        """Same as `add_callback`; no special signal handling is done."""
        self.add_callback(callback, *args, **kwargs)
00500
00501
class TwistedResolver(Resolver):
    """Twisted-based asynchronous resolver.

    This is a non-blocking and non-threaded resolver.  It is
    recommended only when threads cannot be used, since it has
    limitations compared to the standard ``getaddrinfo``-based
    `~tornado.netutil.Resolver` and
    `~tornado.netutil.ThreadedResolver`.  Specifically, it returns at
    most one result, and arguments other than ``host`` and ``family``
    are ignored.  It may fail to resolve when ``family`` is not
    ``socket.AF_UNSPEC``.

    Requires Twisted 12.1 or newer.
    """
    def initialize(self, io_loop=None):
        """Build the resolver chain on a private `TornadoReactor`."""
        self.io_loop = io_loop or IOLoop.current()
        # The reactor receives the raw ``io_loop`` argument and applies
        # its own default when that is None.
        self.reactor = tornado.platform.twisted.TornadoReactor(io_loop)

        # Lookup order: hosts file, then cache, then the configured
        # name servers.
        chain = [
            twisted.names.hosts.Resolver('/etc/hosts'),
            twisted.names.cache.CacheResolver(reactor=self.reactor),
            twisted.names.client.Resolver('/etc/resolv.conf',
                                          reactor=self.reactor),
        ]
        self.resolver = twisted.names.resolve.ResolverChain(chain)

    @gen.coroutine
    def resolve(self, host, port, family=0):
        """Resolve ``host`` and return ``[(family, (address, port))]``.

        Literal IPv4/IPv6 addresses are returned unchanged without a
        lookup.  Raises the lookup failure if resolution fails, or
        `Exception` when a specific ``family`` was requested and the
        result belongs to a different one.
        """
        is_v4 = twisted.internet.abstract.isIPAddress
        is_v6 = twisted.internet.abstract.isIPv6Address

        address = host
        if not (is_v4(host) or is_v6(host)):
            # Not a literal address: ask the resolver chain.
            lookup = self.resolver.getHostByName(utf8(host))
            address = yield gen.Task(lookup.addBoth)
            if isinstance(address, failure.Failure):
                address.raiseException()

        if is_v4(address):
            resolved_family = socket.AF_INET
        elif is_v6(address):
            resolved_family = socket.AF_INET6
        else:
            resolved_family = socket.AF_UNSPEC

        if family not in (socket.AF_UNSPEC, resolved_family):
            raise Exception('Requested socket family %d but got %d' %
                            (family, resolved_family))
        raise gen.Return([(resolved_family, (address, port))])