ioloop.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #
00003 # Copyright 2009 Facebook
00004 #
00005 # Licensed under the Apache License, Version 2.0 (the "License"); you may
00006 # not use this file except in compliance with the License. You may obtain
00007 # a copy of the License at
00008 #
00009 #     http://www.apache.org/licenses/LICENSE-2.0
00010 #
00011 # Unless required by applicable law or agreed to in writing, software
00012 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
00013 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
00014 # License for the specific language governing permissions and limitations
00015 # under the License.
00016 
00017 """An I/O event loop for non-blocking sockets.
00018 
00019 Typical applications will use a single `IOLoop` object, in the
00020 `IOLoop.instance` singleton.  The `IOLoop.start` method should usually
00021 be called at the end of the ``main()`` function.  Atypical applications may
00022 use more than one `IOLoop`, such as one `IOLoop` per thread, or per `unittest`
00023 case.
00024 
00025 In addition to I/O events, the `IOLoop` can also schedule time-based events.
00026 `IOLoop.add_timeout` is a non-blocking alternative to `time.sleep`.
00027 """
00028 
00029 from __future__ import absolute_import, division, print_function, with_statement
00030 
00031 import datetime
00032 import errno
00033 import functools
00034 import heapq
00035 import itertools
00036 import logging
00037 import numbers
00038 import os
00039 import select
00040 import sys
00041 import threading
00042 import time
00043 import traceback
00044 
00045 from tornado.concurrent import TracebackFuture, is_future
00046 from tornado.log import app_log, gen_log
00047 from tornado import stack_context
00048 from tornado.util import Configurable, errno_from_exception, timedelta_to_seconds
00049 
00050 try:
00051     import signal
00052 except ImportError:
00053     signal = None
00054 
00055 try:
00056     import thread  # py2
00057 except ImportError:
00058     import _thread as thread  # py3
00059 
00060 from tornado.platform.auto import set_close_exec, Waker
00061 
00062 
00063 _POLL_TIMEOUT = 3600.0
00064 
00065 
00066 class TimeoutError(Exception):
00067     pass
00068 
00069 
00070 class IOLoop(Configurable):
00071     """A level-triggered I/O loop.
00072 
00073     We use ``epoll`` (Linux) or ``kqueue`` (BSD and Mac OS X) if they
00074     are available, or else we fall back on select(). If you are
00075     implementing a system that needs to handle thousands of
00076     simultaneous connections, you should use a system that supports
00077     either ``epoll`` or ``kqueue``.
00078 
00079     Example usage for a simple TCP server::
00080 
00081         import errno
00082         import functools
00083         import ioloop
00084         import socket
00085 
00086         def connection_ready(sock, fd, events):
00087             while True:
00088                 try:
00089                     connection, address = sock.accept()
00090                 except socket.error, e:
00091                     if e.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
00092                         raise
00093                     return
00094                 connection.setblocking(0)
00095                 handle_connection(connection, address)
00096 
00097         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
00098         sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
00099         sock.setblocking(0)
00100         sock.bind(("", port))
00101         sock.listen(128)
00102 
00103         io_loop = ioloop.IOLoop.instance()
00104         callback = functools.partial(connection_ready, sock)
00105         io_loop.add_handler(sock.fileno(), callback, io_loop.READ)
00106         io_loop.start()
00107 
00108     """
00109     # Constants from the epoll module
00110     _EPOLLIN = 0x001
00111     _EPOLLPRI = 0x002
00112     _EPOLLOUT = 0x004
00113     _EPOLLERR = 0x008
00114     _EPOLLHUP = 0x010
00115     _EPOLLRDHUP = 0x2000
00116     _EPOLLONESHOT = (1 << 30)
00117     _EPOLLET = (1 << 31)
00118 
00119     # Our events map exactly to the epoll events
00120     NONE = 0
00121     READ = _EPOLLIN
00122     WRITE = _EPOLLOUT
00123     ERROR = _EPOLLERR | _EPOLLHUP
00124 
00125     # Global lock for creating global IOLoop instance
00126     _instance_lock = threading.Lock()
00127 
00128     _current = threading.local()
00129 
00130     @staticmethod
00131     def instance():
00132         """Returns a global `IOLoop` instance.
00133 
00134         Most applications have a single, global `IOLoop` running on the
00135         main thread.  Use this method to get this instance from
00136         another thread.  To get the current thread's `IOLoop`, use `current()`.
00137         """
00138         if not hasattr(IOLoop, "_instance"):
00139             with IOLoop._instance_lock:
00140                 if not hasattr(IOLoop, "_instance"):
00141                     # New instance after double check
00142                     IOLoop._instance = IOLoop()
00143         return IOLoop._instance
00144 
00145     @staticmethod
00146     def initialized():
00147         """Returns true if the singleton instance has been created."""
00148         return hasattr(IOLoop, "_instance")
00149 
00150     def install(self):
00151         """Installs this `IOLoop` object as the singleton instance.
00152 
00153         This is normally not necessary as `instance()` will create
00154         an `IOLoop` on demand, but you may want to call `install` to use
00155         a custom subclass of `IOLoop`.
00156         """
00157         assert not IOLoop.initialized()
00158         IOLoop._instance = self
00159 
00160     @staticmethod
00161     def clear_instance():
00162         """Clear the global `IOLoop` instance.
00163 
00164         .. versionadded:: 4.0
00165         """
00166         if hasattr(IOLoop, "_instance"):
00167             del IOLoop._instance
00168 
00169     @staticmethod
00170     def current():
00171         """Returns the current thread's `IOLoop`.
00172 
00173         If an `IOLoop` is currently running or has been marked as current
00174         by `make_current`, returns that instance.  Otherwise returns
00175         `IOLoop.instance()`, i.e. the main thread's `IOLoop`.
00176 
00177         A common pattern for classes that depend on ``IOLoops`` is to use
00178         a default argument to enable programs with multiple ``IOLoops``
00179         but not require the argument for simpler applications::
00180 
00181             class MyClass(object):
00182                 def __init__(self, io_loop=None):
00183                     self.io_loop = io_loop or IOLoop.current()
00184 
00185         In general you should use `IOLoop.current` as the default when
00186         constructing an asynchronous object, and use `IOLoop.instance`
00187         when you mean to communicate to the main thread from a different
00188         one.
00189         """
00190         current = getattr(IOLoop._current, "instance", None)
00191         if current is None:
00192             return IOLoop.instance()
00193         return current
00194 
00195     def make_current(self):
00196         """Makes this the `IOLoop` for the current thread.
00197 
00198         An `IOLoop` automatically becomes current for its thread
00199         when it is started, but it is sometimes useful to call
00200         `make_current` explictly before starting the `IOLoop`,
00201         so that code run at startup time can find the right
00202         instance.
00203         """
00204         IOLoop._current.instance = self
00205 
00206     @staticmethod
00207     def clear_current():
00208         IOLoop._current.instance = None
00209 
00210     @classmethod
00211     def configurable_base(cls):
00212         return IOLoop
00213 
00214     @classmethod
00215     def configurable_default(cls):
00216         if hasattr(select, "epoll"):
00217             from tornado.platform.epoll import EPollIOLoop
00218             return EPollIOLoop
00219         if hasattr(select, "kqueue"):
00220             # Python 2.6+ on BSD or Mac
00221             from tornado.platform.kqueue import KQueueIOLoop
00222             return KQueueIOLoop
00223         from tornado.platform.select import SelectIOLoop
00224         return SelectIOLoop
00225 
00226     def initialize(self):
00227         pass
00228 
00229     def close(self, all_fds=False):
00230         """Closes the `IOLoop`, freeing any resources used.
00231 
00232         If ``all_fds`` is true, all file descriptors registered on the
00233         IOLoop will be closed (not just the ones created by the
00234         `IOLoop` itself).
00235 
00236         Many applications will only use a single `IOLoop` that runs for the
00237         entire lifetime of the process.  In that case closing the `IOLoop`
00238         is not necessary since everything will be cleaned up when the
00239         process exits.  `IOLoop.close` is provided mainly for scenarios
00240         such as unit tests, which create and destroy a large number of
00241         ``IOLoops``.
00242 
00243         An `IOLoop` must be completely stopped before it can be closed.  This
00244         means that `IOLoop.stop()` must be called *and* `IOLoop.start()` must
00245         be allowed to return before attempting to call `IOLoop.close()`.
00246         Therefore the call to `close` will usually appear just after
00247         the call to `start` rather than near the call to `stop`.
00248 
00249         .. versionchanged:: 3.1
00250            If the `IOLoop` implementation supports non-integer objects
00251            for "file descriptors", those objects will have their
00252            ``close`` method when ``all_fds`` is true.
00253         """
00254         raise NotImplementedError()
00255 
00256     def add_handler(self, fd, handler, events):
00257         """Registers the given handler to receive the given events for ``fd``.
00258 
00259         The ``fd`` argument may either be an integer file descriptor or
00260         a file-like object with a ``fileno()`` method (and optionally a
00261         ``close()`` method, which may be called when the `IOLoop` is shut
00262         down).
00263 
00264         The ``events`` argument is a bitwise or of the constants
00265         ``IOLoop.READ``, ``IOLoop.WRITE``, and ``IOLoop.ERROR``.
00266 
00267         When an event occurs, ``handler(fd, events)`` will be run.
00268 
00269         .. versionchanged:: 4.0
00270            Added the ability to pass file-like objects in addition to
00271            raw file descriptors.
00272         """
00273         raise NotImplementedError()
00274 
00275     def update_handler(self, fd, events):
00276         """Changes the events we listen for ``fd``.
00277 
00278         .. versionchanged:: 4.0
00279            Added the ability to pass file-like objects in addition to
00280            raw file descriptors.
00281         """
00282         raise NotImplementedError()
00283 
00284     def remove_handler(self, fd):
00285         """Stop listening for events on ``fd``.
00286 
00287         .. versionchanged:: 4.0
00288            Added the ability to pass file-like objects in addition to
00289            raw file descriptors.
00290         """
00291         raise NotImplementedError()
00292 
00293     def set_blocking_signal_threshold(self, seconds, action):
00294         """Sends a signal if the `IOLoop` is blocked for more than
00295         ``s`` seconds.
00296 
00297         Pass ``seconds=None`` to disable.  Requires Python 2.6 on a unixy
00298         platform.
00299 
00300         The action parameter is a Python signal handler.  Read the
00301         documentation for the `signal` module for more information.
00302         If ``action`` is None, the process will be killed if it is
00303         blocked for too long.
00304         """
00305         raise NotImplementedError()
00306 
00307     def set_blocking_log_threshold(self, seconds):
00308         """Logs a stack trace if the `IOLoop` is blocked for more than
00309         ``s`` seconds.
00310 
00311         Equivalent to ``set_blocking_signal_threshold(seconds,
00312         self.log_stack)``
00313         """
00314         self.set_blocking_signal_threshold(seconds, self.log_stack)
00315 
00316     def log_stack(self, signal, frame):
00317         """Signal handler to log the stack trace of the current thread.
00318 
00319         For use with `set_blocking_signal_threshold`.
00320         """
00321         gen_log.warning('IOLoop blocked for %f seconds in\n%s',
00322                         self._blocking_signal_threshold,
00323                         ''.join(traceback.format_stack(frame)))
00324 
00325     def start(self):
00326         """Starts the I/O loop.
00327 
00328         The loop will run until one of the callbacks calls `stop()`, which
00329         will make the loop stop after the current event iteration completes.
00330         """
00331         raise NotImplementedError()
00332 
00333     def _setup_logging(self):
00334         """The IOLoop catches and logs exceptions, so it's
00335         important that log output be visible.  However, python's
00336         default behavior for non-root loggers (prior to python
00337         3.2) is to print an unhelpful "no handlers could be
00338         found" message rather than the actual log entry, so we
00339         must explicitly configure logging if we've made it this
00340         far without anything.
00341 
00342         This method should be called from start() in subclasses.
00343         """
00344         if not any([logging.getLogger().handlers,
00345                     logging.getLogger('tornado').handlers,
00346                     logging.getLogger('tornado.application').handlers]):
00347             logging.basicConfig()
00348 
00349     def stop(self):
00350         """Stop the I/O loop.
00351 
00352         If the event loop is not currently running, the next call to `start()`
00353         will return immediately.
00354 
00355         To use asynchronous methods from otherwise-synchronous code (such as
00356         unit tests), you can start and stop the event loop like this::
00357 
00358           ioloop = IOLoop()
00359           async_method(ioloop=ioloop, callback=ioloop.stop)
00360           ioloop.start()
00361 
00362         ``ioloop.start()`` will return after ``async_method`` has run
00363         its callback, whether that callback was invoked before or
00364         after ``ioloop.start``.
00365 
00366         Note that even after `stop` has been called, the `IOLoop` is not
00367         completely stopped until `IOLoop.start` has also returned.
00368         Some work that was scheduled before the call to `stop` may still
00369         be run before the `IOLoop` shuts down.
00370         """
00371         raise NotImplementedError()
00372 
00373     def run_sync(self, func, timeout=None):
00374         """Starts the `IOLoop`, runs the given function, and stops the loop.
00375 
00376         If the function returns a `.Future`, the `IOLoop` will run
00377         until the future is resolved.  If it raises an exception, the
00378         `IOLoop` will stop and the exception will be re-raised to the
00379         caller.
00380 
00381         The keyword-only argument ``timeout`` may be used to set
00382         a maximum duration for the function.  If the timeout expires,
00383         a `TimeoutError` is raised.
00384 
00385         This method is useful in conjunction with `tornado.gen.coroutine`
00386         to allow asynchronous calls in a ``main()`` function::
00387 
00388             @gen.coroutine
00389             def main():
00390                 # do stuff...
00391 
00392             if __name__ == '__main__':
00393                 IOLoop.instance().run_sync(main)
00394         """
00395         future_cell = [None]
00396 
00397         def run():
00398             try:
00399                 result = func()
00400             except Exception:
00401                 future_cell[0] = TracebackFuture()
00402                 future_cell[0].set_exc_info(sys.exc_info())
00403             else:
00404                 if is_future(result):
00405                     future_cell[0] = result
00406                 else:
00407                     future_cell[0] = TracebackFuture()
00408                     future_cell[0].set_result(result)
00409             self.add_future(future_cell[0], lambda future: self.stop())
00410         self.add_callback(run)
00411         if timeout is not None:
00412             timeout_handle = self.add_timeout(self.time() + timeout, self.stop)
00413         self.start()
00414         if timeout is not None:
00415             self.remove_timeout(timeout_handle)
00416         if not future_cell[0].done():
00417             raise TimeoutError('Operation timed out after %s seconds' % timeout)
00418         return future_cell[0].result()
00419 
00420     def time(self):
00421         """Returns the current time according to the `IOLoop`'s clock.
00422 
00423         The return value is a floating-point number relative to an
00424         unspecified time in the past.
00425 
00426         By default, the `IOLoop`'s time function is `time.time`.  However,
00427         it may be configured to use e.g. `time.monotonic` instead.
00428         Calls to `add_timeout` that pass a number instead of a
00429         `datetime.timedelta` should use this function to compute the
00430         appropriate time, so they can work no matter what time function
00431         is chosen.
00432         """
00433         return time.time()
00434 
00435     def add_timeout(self, deadline, callback, *args, **kwargs):
00436         """Runs the ``callback`` at the time ``deadline`` from the I/O loop.
00437 
00438         Returns an opaque handle that may be passed to
00439         `remove_timeout` to cancel.
00440 
00441         ``deadline`` may be a number denoting a time (on the same
00442         scale as `IOLoop.time`, normally `time.time`), or a
00443         `datetime.timedelta` object for a deadline relative to the
00444         current time.  Since Tornado 4.0, `call_later` is a more
00445         convenient alternative for the relative case since it does not
00446         require a timedelta object.
00447 
00448         Note that it is not safe to call `add_timeout` from other threads.
00449         Instead, you must use `add_callback` to transfer control to the
00450         `IOLoop`'s thread, and then call `add_timeout` from there.
00451 
00452         Subclasses of IOLoop must implement either `add_timeout` or
00453         `call_at`; the default implementations of each will call
00454         the other.  `call_at` is usually easier to implement, but
00455         subclasses that wish to maintain compatibility with Tornado
00456         versions prior to 4.0 must use `add_timeout` instead.
00457 
00458         .. versionchanged:: 4.0
00459            Now passes through ``*args`` and ``**kwargs`` to the callback.
00460         """
00461         if isinstance(deadline, numbers.Real):
00462             return self.call_at(deadline, callback, *args, **kwargs)
00463         elif isinstance(deadline, datetime.timedelta):
00464             return self.call_at(self.time() + timedelta_to_seconds(deadline),
00465                                 callback, *args, **kwargs)
00466         else:
00467             raise TypeError("Unsupported deadline %r" % deadline)
00468 
00469     def call_later(self, delay, callback, *args, **kwargs):
00470         """Runs the ``callback`` after ``delay`` seconds have passed.
00471 
00472         Returns an opaque handle that may be passed to `remove_timeout`
00473         to cancel.  Note that unlike the `asyncio` method of the same
00474         name, the returned object does not have a ``cancel()`` method.
00475 
00476         See `add_timeout` for comments on thread-safety and subclassing.
00477 
00478         .. versionadded:: 4.0
00479         """
00480         return self.call_at(self.time() + delay, callback, *args, **kwargs)
00481 
00482     def call_at(self, when, callback, *args, **kwargs):
00483         """Runs the ``callback`` at the absolute time designated by ``when``.
00484 
00485         ``when`` must be a number using the same reference point as
00486         `IOLoop.time`.
00487 
00488         Returns an opaque handle that may be passed to `remove_timeout`
00489         to cancel.  Note that unlike the `asyncio` method of the same
00490         name, the returned object does not have a ``cancel()`` method.
00491 
00492         See `add_timeout` for comments on thread-safety and subclassing.
00493 
00494         .. versionadded:: 4.0
00495         """
00496         return self.add_timeout(when, callback, *args, **kwargs)
00497 
00498     def remove_timeout(self, timeout):
00499         """Cancels a pending timeout.
00500 
00501         The argument is a handle as returned by `add_timeout`.  It is
00502         safe to call `remove_timeout` even if the callback has already
00503         been run.
00504         """
00505         raise NotImplementedError()
00506 
00507     def add_callback(self, callback, *args, **kwargs):
00508         """Calls the given callback on the next I/O loop iteration.
00509 
00510         It is safe to call this method from any thread at any time,
00511         except from a signal handler.  Note that this is the **only**
00512         method in `IOLoop` that makes this thread-safety guarantee; all
00513         other interaction with the `IOLoop` must be done from that
00514         `IOLoop`'s thread.  `add_callback()` may be used to transfer
00515         control from other threads to the `IOLoop`'s thread.
00516 
00517         To add a callback from a signal handler, see
00518         `add_callback_from_signal`.
00519         """
00520         raise NotImplementedError()
00521 
00522     def add_callback_from_signal(self, callback, *args, **kwargs):
00523         """Calls the given callback on the next I/O loop iteration.
00524 
00525         Safe for use from a Python signal handler; should not be used
00526         otherwise.
00527 
00528         Callbacks added with this method will be run without any
00529         `.stack_context`, to avoid picking up the context of the function
00530         that was interrupted by the signal.
00531         """
00532         raise NotImplementedError()
00533 
00534     def spawn_callback(self, callback, *args, **kwargs):
00535         """Calls the given callback on the next IOLoop iteration.
00536 
00537         Unlike all other callback-related methods on IOLoop,
00538         ``spawn_callback`` does not associate the callback with its caller's
00539         ``stack_context``, so it is suitable for fire-and-forget callbacks
00540         that should not interfere with the caller.
00541 
00542         .. versionadded:: 4.0
00543         """
00544         with stack_context.NullContext():
00545             self.add_callback(callback, *args, **kwargs)
00546 
00547     def add_future(self, future, callback):
00548         """Schedules a callback on the ``IOLoop`` when the given
00549         `.Future` is finished.
00550 
00551         The callback is invoked with one argument, the
00552         `.Future`.
00553         """
00554         assert is_future(future)
00555         callback = stack_context.wrap(callback)
00556         future.add_done_callback(
00557             lambda future: self.add_callback(callback, future))
00558 
00559     def _run_callback(self, callback):
00560         """Runs a callback with error handling.
00561 
00562         For use in subclasses.
00563         """
00564         try:
00565             ret = callback()
00566             if ret is not None and is_future(ret):
00567                 # Functions that return Futures typically swallow all
00568                 # exceptions and store them in the Future.  If a Future
00569                 # makes it out to the IOLoop, ensure its exception (if any)
00570                 # gets logged too.
00571                 self.add_future(ret, lambda f: f.result())
00572         except Exception:
00573             self.handle_callback_exception(callback)
00574 
00575     def handle_callback_exception(self, callback):
00576         """This method is called whenever a callback run by the `IOLoop`
00577         throws an exception.
00578 
00579         By default simply logs the exception as an error.  Subclasses
00580         may override this method to customize reporting of exceptions.
00581 
00582         The exception itself is not passed explicitly, but is available
00583         in `sys.exc_info`.
00584         """
00585         app_log.error("Exception in callback %r", callback, exc_info=True)
00586 
00587     def split_fd(self, fd):
00588         """Returns an (fd, obj) pair from an ``fd`` parameter.
00589 
00590         We accept both raw file descriptors and file-like objects as
00591         input to `add_handler` and related methods.  When a file-like
00592         object is passed, we must retain the object itself so we can
00593         close it correctly when the `IOLoop` shuts down, but the
00594         poller interfaces favor file descriptors (they will accept
00595         file-like objects and call ``fileno()`` for you, but they
00596         always return the descriptor itself).
00597 
00598         This method is provided for use by `IOLoop` subclasses and should
00599         not generally be used by application code.
00600 
00601         .. versionadded:: 4.0
00602         """
00603         try:
00604             return fd.fileno(), fd
00605         except AttributeError:
00606             return fd, fd
00607 
00608     def close_fd(self, fd):
00609         """Utility method to close an ``fd``.
00610 
00611         If ``fd`` is a file-like object, we close it directly; otherwise
00612         we use `os.close`.
00613 
00614         This method is provided for use by `IOLoop` subclasses (in
00615         implementations of ``IOLoop.close(all_fds=True)`` and should
00616         not generally be used by application code.
00617 
00618         .. versionadded:: 4.0
00619         """
00620         try:
00621             try:
00622                 fd.close()
00623             except AttributeError:
00624                 os.close(fd)
00625         except OSError:
00626             pass
00627 
00628 
00629 class PollIOLoop(IOLoop):
00630     """Base class for IOLoops built around a select-like function.
00631 
00632     For concrete implementations, see `tornado.platform.epoll.EPollIOLoop`
00633     (Linux), `tornado.platform.kqueue.KQueueIOLoop` (BSD and Mac), or
00634     `tornado.platform.select.SelectIOLoop` (all platforms).
00635     """
00636     def initialize(self, impl, time_func=None):
00637         super(PollIOLoop, self).initialize()
00638         self._impl = impl
00639         if hasattr(self._impl, 'fileno'):
00640             set_close_exec(self._impl.fileno())
00641         self.time_func = time_func or time.time
00642         self._handlers = {}
00643         self._events = {}
00644         self._callbacks = []
00645         self._callback_lock = threading.Lock()
00646         self._timeouts = []
00647         self._cancellations = 0
00648         self._running = False
00649         self._stopped = False
00650         self._closing = False
00651         self._thread_ident = None
00652         self._blocking_signal_threshold = None
00653         self._timeout_counter = itertools.count()
00654 
00655         # Create a pipe that we send bogus data to when we want to wake
00656         # the I/O loop when it is idle
00657         self._waker = Waker()
00658         self.add_handler(self._waker.fileno(),
00659                          lambda fd, events: self._waker.consume(),
00660                          self.READ)
00661 
00662     def close(self, all_fds=False):
00663         with self._callback_lock:
00664             self._closing = True
00665         self.remove_handler(self._waker.fileno())
00666         if all_fds:
00667             for fd, handler in self._handlers.values():
00668                 self.close_fd(fd)
00669         self._waker.close()
00670         self._impl.close()
00671         self._callbacks = None
00672         self._timeouts = None
00673 
00674     def add_handler(self, fd, handler, events):
00675         fd, obj = self.split_fd(fd)
00676         self._handlers[fd] = (obj, stack_context.wrap(handler))
00677         self._impl.register(fd, events | self.ERROR)
00678 
00679     def update_handler(self, fd, events):
00680         fd, obj = self.split_fd(fd)
00681         self._impl.modify(fd, events | self.ERROR)
00682 
00683     def remove_handler(self, fd):
00684         fd, obj = self.split_fd(fd)
00685         self._handlers.pop(fd, None)
00686         self._events.pop(fd, None)
00687         try:
00688             self._impl.unregister(fd)
00689         except Exception:
00690             gen_log.debug("Error deleting fd from IOLoop", exc_info=True)
00691 
00692     def set_blocking_signal_threshold(self, seconds, action):
00693         if not hasattr(signal, "setitimer"):
00694             gen_log.error("set_blocking_signal_threshold requires a signal module "
00695                           "with the setitimer method")
00696             return
00697         self._blocking_signal_threshold = seconds
00698         if seconds is not None:
00699             signal.signal(signal.SIGALRM,
00700                           action if action is not None else signal.SIG_DFL)
00701 
00702     def start(self):
00703         if self._running:
00704             raise RuntimeError("IOLoop is already running")
00705         self._setup_logging()
00706         if self._stopped:
00707             self._stopped = False
00708             return
00709         old_current = getattr(IOLoop._current, "instance", None)
00710         IOLoop._current.instance = self
00711         self._thread_ident = thread.get_ident()
00712         self._running = True
00713 
00714         # signal.set_wakeup_fd closes a race condition in event loops:
00715         # a signal may arrive at the beginning of select/poll/etc
00716         # before it goes into its interruptible sleep, so the signal
00717         # will be consumed without waking the select.  The solution is
00718         # for the (C, synchronous) signal handler to write to a pipe,
00719         # which will then be seen by select.
00720         #
00721         # In python's signal handling semantics, this only matters on the
00722         # main thread (fortunately, set_wakeup_fd only works on the main
00723         # thread and will raise a ValueError otherwise).
00724         #
00725         # If someone has already set a wakeup fd, we don't want to
00726         # disturb it.  This is an issue for twisted, which does its
00727         # SIGCHILD processing in response to its own wakeup fd being
00728         # written to.  As long as the wakeup fd is registered on the IOLoop,
00729         # the loop will still wake up and everything should work.
00730         old_wakeup_fd = None
00731         if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix':
00732             # requires python 2.6+, unix.  set_wakeup_fd exists but crashes
00733             # the python process on windows.
00734             try:
00735                 old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno())
00736                 if old_wakeup_fd != -1:
00737                     # Already set, restore previous value.  This is a little racy,
00738                     # but there's no clean get_wakeup_fd and in real use the
00739                     # IOLoop is just started once at the beginning.
00740                     signal.set_wakeup_fd(old_wakeup_fd)
00741                     old_wakeup_fd = None
00742             except ValueError:  # non-main thread
00743                 pass
00744 
00745         try:
00746             while True:
00747                 # Prevent IO event starvation by delaying new callbacks
00748                 # to the next iteration of the event loop.
00749                 with self._callback_lock:
00750                     callbacks = self._callbacks
00751                     self._callbacks = []
00752 
00753                 # Add any timeouts that have come due to the callback list.
00754                 # Do not run anything until we have determined which ones
00755                 # are ready, so timeouts that call add_timeout cannot
00756                 # schedule anything in this iteration.
00757                 due_timeouts = []
00758                 if self._timeouts:
00759                     now = self.time()
00760                     while self._timeouts:
00761                         if self._timeouts[0].callback is None:
00762                             # The timeout was cancelled.  Note that the
00763                             # cancellation check is repeated below for timeouts
00764                             # that are cancelled by another timeout or callback.
00765                             heapq.heappop(self._timeouts)
00766                             self._cancellations -= 1
00767                         elif self._timeouts[0].deadline <= now:
00768                             due_timeouts.append(heapq.heappop(self._timeouts))
00769                         else:
00770                             break
00771                     if (self._cancellations > 512
00772                             and self._cancellations > (len(self._timeouts) >> 1)):
00773                         # Clean up the timeout queue when it gets large and it's
00774                         # more than half cancellations.
00775                         self._cancellations = 0
00776                         self._timeouts = [x for x in self._timeouts
00777                                           if x.callback is not None]
00778                         heapq.heapify(self._timeouts)
00779 
00780                 for callback in callbacks:
00781                     self._run_callback(callback)
00782                 for timeout in due_timeouts:
00783                     if timeout.callback is not None:
00784                         self._run_callback(timeout.callback)
00785                 # Closures may be holding on to a lot of memory, so allow
00786                 # them to be freed before we go into our poll wait.
00787                 callbacks = callback = due_timeouts = timeout = None
00788 
00789                 if self._callbacks:
00790                     # If any callbacks or timeouts called add_callback,
00791                     # we don't want to wait in poll() before we run them.
00792                     poll_timeout = 0.0
00793                 elif self._timeouts:
00794                     # If there are any timeouts, schedule the first one.
00795                     # Use self.time() instead of 'now' to account for time
00796                     # spent running callbacks.
00797                     poll_timeout = self._timeouts[0].deadline - self.time()
00798                     poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
00799                 else:
00800                     # No timeouts and no callbacks, so use the default.
00801                     poll_timeout = _POLL_TIMEOUT
00802 
00803                 if not self._running:
00804                     break
00805 
00806                 if self._blocking_signal_threshold is not None:
00807                     # clear alarm so it doesn't fire while poll is waiting for
00808                     # events.
00809                     signal.setitimer(signal.ITIMER_REAL, 0, 0)
00810 
00811                 try:
00812                     event_pairs = self._impl.poll(poll_timeout)
00813                 except Exception as e:
00814                     # Depending on python version and IOLoop implementation,
00815                     # different exception types may be thrown and there are
00816                     # two ways EINTR might be signaled:
00817                     # * e.errno == errno.EINTR
00818                     # * e.args is like (errno.EINTR, 'Interrupted system call')
00819                     if errno_from_exception(e) == errno.EINTR:
00820                         continue
00821                     else:
00822                         raise
00823 
00824                 if self._blocking_signal_threshold is not None:
00825                     signal.setitimer(signal.ITIMER_REAL,
00826                                      self._blocking_signal_threshold, 0)
00827 
00828                 # Pop one fd at a time from the set of pending fds and run
00829                 # its handler. Since that handler may perform actions on
00830                 # other file descriptors, there may be reentrant calls to
00831                 # this IOLoop that update self._events
00832                 self._events.update(event_pairs)
00833                 while self._events:
00834                     fd, events = self._events.popitem()
00835                     try:
00836                         fd_obj, handler_func = self._handlers[fd]
00837                         handler_func(fd_obj, events)
00838                     except (OSError, IOError) as e:
00839                         if errno_from_exception(e) == errno.EPIPE:
00840                             # Happens when the client closes the connection
00841                             pass
00842                         else:
00843                             self.handle_callback_exception(self._handlers.get(fd))
00844                     except Exception:
00845                         self.handle_callback_exception(self._handlers.get(fd))
00846                 fd_obj = handler_func = None
00847 
00848         finally:
00849             # reset the stopped flag so another start/stop pair can be issued
00850             self._stopped = False
00851             if self._blocking_signal_threshold is not None:
00852                 signal.setitimer(signal.ITIMER_REAL, 0, 0)
00853             IOLoop._current.instance = old_current
00854             if old_wakeup_fd is not None:
00855                 signal.set_wakeup_fd(old_wakeup_fd)
00856 
00857     def stop(self):
00858         self._running = False
00859         self._stopped = True
00860         self._waker.wake()
00861 
00862     def time(self):
00863         return self.time_func()
00864 
00865     def call_at(self, deadline, callback, *args, **kwargs):
00866         timeout = _Timeout(
00867             deadline,
00868             functools.partial(stack_context.wrap(callback), *args, **kwargs),
00869             self)
00870         heapq.heappush(self._timeouts, timeout)
00871         return timeout
00872 
00873     def remove_timeout(self, timeout):
00874         # Removing from a heap is complicated, so just leave the defunct
00875         # timeout object in the queue (see discussion in
00876         # http://docs.python.org/library/heapq.html).
00877         # If this turns out to be a problem, we could add a garbage
00878         # collection pass whenever there are too many dead timeouts.
00879         timeout.callback = None
00880         self._cancellations += 1
00881 
00882     def add_callback(self, callback, *args, **kwargs):
00883         with self._callback_lock:
00884             if self._closing:
00885                 raise RuntimeError("IOLoop is closing")
00886             list_empty = not self._callbacks
00887             self._callbacks.append(functools.partial(
00888                 stack_context.wrap(callback), *args, **kwargs))
00889             if list_empty and thread.get_ident() != self._thread_ident:
00890                 # If we're in the IOLoop's thread, we know it's not currently
00891                 # polling.  If we're not, and we added the first callback to an
00892                 # empty list, we may need to wake it up (it may wake up on its
00893                 # own, but an occasional extra wake is harmless).  Waking
00894                 # up a polling IOLoop is relatively expensive, so we try to
00895                 # avoid it when we can.
00896                 self._waker.wake()
00897 
00898     def add_callback_from_signal(self, callback, *args, **kwargs):
00899         with stack_context.NullContext():
00900             if thread.get_ident() != self._thread_ident:
00901                 # if the signal is handled on another thread, we can add
00902                 # it normally (modulo the NullContext)
00903                 self.add_callback(callback, *args, **kwargs)
00904             else:
00905                 # If we're on the IOLoop's thread, we cannot use
00906                 # the regular add_callback because it may deadlock on
00907                 # _callback_lock.  Blindly insert into self._callbacks.
00908                 # This is safe because the GIL makes list.append atomic.
00909                 # One subtlety is that if the signal interrupted the
00910                 # _callback_lock block in IOLoop.start, we may modify
00911                 # either the old or new version of self._callbacks,
00912                 # but either way will work.
00913                 self._callbacks.append(functools.partial(
00914                     stack_context.wrap(callback), *args, **kwargs))
00915 
00916 
00917 class _Timeout(object):
00918     """An IOLoop timeout, a UNIX timestamp and a callback"""
00919 
00920     # Reduce memory overhead when there are lots of pending callbacks
00921     __slots__ = ['deadline', 'callback', 'tiebreaker']
00922 
00923     def __init__(self, deadline, callback, io_loop):
00924         if not isinstance(deadline, numbers.Real):
00925             raise TypeError("Unsupported deadline %r" % deadline)
00926         self.deadline = deadline
00927         self.callback = callback
00928         self.tiebreaker = next(io_loop._timeout_counter)
00929 
00930     # Comparison methods to sort by deadline, with object id as a tiebreaker
00931     # to guarantee a consistent ordering.  The heapq module uses __le__
00932     # in python2.5, and __lt__ in 2.6+ (sort() and most other comparisons
00933     # use __lt__).
00934     def __lt__(self, other):
00935         return ((self.deadline, self.tiebreaker) <
00936                 (other.deadline, other.tiebreaker))
00937 
00938     def __le__(self, other):
00939         return ((self.deadline, self.tiebreaker) <=
00940                 (other.deadline, other.tiebreaker))
00941 
00942 
00943 class PeriodicCallback(object):
00944     """Schedules the given callback to be called periodically.
00945 
00946     The callback is called every ``callback_time`` milliseconds.
00947 
00948     `start` must be called after the `PeriodicCallback` is created.
00949     """
00950     def __init__(self, callback, callback_time, io_loop=None):
00951         self.callback = callback
00952         if callback_time <= 0:
00953             raise ValueError("Periodic callback must have a positive callback_time")
00954         self.callback_time = callback_time
00955         self.io_loop = io_loop or IOLoop.current()
00956         self._running = False
00957         self._timeout = None
00958 
00959     def start(self):
00960         """Starts the timer."""
00961         self._running = True
00962         self._next_timeout = self.io_loop.time()
00963         self._schedule_next()
00964 
00965     def stop(self):
00966         """Stops the timer."""
00967         self._running = False
00968         if self._timeout is not None:
00969             self.io_loop.remove_timeout(self._timeout)
00970             self._timeout = None
00971 
00972     def _run(self):
00973         if not self._running:
00974             return
00975         try:
00976             return self.callback()
00977         except Exception:
00978             self.io_loop.handle_callback_exception(self.callback)
00979         finally:
00980             self._schedule_next()
00981 
00982     def _schedule_next(self):
00983         if self._running:
00984             current_time = self.io_loop.time()
00985             while self._next_timeout <= current_time:
00986                 self._next_timeout += self.callback_time / 1000.0
00987             self._timeout = self.io_loop.add_timeout(self._next_timeout, self._run)


rosbridge_tools
Author(s): Jonathan Mace
autogenerated on Sat Dec 27 2014 11:25:59