00001 """``tornado.gen`` is a generator-based interface to make it easier to
00002 work in an asynchronous environment. Code using the ``gen`` module
00003 is technically asynchronous, but it is written as a single generator
00004 instead of a collection of separate functions.
00005
00006 For example, the following asynchronous handler::
00007
00008 class AsyncHandler(RequestHandler):
00009 @asynchronous
00010 def get(self):
00011 http_client = AsyncHTTPClient()
00012 http_client.fetch("http://example.com",
00013 callback=self.on_fetch)
00014
00015 def on_fetch(self, response):
00016 do_something_with_response(response)
00017 self.render("template.html")
00018
00019 could be written with ``gen`` as::
00020
00021 class GenAsyncHandler(RequestHandler):
00022 @gen.coroutine
00023 def get(self):
00024 http_client = AsyncHTTPClient()
00025 response = yield http_client.fetch("http://example.com")
00026 do_something_with_response(response)
00027 self.render("template.html")
00028
00029 Most asynchronous functions in Tornado return a `.Future`;
00030 yielding this object returns its `~.Future.result`.
00031
00032 You can also yield a list or dict of ``Futures``, which will be
00033 started at the same time and run in parallel; a list or dict of results will
00034 be returned when they are all finished::
00035
00036 @gen.coroutine
00037 def get(self):
00038 http_client = AsyncHTTPClient()
00039 response1, response2 = yield [http_client.fetch(url1),
00040 http_client.fetch(url2)]
00041 response_dict = yield dict(response3=http_client.fetch(url3),
00042 response4=http_client.fetch(url4))
00043 response3 = response_dict['response3']
00044 response4 = response_dict['response4']
00045
00046 .. versionchanged:: 3.2
00047 Dict support added.
00048 """
00049 from __future__ import absolute_import, division, print_function, with_statement
00050
00051 import collections
00052 import functools
00053 import itertools
00054 import sys
00055 import types
00056
00057 from tornado.concurrent import Future, TracebackFuture, is_future, chain_future
00058 from tornado.ioloop import IOLoop
00059 from tornado import stack_context
00060
00061
00062 class KeyReuseError(Exception):
00063 pass
00064
00065
00066 class UnknownKeyError(Exception):
00067 pass
00068
00069
00070 class LeakedCallbackError(Exception):
00071 pass
00072
00073
00074 class BadYieldError(Exception):
00075 pass
00076
00077
00078 class ReturnValueIgnoredError(Exception):
00079 pass
00080
00081
00082 class TimeoutError(Exception):
00083 """Exception raised by ``with_timeout``."""
00084
00085
00086 def engine(func):
00087 """Callback-oriented decorator for asynchronous generators.
00088
00089 This is an older interface; for new code that does not need to be
00090 compatible with versions of Tornado older than 3.0 the
00091 `coroutine` decorator is recommended instead.
00092
00093 This decorator is similar to `coroutine`, except it does not
00094 return a `.Future` and the ``callback`` argument is not treated
00095 specially.
00096
00097 In most cases, functions decorated with `engine` should take
00098 a ``callback`` argument and invoke it with their result when
00099 they are finished. One notable exception is the
00100 `~tornado.web.RequestHandler` :ref:`HTTP verb methods <verbs>`,
00101 which use ``self.finish()`` in place of a callback argument.
00102 """
00103 func = _make_coroutine_wrapper(func, replace_callback=False)
00104 @functools.wraps(func)
00105 def wrapper(*args, **kwargs):
00106 future = func(*args, **kwargs)
00107 def final_callback(future):
00108 if future.result() is not None:
00109 raise ReturnValueIgnoredError(
00110 "@gen.engine functions cannot return values: %r" %
00111 (future.result(),))
00112 future.add_done_callback(final_callback)
00113 return wrapper
00114
00115
00116 def coroutine(func, replace_callback=True):
00117 """Decorator for asynchronous generators.
00118
00119 Any generator that yields objects from this module must be wrapped
00120 in either this decorator or `engine`.
00121
00122 Coroutines may "return" by raising the special exception
00123 `Return(value) <Return>`. In Python 3.3+, it is also possible for
00124 the function to simply use the ``return value`` statement (prior to
00125 Python 3.3 generators were not allowed to also return values).
00126 In all versions of Python a coroutine that simply wishes to exit
00127 early may use the ``return`` statement without a value.
00128
00129 Functions with this decorator return a `.Future`. Additionally,
00130 they may be called with a ``callback`` keyword argument, which
00131 will be invoked with the future's result when it resolves. If the
00132 coroutine fails, the callback will not be run and an exception
00133 will be raised into the surrounding `.StackContext`. The
00134 ``callback`` argument is not visible inside the decorated
00135 function; it is handled by the decorator itself.
00136
00137 From the caller's perspective, ``@gen.coroutine`` is similar to
00138 the combination of ``@return_future`` and ``@gen.engine``.
00139 """
00140 return _make_coroutine_wrapper(func, replace_callback=True)
00141
00142
00143 def _make_coroutine_wrapper(func, replace_callback):
00144 """The inner workings of ``@gen.coroutine`` and ``@gen.engine``.
00145
00146 The two decorators differ in their treatment of the ``callback``
00147 argument, so we cannot simply implement ``@engine`` in terms of
00148 ``@coroutine``.
00149 """
00150 @functools.wraps(func)
00151 def wrapper(*args, **kwargs):
00152 future = TracebackFuture()
00153
00154 if replace_callback and 'callback' in kwargs:
00155 callback = kwargs.pop('callback')
00156 IOLoop.current().add_future(
00157 future, lambda future: callback(future.result()))
00158
00159 try:
00160 result = func(*args, **kwargs)
00161 except (Return, StopIteration) as e:
00162 result = getattr(e, 'value', None)
00163 except Exception:
00164 future.set_exc_info(sys.exc_info())
00165 return future
00166 else:
00167 if isinstance(result, types.GeneratorType):
00168
00169
00170
00171
00172
00173 try:
00174 orig_stack_contexts = stack_context._state.contexts
00175 yielded = next(result)
00176 if stack_context._state.contexts is not orig_stack_contexts:
00177 yielded = TracebackFuture()
00178 yielded.set_exception(
00179 stack_context.StackContextInconsistentError(
00180 'stack_context inconsistency (probably caused '
00181 'by yield within a "with StackContext" block)'))
00182 except (StopIteration, Return) as e:
00183 future.set_result(getattr(e, 'value', None))
00184 except Exception:
00185 future.set_exc_info(sys.exc_info())
00186 else:
00187 Runner(result, future, yielded)
00188 try:
00189 return future
00190 finally:
00191
00192
00193
00194
00195
00196
00197
00198
00199 future = None
00200 future.set_result(result)
00201 return future
00202 return wrapper
00203
00204
00205 class Return(Exception):
00206 """Special exception to return a value from a `coroutine`.
00207
00208 If this exception is raised, its value argument is used as the
00209 result of the coroutine::
00210
00211 @gen.coroutine
00212 def fetch_json(url):
00213 response = yield AsyncHTTPClient().fetch(url)
00214 raise gen.Return(json_decode(response.body))
00215
00216 In Python 3.3, this exception is no longer necessary: the ``return``
00217 statement can be used directly to return a value (previously
00218 ``yield`` and ``return`` with a value could not be combined in the
00219 same function).
00220
00221 By analogy with the return statement, the value argument is optional,
00222 but it is never necessary to ``raise gen.Return()``. The ``return``
00223 statement can be used with no arguments instead.
00224 """
00225 def __init__(self, value=None):
00226 super(Return, self).__init__()
00227 self.value = value
00228
00229
00230 class YieldPoint(object):
00231 """Base class for objects that may be yielded from the generator.
00232
00233 .. deprecated:: 4.0
00234 Use `Futures <.Future>` instead.
00235 """
00236 def start(self, runner):
00237 """Called by the runner after the generator has yielded.
00238
00239 No other methods will be called on this object before ``start``.
00240 """
00241 raise NotImplementedError()
00242
00243 def is_ready(self):
00244 """Called by the runner to determine whether to resume the generator.
00245
00246 Returns a boolean; may be called more than once.
00247 """
00248 raise NotImplementedError()
00249
00250 def get_result(self):
00251 """Returns the value to use as the result of the yield expression.
00252
00253 This method will only be called once, and only after `is_ready`
00254 has returned true.
00255 """
00256 raise NotImplementedError()
00257
00258
00259 class Callback(YieldPoint):
00260 """Returns a callable object that will allow a matching `Wait` to proceed.
00261
00262 The key may be any value suitable for use as a dictionary key, and is
00263 used to match ``Callbacks`` to their corresponding ``Waits``. The key
00264 must be unique among outstanding callbacks within a single run of the
00265 generator function, but may be reused across different runs of the same
00266 function (so constants generally work fine).
00267
00268 The callback may be called with zero or one arguments; if an argument
00269 is given it will be returned by `Wait`.
00270
00271 .. deprecated:: 4.0
00272 Use `Futures <.Future>` instead.
00273 """
00274 def __init__(self, key):
00275 self.key = key
00276
00277 def start(self, runner):
00278 self.runner = runner
00279 runner.register_callback(self.key)
00280
00281 def is_ready(self):
00282 return True
00283
00284 def get_result(self):
00285 return self.runner.result_callback(self.key)
00286
00287
00288 class Wait(YieldPoint):
00289 """Returns the argument passed to the result of a previous `Callback`.
00290
00291 .. deprecated:: 4.0
00292 Use `Futures <.Future>` instead.
00293 """
00294 def __init__(self, key):
00295 self.key = key
00296
00297 def start(self, runner):
00298 self.runner = runner
00299
00300 def is_ready(self):
00301 return self.runner.is_ready(self.key)
00302
00303 def get_result(self):
00304 return self.runner.pop_result(self.key)
00305
00306
00307 class WaitAll(YieldPoint):
00308 """Returns the results of multiple previous `Callbacks <Callback>`.
00309
00310 The argument is a sequence of `Callback` keys, and the result is
00311 a list of results in the same order.
00312
00313 `WaitAll` is equivalent to yielding a list of `Wait` objects.
00314
00315 .. deprecated:: 4.0
00316 Use `Futures <.Future>` instead.
00317 """
00318 def __init__(self, keys):
00319 self.keys = keys
00320
00321 def start(self, runner):
00322 self.runner = runner
00323
00324 def is_ready(self):
00325 return all(self.runner.is_ready(key) for key in self.keys)
00326
00327 def get_result(self):
00328 return [self.runner.pop_result(key) for key in self.keys]
00329
00330
00331 def Task(func, *args, **kwargs):
00332 """Adapts a callback-based asynchronous function for use in coroutines.
00333
00334 Takes a function (and optional additional arguments) and runs it with
00335 those arguments plus a ``callback`` keyword argument. The argument passed
00336 to the callback is returned as the result of the yield expression.
00337
00338 .. versionchanged:: 4.0
00339 ``gen.Task`` is now a function that returns a `.Future`, instead of
00340 a subclass of `YieldPoint`. It still behaves the same way when
00341 yielded.
00342 """
00343 future = Future()
00344 def handle_exception(typ, value, tb):
00345 if future.done():
00346 return False
00347 future.set_exc_info((typ, value, tb))
00348 return True
00349 def set_result(result):
00350 if future.done():
00351 return
00352 future.set_result(result)
00353 with stack_context.ExceptionStackContext(handle_exception):
00354 func(*args, callback=_argument_adapter(set_result), **kwargs)
00355 return future
00356
00357
00358 class YieldFuture(YieldPoint):
00359 def __init__(self, future, io_loop=None):
00360 self.future = future
00361 self.io_loop = io_loop or IOLoop.current()
00362
00363 def start(self, runner):
00364 if not self.future.done():
00365 self.runner = runner
00366 self.key = object()
00367 runner.register_callback(self.key)
00368 self.io_loop.add_future(self.future, runner.result_callback(self.key))
00369 else:
00370 self.runner = None
00371 self.result = self.future.result()
00372
00373 def is_ready(self):
00374 if self.runner is not None:
00375 return self.runner.is_ready(self.key)
00376 else:
00377 return True
00378
00379 def get_result(self):
00380 if self.runner is not None:
00381 return self.runner.pop_result(self.key).result()
00382 else:
00383 return self.result
00384
00385
00386 class Multi(YieldPoint):
00387 """Runs multiple asynchronous operations in parallel.
00388
00389 Takes a list of ``YieldPoints`` or ``Futures`` and returns a list of
00390 their responses. It is not necessary to call `Multi` explicitly,
00391 since the engine will do so automatically when the generator yields
00392 a list of ``YieldPoints`` or a mixture of ``YieldPoints`` and ``Futures``.
00393
00394 Instead of a list, the argument may also be a dictionary whose values are
00395 Futures, in which case a parallel dictionary is returned mapping the same
00396 keys to their results.
00397 """
00398 def __init__(self, children):
00399 self.keys = None
00400 if isinstance(children, dict):
00401 self.keys = list(children.keys())
00402 children = children.values()
00403 self.children = []
00404 for i in children:
00405 if is_future(i):
00406 i = YieldFuture(i)
00407 self.children.append(i)
00408 assert all(isinstance(i, YieldPoint) for i in self.children)
00409 self.unfinished_children = set(self.children)
00410
00411 def start(self, runner):
00412 for i in self.children:
00413 i.start(runner)
00414
00415 def is_ready(self):
00416 finished = list(itertools.takewhile(
00417 lambda i: i.is_ready(), self.unfinished_children))
00418 self.unfinished_children.difference_update(finished)
00419 return not self.unfinished_children
00420
00421 def get_result(self):
00422 result = (i.get_result() for i in self.children)
00423 if self.keys is not None:
00424 return dict(zip(self.keys, result))
00425 else:
00426 return list(result)
00427
00428
00429 def multi_future(children):
00430 """Wait for multiple asynchronous futures in parallel.
00431
00432 Takes a list of ``Futures`` (but *not* other ``YieldPoints``) and returns
00433 a new Future that resolves when all the other Futures are done.
00434 If all the ``Futures`` succeeded, the returned Future's result is a list
00435 of their results. If any failed, the returned Future raises the exception
00436 of the first one to fail.
00437
00438 Instead of a list, the argument may also be a dictionary whose values are
00439 Futures, in which case a parallel dictionary is returned mapping the same
00440 keys to their results.
00441
00442 It is not necessary to call `multi_future` explcitly, since the engine will
00443 do so automatically when the generator yields a list of `Futures`.
00444 This function is faster than the `Multi` `YieldPoint` because it does not
00445 require the creation of a stack context.
00446
00447 .. versionadded:: 4.0
00448 """
00449 if isinstance(children, dict):
00450 keys = list(children.keys())
00451 children = children.values()
00452 else:
00453 keys = None
00454 assert all(is_future(i) for i in children)
00455 unfinished_children = set(children)
00456
00457 future = Future()
00458 if not children:
00459 future.set_result({} if keys is not None else [])
00460 def callback(f):
00461 unfinished_children.remove(f)
00462 if not unfinished_children:
00463 try:
00464 result_list = [i.result() for i in children]
00465 except Exception:
00466 future.set_exc_info(sys.exc_info())
00467 else:
00468 if keys is not None:
00469 future.set_result(dict(zip(keys, result_list)))
00470 else:
00471 future.set_result(result_list)
00472 for f in children:
00473 f.add_done_callback(callback)
00474 return future
00475
00476
00477 def maybe_future(x):
00478 """Converts ``x`` into a `.Future`.
00479
00480 If ``x`` is already a `.Future`, it is simply returned; otherwise
00481 it is wrapped in a new `.Future`. This is suitable for use as
00482 ``result = yield gen.maybe_future(f())`` when you don't know whether
00483 ``f()`` returns a `.Future` or not.
00484 """
00485 if is_future(x):
00486 return x
00487 else:
00488 fut = Future()
00489 fut.set_result(x)
00490 return fut
00491
00492
00493 def with_timeout(timeout, future, io_loop=None):
00494 """Wraps a `.Future` in a timeout.
00495
00496 Raises `TimeoutError` if the input future does not complete before
00497 ``timeout``, which may be specified in any form allowed by
00498 `.IOLoop.add_timeout` (i.e. a `datetime.timedelta` or an absolute time
00499 relative to `.IOLoop.time`)
00500
00501 Currently only supports Futures, not other `YieldPoint` classes.
00502
00503 .. versionadded:: 4.0
00504 """
00505
00506
00507
00508
00509
00510
00511
00512
00513 result = Future()
00514 chain_future(future, result)
00515 if io_loop is None:
00516 io_loop = IOLoop.current()
00517 timeout_handle = io_loop.add_timeout(
00518 timeout,
00519 lambda: result.set_exception(TimeoutError("Timeout")))
00520 if isinstance(future, Future):
00521
00522
00523
00524 future.add_done_callback(
00525 lambda future: io_loop.remove_timeout(timeout_handle))
00526 else:
00527
00528
00529 io_loop.add_future(
00530 future, lambda future: io_loop.remove_timeout(timeout_handle))
00531 return result
00532
00533
00534 _null_future = Future()
00535 _null_future.set_result(None)
00536
00537 moment = Future()
00538 moment.__doc__ = \
00539 """A special object which may be yielded to allow the IOLoop to run for
00540 one iteration.
00541
00542 This is not needed in normal use but it can be helpful in long-running
00543 coroutines that are likely to yield Futures that are ready instantly.
00544
00545 Usage: ``yield gen.moment``
00546
00547 .. versionadded:: 4.0
00548 """
00549 moment.set_result(None)
00550
00551
00552 class Runner(object):
00553 """Internal implementation of `tornado.gen.engine`.
00554
00555 Maintains information about pending callbacks and their results.
00556
00557 The results of the generator are stored in ``result_future`` (a
00558 `.TracebackFuture`)
00559 """
00560 def __init__(self, gen, result_future, first_yielded):
00561 self.gen = gen
00562 self.result_future = result_future
00563 self.future = _null_future
00564 self.yield_point = None
00565 self.pending_callbacks = None
00566 self.results = None
00567 self.running = False
00568 self.finished = False
00569 self.had_exception = False
00570 self.io_loop = IOLoop.current()
00571
00572
00573
00574
00575
00576 self.stack_context_deactivate = None
00577 if self.handle_yield(first_yielded):
00578 self.run()
00579
00580 def register_callback(self, key):
00581 """Adds ``key`` to the list of callbacks."""
00582 if self.pending_callbacks is None:
00583
00584 self.pending_callbacks = set()
00585 self.results = {}
00586 if key in self.pending_callbacks:
00587 raise KeyReuseError("key %r is already pending" % (key,))
00588 self.pending_callbacks.add(key)
00589
00590 def is_ready(self, key):
00591 """Returns true if a result is available for ``key``."""
00592 if self.pending_callbacks is None or key not in self.pending_callbacks:
00593 raise UnknownKeyError("key %r is not pending" % (key,))
00594 return key in self.results
00595
00596 def set_result(self, key, result):
00597 """Sets the result for ``key`` and attempts to resume the generator."""
00598 self.results[key] = result
00599 if self.yield_point is not None and self.yield_point.is_ready():
00600 try:
00601 self.future.set_result(self.yield_point.get_result())
00602 except:
00603 self.future.set_exc_info(sys.exc_info())
00604 self.yield_point = None
00605 self.run()
00606
00607 def pop_result(self, key):
00608 """Returns the result for ``key`` and unregisters it."""
00609 self.pending_callbacks.remove(key)
00610 return self.results.pop(key)
00611
00612 def run(self):
00613 """Starts or resumes the generator, running until it reaches a
00614 yield point that is not ready.
00615 """
00616 if self.running or self.finished:
00617 return
00618 try:
00619 self.running = True
00620 while True:
00621 future = self.future
00622 if not future.done():
00623 return
00624 self.future = None
00625 try:
00626 orig_stack_contexts = stack_context._state.contexts
00627 try:
00628 value = future.result()
00629 except Exception:
00630 self.had_exception = True
00631 yielded = self.gen.throw(*sys.exc_info())
00632 else:
00633 yielded = self.gen.send(value)
00634 if stack_context._state.contexts is not orig_stack_contexts:
00635 self.gen.throw(
00636 stack_context.StackContextInconsistentError(
00637 'stack_context inconsistency (probably caused '
00638 'by yield within a "with StackContext" block)'))
00639 except (StopIteration, Return) as e:
00640 self.finished = True
00641 self.future = _null_future
00642 if self.pending_callbacks and not self.had_exception:
00643
00644
00645
00646
00647 raise LeakedCallbackError(
00648 "finished without waiting for callbacks %r" %
00649 self.pending_callbacks)
00650 self.result_future.set_result(getattr(e, 'value', None))
00651 self.result_future = None
00652 self._deactivate_stack_context()
00653 return
00654 except Exception:
00655 self.finished = True
00656 self.future = _null_future
00657 self.result_future.set_exc_info(sys.exc_info())
00658 self.result_future = None
00659 self._deactivate_stack_context()
00660 return
00661 if not self.handle_yield(yielded):
00662 return
00663 finally:
00664 self.running = False
00665
00666 def handle_yield(self, yielded):
00667 if isinstance(yielded, list):
00668 if all(is_future(f) for f in yielded):
00669 yielded = multi_future(yielded)
00670 else:
00671 yielded = Multi(yielded)
00672 elif isinstance(yielded, dict):
00673 if all(is_future(f) for f in yielded.values()):
00674 yielded = multi_future(yielded)
00675 else:
00676 yielded = Multi(yielded)
00677
00678 if isinstance(yielded, YieldPoint):
00679 self.future = TracebackFuture()
00680 def start_yield_point():
00681 try:
00682 yielded.start(self)
00683 if yielded.is_ready():
00684 self.future.set_result(
00685 yielded.get_result())
00686 else:
00687 self.yield_point = yielded
00688 except Exception:
00689 self.future = TracebackFuture()
00690 self.future.set_exc_info(sys.exc_info())
00691 if self.stack_context_deactivate is None:
00692
00693
00694 with stack_context.ExceptionStackContext(
00695 self.handle_exception) as deactivate:
00696 self.stack_context_deactivate = deactivate
00697 def cb():
00698 start_yield_point()
00699 self.run()
00700 self.io_loop.add_callback(cb)
00701 return False
00702 else:
00703 start_yield_point()
00704 elif is_future(yielded):
00705 self.future = yielded
00706 if not self.future.done() or self.future is moment:
00707 self.io_loop.add_future(
00708 self.future, lambda f: self.run())
00709 return False
00710 else:
00711 self.future = TracebackFuture()
00712 self.future.set_exception(BadYieldError(
00713 "yielded unknown object %r" % (yielded,)))
00714 return True
00715
00716 def result_callback(self, key):
00717 return stack_context.wrap(_argument_adapter(
00718 functools.partial(self.set_result, key)))
00719
00720 def handle_exception(self, typ, value, tb):
00721 if not self.running and not self.finished:
00722 self.future = TracebackFuture()
00723 self.future.set_exc_info((typ, value, tb))
00724 self.run()
00725 return True
00726 else:
00727 return False
00728
00729 def _deactivate_stack_context(self):
00730 if self.stack_context_deactivate is not None:
00731 self.stack_context_deactivate()
00732 self.stack_context_deactivate = None
00733
00734 Arguments = collections.namedtuple('Arguments', ['args', 'kwargs'])
00735
00736
00737 def _argument_adapter(callback):
00738 """Returns a function that when invoked runs ``callback`` with one arg.
00739
00740 If the function returned by this function is called with exactly
00741 one argument, that argument is passed to ``callback``. Otherwise
00742 the args tuple and kwargs dict are wrapped in an `Arguments` object.
00743 """
00744 def wrapper(*args, **kwargs):
00745 if kwargs or len(args) > 1:
00746 callback(Arguments(args, kwargs))
00747 elif args:
00748 callback(args[0])
00749 else:
00750 callback(None)
00751 return wrapper