00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 """An I/O event loop for non-blocking sockets.
00018
00019 Typical applications will use a single `IOLoop` object, in the
00020 `IOLoop.instance` singleton. The `IOLoop.start` method should usually
00021 be called at the end of the ``main()`` function. Atypical applications may
00022 use more than one `IOLoop`, such as one `IOLoop` per thread, or per `unittest`
00023 case.
00024
00025 In addition to I/O events, the `IOLoop` can also schedule time-based events.
00026 `IOLoop.add_timeout` is a non-blocking alternative to `time.sleep`.
00027 """
00028
00029 from __future__ import absolute_import, division, print_function, with_statement
00030
00031 import datetime
00032 import errno
00033 import functools
00034 import heapq
00035 import itertools
00036 import logging
00037 import numbers
00038 import os
00039 import select
00040 import sys
00041 import threading
00042 import time
00043 import traceback
00044
00045 from tornado.concurrent import TracebackFuture, is_future
00046 from tornado.log import app_log, gen_log
00047 from tornado import stack_context
00048 from tornado.util import Configurable, errno_from_exception, timedelta_to_seconds
00049
00050 try:
00051 import signal
00052 except ImportError:
00053 signal = None
00054
00055 try:
00056 import thread
00057 except ImportError:
00058 import _thread as thread
00059
00060 from tornado.platform.auto import set_close_exec, Waker
00061
00062
# Upper bound, in seconds, on the timeout handed to a single poll call;
# PollIOLoop.start() clamps its computed poll timeout to this value and uses
# it directly when no callbacks or timeouts are pending.
_POLL_TIMEOUT = 3600.0
00064
00065
class TimeoutError(Exception):
    """Raised by `IOLoop.run_sync` when the operation exceeds its timeout."""
00068
00069
class IOLoop(Configurable):
    """A level-triggered I/O loop.

    We use ``epoll`` (Linux) or ``kqueue`` (BSD and Mac OS X) if they
    are available, or else we fall back on select(). If you are
    implementing a system that needs to handle thousands of
    simultaneous connections, you should use a system that supports
    either ``epoll`` or ``kqueue``.

    Example usage for a simple TCP server::

        import errno
        import functools
        import ioloop
        import socket

        def connection_ready(sock, fd, events):
            while True:
                try:
                    connection, address = sock.accept()
                except socket.error as e:
                    if e.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
                        raise
                    return
                connection.setblocking(0)
                handle_connection(connection, address)

        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setblocking(0)
        sock.bind(("", port))
        sock.listen(128)

        io_loop = ioloop.IOLoop.instance()
        callback = functools.partial(connection_ready, sock)
        io_loop.add_handler(sock.fileno(), callback, io_loop.READ)
        io_loop.start()

    """
    # Event bit masks, named after (and numerically equal to) the Linux
    # epoll flags.
    _EPOLLIN = 0x001
    _EPOLLPRI = 0x002
    _EPOLLOUT = 0x004
    _EPOLLERR = 0x008
    _EPOLLHUP = 0x010
    _EPOLLRDHUP = 0x2000
    _EPOLLONESHOT = (1 << 30)
    _EPOLLET = (1 << 31)

    # Public event constants, expressed in terms of the masks above.
    NONE = 0
    READ = _EPOLLIN
    WRITE = _EPOLLOUT
    ERROR = _EPOLLERR | _EPOLLHUP

    # Guards creation of the global singleton in `instance()`.
    _instance_lock = threading.Lock()

    # Per-thread storage for the "current" IOLoop (see `current()`).
    _current = threading.local()

    @staticmethod
    def instance():
        """Returns a global `IOLoop` instance.

        Most applications have a single, global `IOLoop` running on the
        main thread.  Use this method to get this instance from
        another thread.  To get the current thread's `IOLoop`, use `current()`.
        """
        if not hasattr(IOLoop, "_instance"):
            with IOLoop._instance_lock:
                # Re-check under the lock: another thread may have created
                # the instance between the first check and lock acquisition.
                if not hasattr(IOLoop, "_instance"):
                    IOLoop._instance = IOLoop()
        return IOLoop._instance

    @staticmethod
    def initialized():
        """Returns true if the singleton instance has been created."""
        return hasattr(IOLoop, "_instance")

    def install(self):
        """Installs this `IOLoop` object as the singleton instance.

        This is normally not necessary as `instance()` will create
        an `IOLoop` on demand, but you may want to call `install` to use
        a custom subclass of `IOLoop`.

        Asserts that no singleton has been created yet.
        """
        assert not IOLoop.initialized()
        IOLoop._instance = self

    @staticmethod
    def clear_instance():
        """Clear the global `IOLoop` instance.

        Does nothing if no instance has been created.

        .. versionadded:: 4.0
        """
        if hasattr(IOLoop, "_instance"):
            del IOLoop._instance

    @staticmethod
    def current():
        """Returns the current thread's `IOLoop`.

        If an `IOLoop` is currently running or has been marked as current
        by `make_current`, returns that instance.  Otherwise returns
        `IOLoop.instance()`, i.e. the main thread's `IOLoop`.

        A common pattern for classes that depend on ``IOLoops`` is to use
        a default argument to enable programs with multiple ``IOLoops``
        but not require the argument for simpler applications::

            class MyClass(object):
                def __init__(self, io_loop=None):
                    self.io_loop = io_loop or IOLoop.current()

        In general you should use `IOLoop.current` as the default when
        constructing an asynchronous object, and use `IOLoop.instance`
        when you mean to communicate to the main thread from a different
        one.
        """
        current = getattr(IOLoop._current, "instance", None)
        if current is None:
            return IOLoop.instance()
        return current

    def make_current(self):
        """Makes this the `IOLoop` for the current thread.

        An `IOLoop` automatically becomes current for its thread
        when it is started, but it is sometimes useful to call
        `make_current` explicitly before starting the `IOLoop`,
        so that code run at startup time can find the right
        instance.
        """
        IOLoop._current.instance = self

    @staticmethod
    def clear_current():
        """Unsets the current thread's `IOLoop`.

        After this call `current()` falls back to `instance()` for this
        thread until `make_current` is called or a loop is started.
        """
        IOLoop._current.instance = None

    @classmethod
    def configurable_base(cls):
        """Returns `IOLoop`, the root of this configurable hierarchy."""
        return IOLoop

    @classmethod
    def configurable_default(cls):
        """Returns the default implementation class for this platform.

        Prefers epoll, then kqueue, based on what the `select` module
        exposes, and falls back to the select()-based loop.
        """
        if hasattr(select, "epoll"):
            from tornado.platform.epoll import EPollIOLoop
            return EPollIOLoop
        if hasattr(select, "kqueue"):
            from tornado.platform.kqueue import KQueueIOLoop
            return KQueueIOLoop
        from tornado.platform.select import SelectIOLoop
        return SelectIOLoop

    def initialize(self):
        """Initialization hook; the base implementation does nothing."""
        pass

    def close(self, all_fds=False):
        """Closes the `IOLoop`, freeing any resources used.

        If ``all_fds`` is true, all file descriptors registered on the
        IOLoop will be closed (not just the ones created by the
        `IOLoop` itself).

        Many applications will only use a single `IOLoop` that runs for the
        entire lifetime of the process.  In that case closing the `IOLoop`
        is not necessary since everything will be cleaned up when the
        process exits.  `IOLoop.close` is provided mainly for scenarios
        such as unit tests, which create and destroy a large number of
        ``IOLoops``.

        An `IOLoop` must be completely stopped before it can be closed.  This
        means that `IOLoop.stop()` must be called *and* `IOLoop.start()` must
        be allowed to return before attempting to call `IOLoop.close()`.
        Therefore the call to `close` will usually appear just after
        the call to `start` rather than near the call to `stop`.

        .. versionchanged:: 3.1
           If the `IOLoop` implementation supports non-integer objects
           for "file descriptors", those objects will have their
           ``close`` method called when ``all_fds`` is true.
        """
        raise NotImplementedError()

    def add_handler(self, fd, handler, events):
        """Registers the given handler to receive the given events for ``fd``.

        The ``fd`` argument may either be an integer file descriptor or
        a file-like object with a ``fileno()`` method (and optionally a
        ``close()`` method, which may be called when the `IOLoop` is shut
        down).

        The ``events`` argument is a bitwise or of the constants
        ``IOLoop.READ``, ``IOLoop.WRITE``, and ``IOLoop.ERROR``.

        When an event occurs, ``handler(fd, events)`` will be run.

        .. versionchanged:: 4.0
           Added the ability to pass file-like objects in addition to
           raw file descriptors.
        """
        raise NotImplementedError()

    def update_handler(self, fd, events):
        """Changes the events we listen for ``fd``.

        .. versionchanged:: 4.0
           Added the ability to pass file-like objects in addition to
           raw file descriptors.
        """
        raise NotImplementedError()

    def remove_handler(self, fd):
        """Stop listening for events on ``fd``.

        .. versionchanged:: 4.0
           Added the ability to pass file-like objects in addition to
           raw file descriptors.
        """
        raise NotImplementedError()

    def set_blocking_signal_threshold(self, seconds, action):
        """Sends a signal if the `IOLoop` is blocked for more than
        ``seconds`` seconds.

        Pass ``seconds=None`` to disable.  Requires Python 2.6 on a unixy
        platform.

        The action parameter is a Python signal handler.  Read the
        documentation for the `signal` module for more information.
        If ``action`` is None, the process will be killed if it is
        blocked for too long.
        """
        raise NotImplementedError()

    def set_blocking_log_threshold(self, seconds):
        """Logs a stack trace if the `IOLoop` is blocked for more than
        ``seconds`` seconds.

        Equivalent to ``set_blocking_signal_threshold(seconds,
        self.log_stack)``
        """
        self.set_blocking_signal_threshold(seconds, self.log_stack)

    def log_stack(self, signal, frame):
        """Signal handler to log the stack trace of the current thread.

        For use with `set_blocking_signal_threshold`.  Reads
        ``self._blocking_signal_threshold``, which this base class does not
        define; the implementing subclass is expected to set it.
        """
        gen_log.warning('IOLoop blocked for %f seconds in\n%s',
                        self._blocking_signal_threshold,
                        ''.join(traceback.format_stack(frame)))

    def start(self):
        """Starts the I/O loop.

        The loop will run until one of the callbacks calls `stop()`, which
        will make the loop stop after the current event iteration completes.
        """
        raise NotImplementedError()

    def _setup_logging(self):
        """The IOLoop catches and logs exceptions, so it's
        important that log output be visible.  However, python's
        default behavior for non-root loggers (prior to python
        3.2) is to print an unhelpful "no handlers could be
        found" message rather than the actual log entry, so we
        must explicitly configure logging if we've made it this
        far without anything.

        This method should be called from start() in subclasses.
        """
        if not any([logging.getLogger().handlers,
                    logging.getLogger('tornado').handlers,
                    logging.getLogger('tornado.application').handlers]):
            logging.basicConfig()

    def stop(self):
        """Stop the I/O loop.

        If the event loop is not currently running, the next call to `start()`
        will return immediately.

        To use asynchronous methods from otherwise-synchronous code (such as
        unit tests), you can start and stop the event loop like this::

          ioloop = IOLoop()
          async_method(ioloop=ioloop, callback=ioloop.stop)
          ioloop.start()

        ``ioloop.start()`` will return after ``async_method`` has run
        its callback, whether that callback was invoked before or
        after ``ioloop.start``.

        Note that even after `stop` has been called, the `IOLoop` is not
        completely stopped until `IOLoop.start` has also returned.
        Some work that was scheduled before the call to `stop` may still
        be run before the `IOLoop` shuts down.
        """
        raise NotImplementedError()

    def run_sync(self, func, timeout=None):
        """Starts the `IOLoop`, runs the given function, and stops the loop.

        If the function returns a `.Future`, the `IOLoop` will run
        until the future is resolved.  If it raises an exception, the
        `IOLoop` will stop and the exception will be re-raised to the
        caller.

        The optional ``timeout`` argument may be used to set a maximum
        duration for the function, expressed as an offset added to
        `IOLoop.time`.  If the timeout expires, a `TimeoutError` is raised.

        This method is useful in conjunction with `tornado.gen.coroutine`
        to allow asynchronous calls in a ``main()`` function::

            @gen.coroutine
            def main():
                # do stuff...

            if __name__ == '__main__':
                IOLoop.instance().run_sync(main)
        """
        # One-element list used as a mutable cell so the nested function
        # can publish the future to this scope.
        future_cell = [None]

        def run():
            try:
                result = func()
            except Exception:
                future_cell[0] = TracebackFuture()
                future_cell[0].set_exc_info(sys.exc_info())
            else:
                if is_future(result):
                    future_cell[0] = result
                else:
                    future_cell[0] = TracebackFuture()
                    future_cell[0].set_result(result)
            # Stop the loop as soon as the future (already resolved or not)
            # completes.
            self.add_future(future_cell[0], lambda future: self.stop())
        self.add_callback(run)
        if timeout is not None:
            timeout_handle = self.add_timeout(self.time() + timeout, self.stop)
        self.start()
        if timeout is not None:
            self.remove_timeout(timeout_handle)
        # The loop was stopped without the future finishing: that only
        # happens via the timeout's call to stop().
        if not future_cell[0].done():
            raise TimeoutError('Operation timed out after %s seconds' % timeout)
        return future_cell[0].result()

    def time(self):
        """Returns the current time according to the `IOLoop`'s clock.

        The return value is a floating-point number relative to an
        unspecified time in the past.

        By default, the `IOLoop`'s time function is `time.time`.  However,
        it may be configured to use e.g. `time.monotonic` instead.
        Calls to `add_timeout` that pass a number instead of a
        `datetime.timedelta` should use this function to compute the
        appropriate time, so they can work no matter what time function
        is chosen.
        """
        return time.time()

    def add_timeout(self, deadline, callback, *args, **kwargs):
        """Runs the ``callback`` at the time ``deadline`` from the I/O loop.

        Returns an opaque handle that may be passed to
        `remove_timeout` to cancel.

        ``deadline`` may be a number denoting a time (on the same
        scale as `IOLoop.time`, normally `time.time`), or a
        `datetime.timedelta` object for a deadline relative to the
        current time.  Since Tornado 4.0, `call_later` is a more
        convenient alternative for the relative case since it does not
        require a timedelta object.

        Raises `TypeError` if ``deadline`` is neither a real number nor a
        `datetime.timedelta`.

        Note that it is not safe to call `add_timeout` from other threads.
        Instead, you must use `add_callback` to transfer control to the
        `IOLoop`'s thread, and then call `add_timeout` from there.

        Subclasses of IOLoop must implement either `add_timeout` or
        `call_at`; the default implementations of each will call
        the other.  `call_at` is usually easier to implement, but
        subclasses that wish to maintain compatibility with Tornado
        versions prior to 4.0 must use `add_timeout` instead.

        .. versionchanged:: 4.0
           Now passes through ``*args`` and ``**kwargs`` to the callback.
        """
        if isinstance(deadline, numbers.Real):
            return self.call_at(deadline, callback, *args, **kwargs)
        elif isinstance(deadline, datetime.timedelta):
            return self.call_at(self.time() + timedelta_to_seconds(deadline),
                                callback, *args, **kwargs)
        else:
            # Wrap in a 1-tuple so that a tuple-valued ``deadline`` is
            # reported as-is instead of being unpacked by the % operator.
            raise TypeError("Unsupported deadline %r" % (deadline,))

    def call_later(self, delay, callback, *args, **kwargs):
        """Runs the ``callback`` after ``delay`` seconds have passed.

        Returns an opaque handle that may be passed to `remove_timeout`
        to cancel.  Note that unlike the `asyncio` method of the same
        name, the returned object does not have a ``cancel()`` method.

        See `add_timeout` for comments on thread-safety and subclassing.

        .. versionadded:: 4.0
        """
        return self.call_at(self.time() + delay, callback, *args, **kwargs)

    def call_at(self, when, callback, *args, **kwargs):
        """Runs the ``callback`` at the absolute time designated by ``when``.

        ``when`` must be a number using the same reference point as
        `IOLoop.time`.

        Returns an opaque handle that may be passed to `remove_timeout`
        to cancel.  Note that unlike the `asyncio` method of the same
        name, the returned object does not have a ``cancel()`` method.

        See `add_timeout` for comments on thread-safety and subclassing.

        .. versionadded:: 4.0
        """
        return self.add_timeout(when, callback, *args, **kwargs)

    def remove_timeout(self, timeout):
        """Cancels a pending timeout.

        The argument is a handle as returned by `add_timeout`.  It is
        safe to call `remove_timeout` even if the callback has already
        been run.
        """
        raise NotImplementedError()

    def add_callback(self, callback, *args, **kwargs):
        """Calls the given callback on the next I/O loop iteration.

        It is safe to call this method from any thread at any time,
        except from a signal handler.  Note that this is the **only**
        method in `IOLoop` that makes this thread-safety guarantee; all
        other interaction with the `IOLoop` must be done from that
        `IOLoop`'s thread.  `add_callback()` may be used to transfer
        control from other threads to the `IOLoop`'s thread.

        To add a callback from a signal handler, see
        `add_callback_from_signal`.
        """
        raise NotImplementedError()

    def add_callback_from_signal(self, callback, *args, **kwargs):
        """Calls the given callback on the next I/O loop iteration.

        Safe for use from a Python signal handler; should not be used
        otherwise.

        Callbacks added with this method will be run without any
        `.stack_context`, to avoid picking up the context of the function
        that was interrupted by the signal.
        """
        raise NotImplementedError()

    def spawn_callback(self, callback, *args, **kwargs):
        """Calls the given callback on the next IOLoop iteration.

        Unlike all other callback-related methods on IOLoop,
        ``spawn_callback`` does not associate the callback with its caller's
        ``stack_context``, so it is suitable for fire-and-forget callbacks
        that should not interfere with the caller.

        .. versionadded:: 4.0
        """
        with stack_context.NullContext():
            self.add_callback(callback, *args, **kwargs)

    def add_future(self, future, callback):
        """Schedules a callback on the ``IOLoop`` when the given
        `.Future` is finished.

        The callback is invoked with one argument, the
        `.Future`.
        """
        assert is_future(future)
        callback = stack_context.wrap(callback)
        # Hop through add_callback rather than invoking the callback
        # directly from the future's done-callback.
        future.add_done_callback(
            lambda future: self.add_callback(callback, future))

    def _run_callback(self, callback):
        """Runs a callback with error handling.

        For use in subclasses.
        """
        try:
            ret = callback()
            if ret is not None and is_future(ret):
                # The callback handed back a Future: once it completes,
                # call result() on it from a loop callback so that an
                # exception stored in it is raised rather than left
                # unobserved.
                self.add_future(ret, lambda f: f.result())
        except Exception:
            self.handle_callback_exception(callback)

    def handle_callback_exception(self, callback):
        """This method is called whenever a callback run by the `IOLoop`
        throws an exception.

        By default simply logs the exception as an error.  Subclasses
        may override this method to customize reporting of exceptions.

        The exception itself is not passed explicitly, but is available
        in `sys.exc_info`.
        """
        app_log.error("Exception in callback %r", callback, exc_info=True)

    def split_fd(self, fd):
        """Returns an (fd, obj) pair from an ``fd`` parameter.

        We accept both raw file descriptors and file-like objects as
        input to `add_handler` and related methods.  When a file-like
        object is passed, we must retain the object itself so we can
        close it correctly when the `IOLoop` shuts down, but the
        poller interfaces favor file descriptors (they will accept
        file-like objects and call ``fileno()`` for you, but they
        always return the descriptor itself).

        This method is provided for use by `IOLoop` subclasses and should
        not generally be used by application code.

        .. versionadded:: 4.0
        """
        try:
            return fd.fileno(), fd
        except AttributeError:
            # No fileno(): treat the argument as a raw descriptor.
            return fd, fd

    def close_fd(self, fd):
        """Utility method to close an ``fd``.

        If ``fd`` is a file-like object, we close it directly; otherwise
        we use `os.close`.  An `OSError` from either path is ignored.

        This method is provided for use by `IOLoop` subclasses (in
        implementations of ``IOLoop.close(all_fds=True)``) and should
        not generally be used by application code.

        .. versionadded:: 4.0
        """
        try:
            try:
                fd.close()
            except AttributeError:
                os.close(fd)
        except OSError:
            pass
00627
00628
class PollIOLoop(IOLoop):
    """Shared implementation for IOLoops driven by a select-like poller.

    Concrete subclasses live in `tornado.platform.epoll.EPollIOLoop`
    (Linux), `tornado.platform.kqueue.KQueueIOLoop` (BSD and Mac), and
    `tornado.platform.select.SelectIOLoop` (all platforms).
    """
    def initialize(self, impl, time_func=None):
        """Sets up loop state around the poller object ``impl``.

        ``time_func``, if given, replaces `time.time` as the loop's clock.
        """
        super(PollIOLoop, self).initialize()
        self._impl = impl
        if hasattr(self._impl, 'fileno'):
            set_close_exec(self._impl.fileno())
        self.time_func = time_func or time.time

        # fd -> (original fd object, wrapped handler)
        self._handlers = {}
        # fd -> event mask still waiting to be dispatched
        self._events = {}

        # Callbacks queued for the next iteration, and the lock guarding
        # that list.
        self._callbacks = []
        self._callback_lock = threading.Lock()

        # Heap of _Timeout objects, plus a count of cancelled entries that
        # are still sitting in the heap.
        self._timeouts = []
        self._cancellations = 0
        self._timeout_counter = itertools.count()

        self._running = False
        self._stopped = False
        self._closing = False
        self._thread_ident = None
        self._blocking_signal_threshold = None

        # The waker's read end is registered like any other fd so that
        # wake() interrupts a blocking poll; its handler just drains it.
        self._waker = Waker()
        self.add_handler(self._waker.fileno(),
                         lambda fd, events: self._waker.consume(),
                         self.READ)

    def close(self, all_fds=False):
        """Releases the waker and the poller; see `IOLoop.close`."""
        with self._callback_lock:
            # From here on add_callback() raises instead of queueing.
            self._closing = True
        self.remove_handler(self._waker.fileno())
        if all_fds:
            for fd_obj, _handler in self._handlers.values():
                self.close_fd(fd_obj)
        self._waker.close()
        self._impl.close()
        self._callbacks = self._timeouts = None

    def add_handler(self, fd, handler, events):
        """Registers ``handler`` for ``events`` on ``fd``; ERROR is always
        added to the mask."""
        fd, obj = self.split_fd(fd)
        wrapped = stack_context.wrap(handler)
        self._handlers[fd] = (obj, wrapped)
        self._impl.register(fd, events | self.ERROR)

    def update_handler(self, fd, events):
        """Replaces the event mask for ``fd``; ERROR is always added."""
        fd, _obj = self.split_fd(fd)
        self._impl.modify(fd, events | self.ERROR)

    def remove_handler(self, fd):
        """Forgets ``fd`` and unregisters it from the poller.

        A failure to unregister is logged at debug level, not raised.
        """
        fd, _obj = self.split_fd(fd)
        self._handlers.pop(fd, None)
        self._events.pop(fd, None)
        try:
            self._impl.unregister(fd)
        except Exception:
            gen_log.debug("Error deleting fd from IOLoop", exc_info=True)

    def set_blocking_signal_threshold(self, seconds, action):
        """Stores the threshold and installs ``action`` as the SIGALRM
        handler; see `IOLoop.set_blocking_signal_threshold`.

        Logs an error and does nothing when ``signal.setitimer`` is not
        available.
        """
        if not hasattr(signal, "setitimer"):
            gen_log.error("set_blocking_signal_threshold requires a signal module "
                          "with the setitimer method")
            return
        self._blocking_signal_threshold = seconds
        if seconds is None:
            return
        if action is not None:
            alarm_handler = action
        else:
            alarm_handler = signal.SIG_DFL
        signal.signal(signal.SIGALRM, alarm_handler)

    def start(self):
        """Runs the loop until `stop` is called; see `IOLoop.start`.

        Raises `RuntimeError` if the loop is already running.
        """
        if self._running:
            raise RuntimeError("IOLoop is already running")
        self._setup_logging()
        if self._stopped:
            # stop() was called before start(): consume the flag and
            # return without running.
            self._stopped = False
            return
        previous_loop = getattr(IOLoop._current, "instance", None)
        IOLoop._current.instance = self
        self._thread_ident = thread.get_ident()
        self._running = True

        # Where supported, point the signal wakeup fd at our waker so that
        # an arriving signal interrupts the poll.  If some other fd was
        # already installed (return value other than -1), put it back and
        # leave it alone.
        saved_wakeup_fd = None
        if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix':
            try:
                saved_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno())
                if saved_wakeup_fd != -1:
                    signal.set_wakeup_fd(saved_wakeup_fd)
                    saved_wakeup_fd = None
            except ValueError:
                # set_wakeup_fd refused (NOTE(review): presumably because
                # we are not on the main thread); carry on without it.
                pass

        try:
            while True:
                # Swap out the callback list under the lock so callbacks
                # added while this batch runs land in the next iteration.
                with self._callback_lock:
                    ready_callbacks = self._callbacks
                    self._callbacks = []

                # Collect every due timeout before running anything, so a
                # timeout scheduled by one of them waits for a later pass.
                expired = []
                if self._timeouts:
                    now = self.time()
                    while self._timeouts:
                        if self._timeouts[0].callback is None:
                            # Cancelled entry at the head of the heap.
                            heapq.heappop(self._timeouts)
                            self._cancellations -= 1
                        elif self._timeouts[0].deadline <= now:
                            expired.append(heapq.heappop(self._timeouts))
                        else:
                            break
                    if (self._cancellations > 512
                            and self._cancellations > (len(self._timeouts) >> 1)):
                        # More than half of a large heap is dead weight:
                        # rebuild it from the live entries only.
                        self._cancellations = 0
                        self._timeouts = [entry for entry in self._timeouts
                                          if entry.callback is not None]
                        heapq.heapify(self._timeouts)

                for cb in ready_callbacks:
                    self._run_callback(cb)
                for due in expired:
                    # Re-check: an earlier callback in this pass may have
                    # cancelled this timeout.
                    if due.callback is not None:
                        self._run_callback(due.callback)

                # Drop our references before blocking in poll so the
                # objects can be collected.
                ready_callbacks = cb = expired = due = None

                if self._callbacks:
                    # More work was queued meanwhile: poll without waiting.
                    poll_timeout = 0.0
                elif self._timeouts:
                    # Sleep until the earliest timeout, clamped to
                    # [0, _POLL_TIMEOUT].
                    remaining = self._timeouts[0].deadline - self.time()
                    poll_timeout = max(0, min(remaining, _POLL_TIMEOUT))
                else:
                    poll_timeout = _POLL_TIMEOUT

                if not self._running:
                    break

                if self._blocking_signal_threshold is not None:
                    # Disarm the alarm while we wait in poll.
                    signal.setitimer(signal.ITIMER_REAL, 0, 0)

                try:
                    ready_events = self._impl.poll(poll_timeout)
                except Exception as e:
                    # An interrupted poll (EINTR) just restarts the
                    # iteration; anything else propagates.
                    if errno_from_exception(e) == errno.EINTR:
                        continue
                    else:
                        raise

                if self._blocking_signal_threshold is not None:
                    # Re-arm the alarm for the handler-running phase.
                    signal.setitimer(signal.ITIMER_REAL,
                                     self._blocking_signal_threshold, 0)

                # Dispatch one fd at a time out of self._events; handlers
                # may call remove_handler(), which also edits that dict.
                self._events.update(ready_events)
                while self._events:
                    fd, events = self._events.popitem()
                    try:
                        fd_obj, handler_func = self._handlers[fd]
                        handler_func(fd_obj, events)
                    except (OSError, IOError) as e:
                        # EPIPE is silently ignored; other errors are
                        # reported.
                        if errno_from_exception(e) != errno.EPIPE:
                            self.handle_callback_exception(self._handlers.get(fd))
                    except Exception:
                        self.handle_callback_exception(self._handlers.get(fd))
                fd_obj = handler_func = None

        finally:
            # Clear the stop flag so a later start()/stop() pair works.
            self._stopped = False
            if self._blocking_signal_threshold is not None:
                signal.setitimer(signal.ITIMER_REAL, 0, 0)
            IOLoop._current.instance = previous_loop
            if saved_wakeup_fd is not None:
                signal.set_wakeup_fd(saved_wakeup_fd)

    def stop(self):
        """Flags the loop to stop and wakes it if it is blocked in poll."""
        self._running = False
        self._stopped = True
        self._waker.wake()

    def time(self):
        """Returns the current time from the configured ``time_func``."""
        return self.time_func()

    def call_at(self, deadline, callback, *args, **kwargs):
        """Pushes a `_Timeout` for ``callback`` onto the heap and returns it
        as the cancellation handle."""
        bound = functools.partial(stack_context.wrap(callback), *args, **kwargs)
        entry = _Timeout(deadline, bound, self)
        heapq.heappush(self._timeouts, entry)
        return entry

    def remove_timeout(self, timeout):
        """Cancels ``timeout`` by clearing its callback.

        The entry stays in the heap; `start` discards it when it reaches
        the head, or in bulk once cancellations pile up.
        """
        timeout.callback = None
        self._cancellations += 1

    def add_callback(self, callback, *args, **kwargs):
        """Queues ``callback`` for the next iteration; see
        `IOLoop.add_callback`.

        Raises `RuntimeError` once `close` has begun.
        """
        with self._callback_lock:
            if self._closing:
                raise RuntimeError("IOLoop is closing")
            was_empty = not self._callbacks
            self._callbacks.append(functools.partial(
                stack_context.wrap(callback), *args, **kwargs))
            if was_empty and thread.get_ident() != self._thread_ident:
                # Only the first callback queued from a thread other than
                # the loop's own triggers a wake-up.
                self._waker.wake()

    def add_callback_from_signal(self, callback, *args, **kwargs):
        """Queues ``callback`` without any stack context; see
        `IOLoop.add_callback_from_signal`."""
        with stack_context.NullContext():
            if thread.get_ident() == self._thread_ident:
                # On the loop's own thread the callback is appended
                # directly, without taking _callback_lock.
                # NOTE(review): presumably because the interrupted code may
                # already hold that non-reentrant lock -- confirm.
                self._callbacks.append(functools.partial(
                    stack_context.wrap(callback), *args, **kwargs))
            else:
                # Running on another thread: the regular path is used.
                self.add_callback(callback, *args, **kwargs)
00915
00916
class _Timeout(object):
    """A scheduled callback: a numeric deadline plus the callable to run.

    Instances order by ``(deadline, tiebreaker)`` so they can be kept in a
    heap; the tiebreaker is drawn from the owning loop's
    ``_timeout_counter``, so equal deadlines order by creation.
    """

    # Fixed attribute set; avoids a per-instance __dict__.
    __slots__ = ['deadline', 'callback', 'tiebreaker']

    def __init__(self, deadline, callback, io_loop):
        """Creates a timeout.

        ``deadline`` must be a real number, otherwise `TypeError` is
        raised.  ``io_loop`` only needs a ``_timeout_counter`` iterator.
        """
        if not isinstance(deadline, numbers.Real):
            # Wrap in a 1-tuple so that a tuple-valued ``deadline`` is
            # reported as-is instead of being unpacked by the % operator.
            raise TypeError("Unsupported deadline %r" % (deadline,))
        self.deadline = deadline
        self.callback = callback
        self.tiebreaker = next(io_loop._timeout_counter)

    # Only __lt__ and __le__ are defined; comparison is on the
    # (deadline, tiebreaker) pair.
    def __lt__(self, other):
        return ((self.deadline, self.tiebreaker) <
                (other.deadline, other.tiebreaker))

    def __le__(self, other):
        return ((self.deadline, self.tiebreaker) <=
                (other.deadline, other.tiebreaker))
00941
00942
class PeriodicCallback(object):
    """Schedules the given callback to be called periodically.

    The callback is called every ``callback_time`` milliseconds.

    `start` must be called after the `PeriodicCallback` is created.
    """
    def __init__(self, callback, callback_time, io_loop=None):
        """Creates the (not yet running) periodic callback.

        ``callback_time`` is the period in milliseconds and must be
        positive, otherwise `ValueError` is raised.  ``io_loop`` defaults
        to `IOLoop.current()`.
        """
        self.callback = callback
        if callback_time <= 0:
            raise ValueError("Periodic callback must have a positive callback_time")
        self.callback_time = callback_time
        self.io_loop = io_loop or IOLoop.current()
        self._running = False
        self._timeout = None

    def start(self):
        """Starts the timer."""
        self._running = True
        self._next_timeout = self.io_loop.time()
        self._schedule_next()

    def stop(self):
        """Stops the timer."""
        self._running = False
        if self._timeout is not None:
            self.io_loop.remove_timeout(self._timeout)
            self._timeout = None

    def _run(self):
        """Invokes the callback once and schedules the next run.

        Exceptions from the callback are passed to the loop's
        ``handle_callback_exception``; rescheduling happens either way.
        """
        if not self._running:
            return
        try:
            return self.callback()
        except Exception:
            self.io_loop.handle_callback_exception(self.callback)
        finally:
            self._schedule_next()

    def _schedule_next(self):
        """Advances ``_next_timeout`` past the current time and registers
        the next run with the loop."""
        if self._running:
            current_time = self.io_loop.time()
            if self._next_timeout <= current_time:
                period = self.callback_time / 1000.0
                # Skip every period that has already elapsed in one step
                # instead of one addition per period, so a large forward
                # clock jump does not stall the loop.
                elapsed_periods = (current_time - self._next_timeout) // period
                self._next_timeout += (elapsed_periods + 1) * period
                # Floating-point rounding may leave us at or just before
                # the current time; finish with single steps.
                while self._next_timeout <= current_time:
                    self._next_timeout += period
            self._timeout = self.io_loop.add_timeout(self._next_timeout, self._run)