00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 """Utilities for working with threads and ``Futures``.
00017
00018 ``Futures`` are a pattern for concurrent programming introduced in
00019 Python 3.2 in the `concurrent.futures` package (this package has also
00020 been backported to older versions of Python and can be installed with
00021 ``pip install futures``). Tornado will use `concurrent.futures.Future` if
00022 it is available; otherwise it will use a compatible class defined in this
00023 module.
00024 """
00025 from __future__ import absolute_import, division, print_function, with_statement
00026
00027 import functools
00028 import sys
00029
00030 from tornado.stack_context import ExceptionStackContext, wrap
00031 from tornado.util import raise_exc_info, ArgReplacer
00032
00033 try:
00034 from concurrent import futures
00035 except ImportError:
00036 futures = None
00037
00038
00039 class ReturnValueIgnoredError(Exception):
00040 pass
00041
00042
00043 class Future(object):
00044 """Placeholder for an asynchronous result.
00045
00046 A ``Future`` encapsulates the result of an asynchronous
00047 operation. In synchronous applications ``Futures`` are used
00048 to wait for the result from a thread or process pool; in
00049 Tornado they are normally used with `.IOLoop.add_future` or by
00050 yielding them in a `.gen.coroutine`.
00051
00052 `tornado.concurrent.Future` is similar to
00053 `concurrent.futures.Future`, but not thread-safe (and therefore
00054 faster for use with single-threaded event loops).
00055
00056 In addition to ``exception`` and ``set_exception``, methods ``exc_info``
00057 and ``set_exc_info`` are supported to capture tracebacks in Python 2.
00058 The traceback is automatically available in Python 3, but in the
00059 Python 2 futures backport this information is discarded.
00060 This functionality was previously available in a separate class
00061 ``TracebackFuture``, which is now a deprecated alias for this class.
00062
00063 .. versionchanged:: 4.0
00064 `tornado.concurrent.Future` is always a thread-unsafe ``Future``
00065 with support for the ``exc_info`` methods. Previously it would
00066 be an alias for the thread-safe `concurrent.futures.Future`
00067 if that package was available and fall back to the thread-unsafe
00068 implementation if it was not.
00069
00070 """
00071 def __init__(self):
00072 self._done = False
00073 self._result = None
00074 self._exception = None
00075 self._exc_info = None
00076 self._callbacks = []
00077
00078 def cancel(self):
00079 """Cancel the operation, if possible.
00080
00081 Tornado ``Futures`` do not support cancellation, so this method always
00082 returns False.
00083 """
00084 return False
00085
00086 def cancelled(self):
00087 """Returns True if the operation has been cancelled.
00088
00089 Tornado ``Futures`` do not support cancellation, so this method
00090 always returns False.
00091 """
00092 return False
00093
00094 def running(self):
00095 """Returns True if this operation is currently running."""
00096 return not self._done
00097
00098 def done(self):
00099 """Returns True if the future has finished running."""
00100 return self._done
00101
00102 def result(self, timeout=None):
00103 """If the operation succeeded, return its result. If it failed,
00104 re-raise its exception.
00105 """
00106 if self._result is not None:
00107 return self._result
00108 if self._exc_info is not None:
00109 raise_exc_info(self._exc_info)
00110 elif self._exception is not None:
00111 raise self._exception
00112 self._check_done()
00113 return self._result
00114
00115 def exception(self, timeout=None):
00116 """If the operation raised an exception, return the `Exception`
00117 object. Otherwise returns None.
00118 """
00119 if self._exception is not None:
00120 return self._exception
00121 else:
00122 self._check_done()
00123 return None
00124
00125 def add_done_callback(self, fn):
00126 """Attaches the given callback to the `Future`.
00127
00128 It will be invoked with the `Future` as its argument when the Future
00129 has finished running and its result is available. In Tornado
00130 consider using `.IOLoop.add_future` instead of calling
00131 `add_done_callback` directly.
00132 """
00133 if self._done:
00134 fn(self)
00135 else:
00136 self._callbacks.append(fn)
00137
00138 def set_result(self, result):
00139 """Sets the result of a ``Future``.
00140
00141 It is undefined to call any of the ``set`` methods more than once
00142 on the same object.
00143 """
00144 self._result = result
00145 self._set_done()
00146
00147 def set_exception(self, exception):
00148 """Sets the exception of a ``Future.``"""
00149 self._exception = exception
00150 self._set_done()
00151
00152 def exc_info(self):
00153 """Returns a tuple in the same format as `sys.exc_info` or None.
00154
00155 .. versionadded:: 4.0
00156 """
00157 return self._exc_info
00158
00159 def set_exc_info(self, exc_info):
00160 """Sets the exception information of a ``Future.``
00161
00162 Preserves tracebacks on Python 2.
00163
00164 .. versionadded:: 4.0
00165 """
00166 self._exc_info = exc_info
00167 self.set_exception(exc_info[1])
00168
00169 def _check_done(self):
00170 if not self._done:
00171 raise Exception("DummyFuture does not support blocking for results")
00172
00173 def _set_done(self):
00174 self._done = True
00175 for cb in self._callbacks:
00176
00177 cb(self)
00178 self._callbacks = None
00179
00180 TracebackFuture = Future
00181
00182 if futures is None:
00183 FUTURES = Future
00184 else:
00185 FUTURES = (futures.Future, Future)
00186
00187
00188 def is_future(x):
00189 return isinstance(x, FUTURES)
00190
00191
00192 class DummyExecutor(object):
00193 def submit(self, fn, *args, **kwargs):
00194 future = TracebackFuture()
00195 try:
00196 future.set_result(fn(*args, **kwargs))
00197 except Exception:
00198 future.set_exc_info(sys.exc_info())
00199 return future
00200
00201 def shutdown(self, wait=True):
00202 pass
00203
00204 dummy_executor = DummyExecutor()
00205
00206
00207 def run_on_executor(fn):
00208 """Decorator to run a synchronous method asynchronously on an executor.
00209
00210 The decorated method may be called with a ``callback`` keyword
00211 argument and returns a future.
00212
00213 This decorator should be used only on methods of objects with attributes
00214 ``executor`` and ``io_loop``.
00215 """
00216 @functools.wraps(fn)
00217 def wrapper(self, *args, **kwargs):
00218 callback = kwargs.pop("callback", None)
00219 future = self.executor.submit(fn, self, *args, **kwargs)
00220 if callback:
00221 self.io_loop.add_future(future,
00222 lambda future: callback(future.result()))
00223 return future
00224 return wrapper
00225
00226
00227 _NO_RESULT = object()
00228
00229
00230 def return_future(f):
00231 """Decorator to make a function that returns via callback return a
00232 `Future`.
00233
00234 The wrapped function should take a ``callback`` keyword argument
00235 and invoke it with one argument when it has finished. To signal failure,
00236 the function can simply raise an exception (which will be
00237 captured by the `.StackContext` and passed along to the ``Future``).
00238
00239 From the caller's perspective, the callback argument is optional.
00240 If one is given, it will be invoked when the function is complete
00241 with `Future.result()` as an argument. If the function fails, the
00242 callback will not be run and an exception will be raised into the
00243 surrounding `.StackContext`.
00244
00245 If no callback is given, the caller should use the ``Future`` to
00246 wait for the function to complete (perhaps by yielding it in a
00247 `.gen.engine` function, or passing it to `.IOLoop.add_future`).
00248
00249 Usage::
00250
00251 @return_future
00252 def future_func(arg1, arg2, callback):
00253 # Do stuff (possibly asynchronous)
00254 callback(result)
00255
00256 @gen.engine
00257 def caller(callback):
00258 yield future_func(arg1, arg2)
00259 callback()
00260
00261 Note that ``@return_future`` and ``@gen.engine`` can be applied to the
00262 same function, provided ``@return_future`` appears first. However,
00263 consider using ``@gen.coroutine`` instead of this combination.
00264 """
00265 replacer = ArgReplacer(f, 'callback')
00266
00267 @functools.wraps(f)
00268 def wrapper(*args, **kwargs):
00269 future = TracebackFuture()
00270 callback, args, kwargs = replacer.replace(
00271 lambda value=_NO_RESULT: future.set_result(value),
00272 args, kwargs)
00273
00274 def handle_error(typ, value, tb):
00275 future.set_exc_info((typ, value, tb))
00276 return True
00277 exc_info = None
00278 with ExceptionStackContext(handle_error):
00279 try:
00280 result = f(*args, **kwargs)
00281 if result is not None:
00282 raise ReturnValueIgnoredError(
00283 "@return_future should not be used with functions "
00284 "that return values")
00285 except:
00286 exc_info = sys.exc_info()
00287 raise
00288 if exc_info is not None:
00289
00290
00291
00292 raise_exc_info(exc_info)
00293
00294
00295
00296
00297
00298
00299
00300 if callback is not None:
00301 def run_callback(future):
00302 result = future.result()
00303 if result is _NO_RESULT:
00304 callback()
00305 else:
00306 callback(future.result())
00307 future.add_done_callback(wrap(run_callback))
00308 return future
00309 return wrapper
00310
00311
00312 def chain_future(a, b):
00313 """Chain two futures together so that when one completes, so does the other.
00314
00315 The result (success or failure) of ``a`` will be copied to ``b``, unless
00316 ``b`` has already been completed or cancelled by the time ``a`` finishes.
00317 """
00318 def copy(future):
00319 assert future is a
00320 if b.done():
00321 return
00322 if (isinstance(a, TracebackFuture) and isinstance(b, TracebackFuture)
00323 and a.exc_info() is not None):
00324 b.set_exc_info(a.exc_info())
00325 elif a.exception() is not None:
00326 b.set_exception(a.exception())
00327 else:
00328 b.set_result(a.result())
00329 a.add_done_callback(copy)