00001 """``tornado.gen`` is a generator-based interface to make it easier to
00002 work in an asynchronous environment. Code using the ``gen`` module
00003 is technically asynchronous, but it is written as a single generator
00004 instead of a collection of separate functions.
00005
00006 For example, the following asynchronous handler::
00007
00008 class AsyncHandler(RequestHandler):
00009 @asynchronous
00010 def get(self):
00011 http_client = AsyncHTTPClient()
00012 http_client.fetch("http://example.com",
00013 callback=self.on_fetch)
00014
00015 def on_fetch(self, response):
00016 do_something_with_response(response)
00017 self.render("template.html")
00018
00019 could be written with ``gen`` as::
00020
00021 class GenAsyncHandler(RequestHandler):
00022 @asynchronous
00023 @gen.engine
00024 def get(self):
00025 http_client = AsyncHTTPClient()
00026 response = yield gen.Task(http_client.fetch, "http://example.com")
00027 do_something_with_response(response)
00028 self.render("template.html")
00029
00030 `Task` works with any function that takes a ``callback`` keyword
00031 argument. You can also yield a list of ``Tasks``, which will be
00032 started at the same time and run in parallel; a list of results will
00033 be returned when they are all finished::
00034
00035 def get(self):
00036 http_client = AsyncHTTPClient()
00037 response1, response2 = yield [gen.Task(http_client.fetch, url1),
00038 gen.Task(http_client.fetch, url2)]
00039
00040 For more complicated interfaces, `Task` can be split into two parts:
00041 `Callback` and `Wait`::
00042
00043 class GenAsyncHandler2(RequestHandler):
00044 @asynchronous
00045 @gen.engine
00046 def get(self):
00047 http_client = AsyncHTTPClient()
00048 http_client.fetch("http://example.com",
00049 callback=(yield gen.Callback("key")))
00050 response = yield gen.Wait("key")
00051 do_something_with_response(response)
00052 self.render("template.html")
00053
00054 The ``key`` argument to `Callback` and `Wait` allows for multiple
00055 asynchronous operations to be started at different times and proceed
00056 in parallel: yield several callbacks with different keys, then wait
00057 for them once all the async operations have started.
00058
00059 The result of a `Wait` or `Task` yield expression depends on how the callback
00060 was run. If it was called with no arguments, the result is ``None``. If
00061 it was called with one argument, the result is that argument. If it was
00062 called with more than one argument or any keyword arguments, the result
00063 is an `Arguments` object, which is a named tuple ``(args, kwargs)``.
00064 """
00065 from __future__ import absolute_import, division, with_statement
00066
00067 import functools
00068 import operator
00069 import sys
00070 import types
00071
00072 from tornado.stack_context import ExceptionStackContext
00073
00074
class KeyReuseError(Exception):
    """Raised by `Runner.register_callback` when a key is already pending."""
00077
00078
class UnknownKeyError(Exception):
    """Raised by `Runner.is_ready` when asked about a key that is not pending."""
00081
00082
class LeakedCallbackError(Exception):
    """Raised by `Runner.run` when the generator finishes without an
    exception while registered callbacks are still pending.
    """
00085
00086
class BadYieldError(Exception):
    """Thrown into the generator by `Runner.run` when it yields something
    that is neither a `YieldPoint` nor a list.
    """
00089
00090
def engine(func):
    """Decorator for asynchronous generators.

    Every generator that yields objects from this module has to be wrapped
    with this decorator.  It only makes sense on functions that are already
    asynchronous: for `~tornado.web.RequestHandler` ``get``/``post``/etc
    methods both `tornado.web.asynchronous` and `tornado.gen.engine` must
    be applied (with ``asynchronous`` listed before ``gen.engine`` so that
    exceptions are handled properly).  Elsewhere, ``gen.engine`` is
    generally only useful on functions that already take a callback
    argument.

    The wrapped function must either return a generator, which is then
    driven by a `Runner`, or return ``None``.  The wrapper itself always
    returns ``None``.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Assigned below once the generator exists; ``on_error`` reads it
        # lazily through the closure.
        active_runner = None

        def on_error(typ, value, tb):
            # Until a Runner has been created (func raised before handing
            # back a generator, or is not a generator function at all)
            # there is nothing to route the exception into, so decline it.
            if active_runner is None:
                return False
            return active_runner.handle_exception(typ, value, tb)

        with ExceptionStackContext(on_error) as deactivate:
            produced = func(*args, **kwargs)
            if not isinstance(produced, types.GeneratorType):
                # Not a generator: the only accepted return value is None,
                # and there is nothing left to run.
                assert produced is None, produced
                deactivate()
                return
            active_runner = Runner(produced, deactivate)
            active_runner.run()

    return wrapper
00126
00127
class YieldPoint(object):
    """Abstract base for everything a ``gen.engine`` generator may yield.

    Subclasses override all three methods; the versions here only raise
    ``NotImplementedError``.
    """
    def start(self, runner):
        """Hook invoked by the runner once the generator has yielded.

        ``start`` is the first method called on the object; nothing else
        is invoked before it.
        """
        raise NotImplementedError()

    def is_ready(self):
        """Tell the runner whether the generator can be resumed.

        Must return a boolean.  The runner may ask more than once.
        """
        raise NotImplementedError()

    def get_result(self):
        """Produce the value that the yield expression evaluates to.

        Called a single time, and only after `is_ready` has returned
        true.
        """
        raise NotImplementedError()
00151
00152
class Callback(YieldPoint):
    """Yields a callable that lets a matching `Wait` proceed once invoked.

    ``key`` may be any value usable as a dictionary key; it pairs this
    ``Callback`` with its `Wait`.  Keys must be unique among the callbacks
    outstanding within one run of the generator function, but the same
    key may be reused by separate runs (so constants generally work fine).

    What the matching `Wait` returns depends on how the callable is
    invoked: ``None`` for no arguments, the argument itself for a single
    positional argument, and an `Arguments` object when there is more
    than one positional argument or any keyword argument.
    """
    def __init__(self, key):
        self.key = key

    def start(self, runner):
        # Remember the runner for get_result, then mark the key as pending.
        self.runner = runner
        runner.register_callback(self.key)

    def is_ready(self):
        # Always ready: the generator resumes immediately and receives
        # the callable as the value of the yield expression.
        return True

    def get_result(self):
        callback = self.runner.result_callback(self.key)
        return callback
00177
00178
class Wait(YieldPoint):
    """Suspends the generator until the `Callback` registered under the
    same key has been invoked, then yields the result stored for it.
    """
    def __init__(self, key):
        self.key = key

    def start(self, runner):
        # Nothing to kick off; just keep the runner for the later lookups.
        self.runner = runner

    def is_ready(self):
        runner = self.runner
        return runner.is_ready(self.key)

    def get_result(self):
        # Popping also unregisters the key from the runner.
        runner = self.runner
        return runner.pop_result(self.key)
00192
00193
class WaitAll(YieldPoint):
    """Collects the results of several earlier `Callbacks` at once.

    Takes a sequence of `Callback` keys and yields a list holding the
    corresponding results, in the order the keys were given.

    `WaitAll` is equivalent to yielding a list of `Wait` objects.
    """
    def __init__(self, keys):
        self.keys = keys

    def start(self, runner):
        self.runner = runner

    def is_ready(self):
        # Stop at the first key that has no result yet.
        for key in self.keys:
            if not self.runner.is_ready(key):
                return False
        return True

    def get_result(self):
        collected = []
        for key in self.keys:
            collected.append(self.runner.pop_result(key))
        return collected
00213
00214
class Task(YieldPoint):
    """Runs a single asynchronous operation.

    Wraps a function together with optional positional and keyword
    arguments.  When started, the function is called with those arguments
    plus a ``callback`` keyword argument; whatever is passed to that
    callback becomes the value of the yield expression.

    A `Task` behaves like a `Callback`/`Wait` pair whose unique key is
    generated automatically::

        result = yield gen.Task(func, args)

        func(args, callback=(yield gen.Callback(key)))
        result = yield gen.Wait(key)
    """
    def __init__(self, func, *args, **kwargs):
        # ``callback`` is supplied by start(), so callers must not pass it.
        assert "callback" not in kwargs
        self.args = args
        self.kwargs = kwargs
        self.func = func

    def start(self, runner):
        # A fresh object() is a key that cannot collide with any other.
        unique_key = object()
        self.runner = runner
        self.key = unique_key
        runner.register_callback(unique_key)
        self.kwargs["callback"] = runner.result_callback(unique_key)
        self.func(*self.args, **self.kwargs)

    def is_ready(self):
        runner = self.runner
        return runner.is_ready(self.key)

    def get_result(self):
        runner = self.runner
        return runner.pop_result(self.key)
00248
00249
class Multi(YieldPoint):
    """Runs multiple asynchronous operations in parallel.

    Wraps a list of ``Tasks`` (or any other ``YieldPoints``) and yields
    the list of their results.  There is no need to construct a `Multi`
    by hand: the engine does so automatically whenever the generator
    yields a list of ``YieldPoints``.
    """
    def __init__(self, children):
        assert all(isinstance(child, YieldPoint) for child in children)
        self.children = children

    def start(self, runner):
        # Start every child before any of them is polled for readiness.
        for child in self.children:
            child.start(runner)

    def is_ready(self):
        # Stop at the first child that is not ready yet.
        for child in self.children:
            if not child.is_ready():
                return False
        return True

    def get_result(self):
        gathered = []
        for child in self.children:
            gathered.append(child.get_result())
        return gathered
00271
00272
class _NullYieldPoint(YieldPoint):
    """Placeholder yield point: starts nothing, is always ready and
    yields ``None``.  `Runner` uses it as its initial yield point.
    """
    def start(self, runner):
        """Do nothing; there is no operation to start."""

    def is_ready(self):
        return True

    def get_result(self):
        return None
00282
00283
class Runner(object):
    """Internal implementation of `tornado.gen.engine`.

    Drives one generator: keeps track of the callback keys that have been
    registered but not yet consumed, stores callback results as they
    arrive, and resumes the generator whenever its current yield point
    reports that it is ready.
    """
    def __init__(self, gen, deactivate_stack_context):
        """Prepares to drive ``gen``.

        ``deactivate_stack_context`` is a callable that `run` invokes when
        the generator finishes without a leaked-callback error.
        """
        self.gen = gen
        self.deactivate_stack_context = deactivate_stack_context
        # Start from a yield point that is immediately ready with None, so
        # the first pass through run() begins the generator with send(None).
        self.yield_point = _NullYieldPoint()
        self.pending_callbacks = set()
        self.results = {}
        self.running = False
        self.finished = False
        # Exception waiting to be thrown into the generator, if any.
        self.exc_info = None
        self.had_exception = False

    def register_callback(self, key):
        """Adds ``key`` to the list of callbacks.

        Raises `KeyReuseError` if ``key`` is already pending.
        """
        if key in self.pending_callbacks:
            # Wrap in a 1-tuple: a bare tuple key would otherwise be
            # unpacked by ``%`` and raise TypeError instead.
            raise KeyReuseError("key %r is already pending" % (key,))
        self.pending_callbacks.add(key)

    def is_ready(self, key):
        """Returns true if a result is available for ``key``.

        Raises `UnknownKeyError` if ``key`` is not pending.
        """
        if key not in self.pending_callbacks:
            # 1-tuple for the same reason as in register_callback.
            raise UnknownKeyError("key %r is not pending" % (key,))
        return key in self.results

    def set_result(self, key, result):
        """Sets the result for ``key`` and attempts to resume the generator."""
        self.results[key] = result
        self.run()

    def pop_result(self, key):
        """Returns the result for ``key`` and unregisters it."""
        self.pending_callbacks.remove(key)
        return self.results.pop(key)

    def run(self):
        """Starts or resumes the generator, running until it reaches a
        yield point that is not ready.

        Does nothing if the runner is already running or has finished.
        Raises `LeakedCallbackError` if the generator finishes without an
        exception while callbacks are still pending; any other exception
        escaping the generator is re-raised.
        """
        if self.running or self.finished:
            return
        try:
            self.running = True
            while True:
                if self.exc_info is None:
                    try:
                        if not self.yield_point.is_ready():
                            return
                        value = self.yield_point.get_result()
                    except Exception:
                        # Errors from the yield point are delivered to the
                        # generator rather than raised from here.
                        self.exc_info = sys.exc_info()
                try:
                    if self.exc_info is not None:
                        self.had_exception = True
                        exc_info = self.exc_info
                        self.exc_info = None
                        yielded = self.gen.throw(*exc_info)
                    else:
                        yielded = self.gen.send(value)
                except StopIteration:
                    self.finished = True
                    # A clean finish with callbacks still registered is an
                    # error; after an exception the check is skipped.
                    if self.pending_callbacks and not self.had_exception:
                        raise LeakedCallbackError(
                            "finished without waiting for callbacks %r" %
                            self.pending_callbacks)
                    self.deactivate_stack_context()
                    return
                except Exception:
                    self.finished = True
                    raise
                # A yielded list is shorthand for Multi(list).
                if isinstance(yielded, list):
                    yielded = Multi(yielded)
                if isinstance(yielded, YieldPoint):
                    self.yield_point = yielded
                    try:
                        self.yield_point.start(self)
                    except Exception:
                        self.exc_info = sys.exc_info()
                else:
                    # 1-tuple operand so that a yielded tuple is formatted
                    # with %r instead of being unpacked by ``%``.
                    self.exc_info = (
                        BadYieldError(
                            "yielded unknown object %r" % (yielded,)),)
        finally:
            self.running = False

    def result_callback(self, key):
        """Returns a callable that stores its arguments as the result for
        ``key`` and then tries to resume the generator.

        The stored result is ``None`` for a call without arguments, the
        argument itself for one positional argument, and an `Arguments`
        object otherwise.
        """
        def inner(*args, **kwargs):
            if kwargs or len(args) > 1:
                result = Arguments(args, kwargs)
            elif args:
                result = args[0]
            else:
                result = None
            self.set_result(key, result)
        return inner

    def handle_exception(self, typ, value, tb):
        """Routes an exception into the generator if that is possible.

        Returns ``True`` after storing the exception and calling `run`;
        returns ``False`` when the runner is currently running or has
        already finished.
        """
        if not self.running and not self.finished:
            self.exc_info = (typ, value, tb)
            self.run()
            return True
        else:
            return False
00392
00393
00394
00395
class Arguments(tuple):
    """Value of a yield expression whose callback was invoked with more
    than one argument (or with keyword arguments).

    An `Arguments` object is a two-element tuple ``(args, kwargs)`` whose
    elements are also reachable as the ``args`` and ``kwargs`` attributes.
    """
    __slots__ = ()

    def __new__(cls, args, kwargs):
        return super(Arguments, cls).__new__(cls, (args, kwargs))

    @property
    def args(self):
        return self[0]

    @property
    def kwargs(self):
        return self[1]