concurrent.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #
00003 # Copyright 2012 Facebook
00004 #
00005 # Licensed under the Apache License, Version 2.0 (the "License"); you may
00006 # not use this file except in compliance with the License. You may obtain
00007 # a copy of the License at
00008 #
00009 #     http://www.apache.org/licenses/LICENSE-2.0
00010 #
00011 # Unless required by applicable law or agreed to in writing, software
00012 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
00013 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
00014 # License for the specific language governing permissions and limitations
00015 # under the License.
00016 """Utilities for working with threads and ``Futures``.
00017 
00018 ``Futures`` are a pattern for concurrent programming introduced in
00019 Python 3.2 in the `concurrent.futures` package (this package has also
00020 been backported to older versions of Python and can be installed with
00021 ``pip install futures``).  Tornado will use `concurrent.futures.Future` if
00022 it is available; otherwise it will use a compatible class defined in this
00023 module.
00024 """
00025 from __future__ import absolute_import, division, print_function, with_statement
00026 
00027 import functools
00028 import sys
00029 
00030 from tornado.stack_context import ExceptionStackContext, wrap
00031 from tornado.util import raise_exc_info, ArgReplacer
00032 
00033 try:
00034     from concurrent import futures
00035 except ImportError:
00036     futures = None
00037 
00038 
class ReturnValueIgnoredError(Exception):
    """Signals that a callback-style function unexpectedly returned a value.

    Raised by ``return_future`` when the function it wraps returns
    something other than ``None``.
    """
00041 
00042 
class Future(object):
    """Placeholder for an asynchronous result.

    A ``Future`` encapsulates the result of an asynchronous
    operation.  In synchronous applications ``Futures`` are used
    to wait for the result from a thread or process pool; in
    Tornado they are normally used with `.IOLoop.add_future` or by
    yielding them in a `.gen.coroutine`.

    `tornado.concurrent.Future` is similar to
    `concurrent.futures.Future`, but not thread-safe (and therefore
    faster for use with single-threaded event loops).

    In addition to ``exception`` and ``set_exception``, methods ``exc_info``
    and ``set_exc_info`` are supported to capture tracebacks in Python 2.
    The traceback is automatically available in Python 3, but in the
    Python 2 futures backport this information is discarded.
    This functionality was previously available in a separate class
    ``TracebackFuture``, which is now a deprecated alias for this class.

    .. versionchanged:: 4.0
       `tornado.concurrent.Future` is always a thread-unsafe ``Future``
       with support for the ``exc_info`` methods.  Previously it would
       be an alias for the thread-safe `concurrent.futures.Future`
       if that package was available and fall back to the thread-unsafe
       implementation if it was not.

    """
    def __init__(self):
        self._done = False
        self._result = None
        self._exception = None
        self._exc_info = None
        # Pending done-callbacks; replaced by None once they have been run.
        self._callbacks = []

    def cancel(self):
        """Cancel the operation, if possible.

        Tornado ``Futures`` do not support cancellation, so this method always
        returns False.
        """
        return False

    def cancelled(self):
        """Returns True if the operation has been cancelled.

        Tornado ``Futures`` do not support cancellation, so this method
        always returns False.
        """
        return False

    def running(self):
        """Returns True if this operation is currently running."""
        return not self._done

    def done(self):
        """Returns True if the future has finished running."""
        return self._done

    def result(self, timeout=None):
        """If the operation succeeded, return its result.  If it failed,
        re-raise its exception.

        ``timeout`` is accepted for signature compatibility only and is
        ignored; calling this before the future is done raises an
        ``Exception`` instead of blocking.
        """
        # Fast path: a non-None result can only have come from set_result.
        if self._result is not None:
            return self._result
        if self._exc_info is not None:
            # Prefer the full exc_info triple so the traceback is preserved.
            raise_exc_info(self._exc_info)
        elif self._exception is not None:
            raise self._exception
        self._check_done()
        return self._result

    def exception(self, timeout=None):
        """If the operation raised an exception, return the `Exception`
        object.  Otherwise returns None.

        ``timeout`` is ignored; calling this before the future is done
        raises an ``Exception`` instead of blocking.
        """
        if self._exception is not None:
            return self._exception
        else:
            self._check_done()
            return None

    def add_done_callback(self, fn):
        """Attaches the given callback to the `Future`.

        It will be invoked with the `Future` as its argument when the Future
        has finished running and its result is available.  In Tornado
        consider using `.IOLoop.add_future` instead of calling
        `add_done_callback` directly.

        If the future is already done, ``fn`` is called immediately.
        """
        if self._done:
            fn(self)
        else:
            self._callbacks.append(fn)

    def set_result(self, result):
        """Sets the result of a ``Future``.

        It is undefined to call any of the ``set`` methods more than once
        on the same object.
        """
        self._result = result
        self._set_done()

    def set_exception(self, exception):
        """Sets the exception of a ``Future``."""
        self._exception = exception
        self._set_done()

    def exc_info(self):
        """Returns a tuple in the same format as `sys.exc_info` or None.

        .. versionadded:: 4.0
        """
        return self._exc_info

    def set_exc_info(self, exc_info):
        """Sets the exception information of a ``Future``.

        Preserves tracebacks on Python 2.

        .. versionadded:: 4.0
        """
        self._exc_info = exc_info
        self.set_exception(exc_info[1])

    def _check_done(self):
        """Raise ``Exception`` if the future has not been resolved yet."""
        if not self._done:
            raise Exception("DummyFuture does not support blocking for results")

    def _set_done(self):
        """Mark the future as finished and run every pending callback.

        An ``Exception`` raised by one callback is logged and does not
        prevent the remaining callbacks from running, so a single faulty
        observer cannot leave the others waiting forever or leave the
        callback list uncleared.
        """
        self._done = True
        for cb in self._callbacks:
            try:
                cb(self)
            except Exception:
                # Imported lazily so the success path pays nothing.
                # "tornado.application" is the logger Tornado uses for
                # errors raised by application callbacks.
                import logging
                logging.getLogger("tornado.application").exception(
                    "Exception in callback %r for %r", cb, self)
        self._callbacks = None
00179 
# Deprecated alias kept for code that still refers to the old class name.
TracebackFuture = Future

# Type (or tuple of types) that ``is_future`` tests against; it includes
# ``concurrent.futures.Future`` only when that package could be imported.
FUTURES = Future if futures is None else (futures.Future, Future)
00186 
00187 
def is_future(x):
    """Return True if ``x`` is an instance of one of the ``FUTURES`` types."""
    return isinstance(x, FUTURES)
00190 
00191 
class DummyExecutor(object):
    """Executor look-alike that runs each submitted call synchronously."""

    def submit(self, fn, *args, **kwargs):
        """Call ``fn(*args, **kwargs)`` right away and return a future.

        The returned future is already resolved: it holds the return value
        of ``fn``, or the ``sys.exc_info()`` triple if ``fn`` raised an
        ``Exception``.
        """
        resolved = TracebackFuture()
        try:
            outcome = fn(*args, **kwargs)
            resolved.set_result(outcome)
        except Exception:
            resolved.set_exc_info(sys.exc_info())
        return resolved

    def shutdown(self, wait=True):
        """Do nothing; ``wait`` is accepted and ignored."""
        return None
00203 
# Shared module-level ``DummyExecutor`` instance.
dummy_executor = DummyExecutor()
00205 
00206 
def run_on_executor(fn):
    """Decorator that hands a synchronous method off to ``self.executor``.

    Calling the decorated method submits the original method to the
    object's ``executor`` and returns the resulting future.  An optional
    ``callback`` keyword argument may be supplied; it is stripped from the
    arguments passed to the method and, via ``self.io_loop.add_future``,
    invoked with the future's result once the future resolves.

    Only use this on methods of objects that expose both ``executor`` and
    ``io_loop`` attributes.
    """
    @functools.wraps(fn)
    def wrapper(self, *args, **kwargs):
        on_done = kwargs.pop("callback", None)
        pending = self.executor.submit(fn, self, *args, **kwargs)
        if not on_done:
            return pending

        def deliver(finished):
            # Pass the callback's return value through to the IOLoop.
            return on_done(finished.result())

        self.io_loop.add_future(pending, deliver)
        return pending
    return wrapper
00225 
00226 
# Unique sentinel used by ``return_future`` as the default argument of the
# callback it installs, so that a callback invoked with no arguments can be
# told apart from one invoked with any real value (including ``None``).
_NO_RESULT = object()
00228 
00229 
def return_future(f):
    """Decorator to make a function that returns via callback return a
    `Future`.

    The wrapped function should take a ``callback`` keyword argument
    and invoke it with one argument when it has finished.  To signal failure,
    the function can simply raise an exception (which will be
    captured by the `.StackContext` and passed along to the ``Future``).

    From the caller's perspective, the callback argument is optional.
    If one is given, it will be invoked when the function is complete
    with `Future.result()` as an argument.  If the function fails, the
    callback will not be run and an exception will be raised into the
    surrounding `.StackContext`.

    If no callback is given, the caller should use the ``Future`` to
    wait for the function to complete (perhaps by yielding it in a
    `.gen.engine` function, or passing it to `.IOLoop.add_future`).

    The wrapped function must not return a value: a return value other
    than ``None`` causes `ReturnValueIgnoredError` to be raised.

    Usage::

        @return_future
        def future_func(arg1, arg2, callback):
            # Do stuff (possibly asynchronous)
            callback(result)

        @gen.engine
        def caller(callback):
            yield future_func(arg1, arg2)
            callback()

    Note that ``@return_future`` and ``@gen.engine`` can be applied to the
    same function, provided ``@return_future`` appears first.  However,
    consider using ``@gen.coroutine`` instead of this combination.
    """
    # Built once per decorated function and reused by every call.
    # NOTE(review): presumably ArgReplacer locates the ``callback`` parameter
    # whether it is passed positionally or by keyword -- confirm in
    # tornado.util.
    replacer = ArgReplacer(f, 'callback')

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        future = TracebackFuture()
        # Substitute a callback that resolves ``future``.  Its argument
        # defaults to _NO_RESULT so that ``f`` may also call it with no
        # arguments.  ``callback`` is bound to the first value returned by
        # replace() -- presumably the caller's original callback, or None
        # if the caller did not pass one (confirm against ArgReplacer).
        callback, args, kwargs = replacer.replace(
            lambda value=_NO_RESULT: future.set_result(value),
            args, kwargs)

        def handle_error(typ, value, tb):
            # Record the failure on the future.  NOTE(review): returning
            # True presumably tells ExceptionStackContext that the exception
            # was handled -- confirm in tornado.stack_context.
            future.set_exc_info((typ, value, tb))
            return True
        exc_info = None
        with ExceptionStackContext(handle_error):
            try:
                result = f(*args, **kwargs)
                if result is not None:
                    raise ReturnValueIgnoredError(
                        "@return_future should not be used with functions "
                        "that return values")
            except:
                # Remember the synchronous failure, then re-raise so the
                # enclosing stack context sees it as well.
                exc_info = sys.exc_info()
                raise
        # Execution only continues past the ``with`` block with exc_info set
        # when the context manager suppressed the re-raised exception.
        if exc_info is not None:
            # If the initial synchronous part of f() raised an exception,
            # go ahead and raise it to the caller directly without waiting
            # for them to inspect the Future.
            raise_exc_info(exc_info)

        # If the caller passed in a callback, schedule it to be called
        # when the future resolves.  It is important that this happens
        # just before we return the future, or else we risk confusing
        # stack contexts with multiple exceptions (one here with the
        # immediate exception, and again when the future resolves and
        # the callback triggers its exception by calling future.result()).
        if callback is not None:
            def run_callback(future):
                # future.result() re-raises if the future holds a failure,
                # in which case the caller's callback is never invoked.
                result = future.result()
                if result is _NO_RESULT:
                    # f() invoked its callback with no arguments; do the
                    # same for the caller's callback.
                    callback()
                else:
                    callback(future.result())
            future.add_done_callback(wrap(run_callback))
        return future
    return wrapper
00310 
00311 
def chain_future(a, b):
    """Propagate the outcome of future ``a`` onto future ``b``.

    When ``a`` finishes, its result or failure is copied to ``b``.  Nothing
    is copied if ``b`` is already done by the time ``a`` finishes.
    """
    def copy_outcome(future):
        assert future is a
        if b.done():
            # ``b`` was resolved elsewhere first; leave it untouched.
            return
        both_carry_tracebacks = (isinstance(a, TracebackFuture) and
                                 isinstance(b, TracebackFuture))
        if both_carry_tracebacks and a.exc_info() is not None:
            # Hand over the full exc_info triple so the traceback survives.
            b.set_exc_info(a.exc_info())
            return
        if a.exception() is not None:
            b.set_exception(a.exception())
            return
        b.set_result(a.result())

    a.add_done_callback(copy_outcome)


rosbridge_server
Author(s): Jonathan Mace
autogenerated on Thu Jun 6 2019 21:51:50