00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 """An I/O event loop for non-blocking sockets.
00018
00019 Typical applications will use a single `IOLoop` object, in the
00020 `IOLoop.instance` singleton. The `IOLoop.start` method should usually
00021 be called at the end of the ``main()`` function. Atypical applications may
00022 use more than one `IOLoop`, such as one `IOLoop` per thread, or per `unittest`
00023 case.
00024
00025 In addition to I/O events, the `IOLoop` can also schedule time-based events.
00026 `IOLoop.add_timeout` is a non-blocking alternative to `time.sleep`.
00027 """
00028
00029 from __future__ import absolute_import, division, with_statement
00030
00031 import datetime
00032 import errno
00033 import heapq
00034 import os
00035 import logging
00036 import select
00037 import thread
00038 import threading
00039 import time
00040 import traceback
00041
00042 from tornado import stack_context
00043
00044 try:
00045 import signal
00046 except ImportError:
00047 signal = None
00048
00049 from tornado.platform.auto import set_close_exec, Waker
00050
00051
class IOLoop(object):
    """A level-triggered I/O loop.

    We use epoll (Linux) or kqueue (BSD and Mac OS X; requires python
    2.6+) if they are available, or else we fall back on select(). If
    you are implementing a system that needs to handle thousands of
    simultaneous connections, you should use a system that supports either
    epoll or kqueue.

    Example usage for a simple TCP server::

        import errno
        import functools
        import ioloop
        import socket

        def connection_ready(sock, fd, events):
            while True:
                try:
                    connection, address = sock.accept()
                except socket.error as e:
                    if e.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
                        raise
                    return
                connection.setblocking(0)
                handle_connection(connection, address)

        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setblocking(0)
        sock.bind(("", port))
        sock.listen(128)

        io_loop = ioloop.IOLoop.instance()
        callback = functools.partial(connection_ready, sock)
        io_loop.add_handler(sock.fileno(), callback, io_loop.READ)
        io_loop.start()

    """
    # Event bit values; the names mirror the epoll event flags.
    _EPOLLIN = 0x001
    _EPOLLPRI = 0x002
    _EPOLLOUT = 0x004
    _EPOLLERR = 0x008
    _EPOLLHUP = 0x010
    _EPOLLRDHUP = 0x2000
    _EPOLLONESHOT = (1 << 30)
    _EPOLLET = (1 << 31)

    # Public event masks, expressed directly in terms of the bits above.
    NONE = 0
    READ = _EPOLLIN
    WRITE = _EPOLLOUT
    ERROR = _EPOLLERR | _EPOLLHUP

    # Guards creation of the process-wide singleton in instance().
    _instance_lock = threading.Lock()

    def __init__(self, impl=None):
        """Creates a loop on top of ``impl`` (default: the module's `_poll`).

        ``impl`` must provide ``register``, ``modify``, ``unregister``,
        ``poll`` and ``close``; if it also has ``fileno`` that descriptor
        is passed to ``set_close_exec``.
        """
        self._impl = impl or _poll()
        if hasattr(self._impl, 'fileno'):
            set_close_exec(self._impl.fileno())
        self._handlers = {}   # fd -> wrapped handler(fd, events)
        self._events = {}     # fd -> pending event mask not yet dispatched
        self._callbacks = []  # callbacks queued by add_callback()
        self._callback_lock = threading.Lock()
        self._timeouts = []   # heap of _Timeout objects
        self._running = False
        self._stopped = False
        self._thread_ident = None
        self._blocking_signal_threshold = None

        # The waker's fd is watched for READ like any other handler;
        # stop() and add_callback() call wake() on it, presumably to make
        # a blocked poll() return early.
        self._waker = Waker()
        self.add_handler(self._waker.fileno(),
                         lambda fd, events: self._waker.consume(),
                         self.READ)

    @staticmethod
    def instance():
        """Returns a global IOLoop instance.

        Most single-threaded applications have a single, global IOLoop.
        Use this method instead of passing around IOLoop instances
        throughout your code.

        A common pattern for classes that depend on IOLoops is to use
        a default argument to enable programs with multiple IOLoops
        but not require the argument for simpler applications::

            class MyClass(object):
                def __init__(self, io_loop=None):
                    self.io_loop = io_loop or IOLoop.instance()
        """
        if not hasattr(IOLoop, "_instance"):
            with IOLoop._instance_lock:
                # Re-check under the lock: another thread may have created
                # the instance between the first test and acquiring it.
                if not hasattr(IOLoop, "_instance"):
                    IOLoop._instance = IOLoop()
        return IOLoop._instance

    @staticmethod
    def initialized():
        """Returns true if the singleton instance has been created."""
        return hasattr(IOLoop, "_instance")

    def install(self):
        """Installs this IOloop object as the singleton instance.

        This is normally not necessary as `instance()` will create
        an IOLoop on demand, but you may want to call `install` to use
        a custom subclass of IOLoop.
        """
        assert not IOLoop.initialized()
        IOLoop._instance = self

    def close(self, all_fds=False):
        """Closes the IOLoop, freeing any resources used.

        If ``all_fds`` is true, all file descriptors registered on the
        IOLoop will be closed (not just the ones created by the IOLoop itself).

        Many applications will only use a single IOLoop that runs for the
        entire lifetime of the process.  In that case closing the IOLoop
        is not necessary since everything will be cleaned up when the
        process exits.  `IOLoop.close` is provided mainly for scenarios
        such as unit tests, which create and destroy a large number of
        IOLoops.

        An IOLoop must be completely stopped before it can be closed.  This
        means that `IOLoop.stop()` must be called *and* `IOLoop.start()` must
        be allowed to return before attempting to call `IOLoop.close()`.
        Therefore the call to `close` will usually appear just after
        the call to `start` rather than near the call to `stop`.
        """
        self.remove_handler(self._waker.fileno())
        if all_fds:
            # Iterate over a snapshot of the registered descriptors.
            for fd in list(self._handlers):
                try:
                    os.close(fd)
                except Exception:
                    logging.debug("error closing fd %s", fd, exc_info=True)
        self._waker.close()
        self._impl.close()

    def add_handler(self, fd, handler, events):
        """Registers the given handler to receive the given events for fd.

        ``handler`` is later invoked as ``handler(fd, events)``.  ERROR
        events are always requested in addition to ``events``.
        """
        self._handlers[fd] = stack_context.wrap(handler)
        self._impl.register(fd, events | self.ERROR)

    def update_handler(self, fd, events):
        """Changes the events we listen for fd.

        ERROR events are always requested in addition to ``events``.
        """
        self._impl.modify(fd, events | self.ERROR)

    def remove_handler(self, fd):
        """Stop listening for events on fd.

        Removal is best-effort: any pending events for ``fd`` are dropped
        and a failure to unregister it from the poll implementation is
        logged at debug level rather than raised.
        """
        self._handlers.pop(fd, None)
        self._events.pop(fd, None)
        try:
            self._impl.unregister(fd)
        except Exception:
            # Implementations fail differently for an fd they do not know
            # (e.g. _KQueue.unregister raises KeyError, not an OSError), so
            # catch broadly to behave the same on every backend.
            logging.debug("Error deleting fd from IOLoop", exc_info=True)

    def set_blocking_signal_threshold(self, seconds, action):
        """Sends a signal if the ioloop is blocked for more than s seconds.

        Pass seconds=None to disable.  Requires python 2.6 on a unixy
        platform.

        The action parameter is a python signal handler.  Read the
        documentation for the python 'signal' module for more information.
        If action is None, the process will be killed if it is blocked for
        too long.
        """
        if not hasattr(signal, "setitimer"):
            logging.error("set_blocking_signal_threshold requires a signal module "
                          "with the setitimer method")
            return
        self._blocking_signal_threshold = seconds
        if seconds is not None:
            signal.signal(signal.SIGALRM,
                          action if action is not None else signal.SIG_DFL)

    def set_blocking_log_threshold(self, seconds):
        """Logs a stack trace if the ioloop is blocked for more than s seconds.
        Equivalent to set_blocking_signal_threshold(seconds, self.log_stack)
        """
        self.set_blocking_signal_threshold(seconds, self.log_stack)

    def log_stack(self, signal, frame):
        """Signal handler to log the stack trace of the current thread.

        For use with set_blocking_signal_threshold.
        """
        logging.warning('IOLoop blocked for %f seconds in\n%s',
                        self._blocking_signal_threshold,
                        ''.join(traceback.format_stack(frame)))

    def start(self):
        """Starts the I/O loop.

        The loop will run until one of the I/O handlers calls stop(), which
        will make the loop stop after the current event iteration completes.
        """
        if self._stopped:
            # stop() was called before start(): consume the request and
            # return immediately.
            self._stopped = False
            return
        self._thread_ident = thread.get_ident()
        self._running = True
        while True:
            poll_timeout = 3600.0

            # Swap the callback list out under the lock so that callbacks
            # added while these run are deferred to the next iteration.
            with self._callback_lock:
                callbacks = self._callbacks
                self._callbacks = []
            for callback in callbacks:
                self._run_callback(callback)

            if self._timeouts:
                now = time.time()
                while self._timeouts:
                    if self._timeouts[0].callback is None:
                        # Cancelled by remove_timeout(); just discard it.
                        heapq.heappop(self._timeouts)
                    elif self._timeouts[0].deadline <= now:
                        timeout = heapq.heappop(self._timeouts)
                        self._run_callback(timeout.callback)
                    else:
                        # Earliest timeout is in the future: sleep no
                        # longer than the time remaining until it is due.
                        seconds = self._timeouts[0].deadline - now
                        poll_timeout = min(seconds, poll_timeout)
                        break

            if self._callbacks:
                # Something above queued a new callback; do not block in
                # poll() before running it.
                poll_timeout = 0.0

            if not self._running:
                break

            if self._blocking_signal_threshold is not None:
                # Disarm the timer so it cannot fire while poll() waits.
                signal.setitimer(signal.ITIMER_REAL, 0, 0)

            try:
                event_pairs = self._impl.poll(poll_timeout)
            except Exception as e:
                # EINTR may be reported either as e.errno or as the first
                # element of a two-item e.args; retry the iteration in
                # both cases and propagate anything else.
                if (getattr(e, 'errno', None) == errno.EINTR or
                    (isinstance(getattr(e, 'args', None), tuple) and
                     len(e.args) == 2 and e.args[0] == errno.EINTR)):
                    continue
                else:
                    raise

            if self._blocking_signal_threshold is not None:
                # Re-arm the timer for the handler-dispatch phase below.
                signal.setitimer(signal.ITIMER_REAL,
                                 self._blocking_signal_threshold, 0)

            # Dispatch one fd at a time from self._events rather than
            # iterating a snapshot: a handler may call remove_handler(),
            # which drops that fd's pending events from the same dict.
            self._events.update(event_pairs)
            while self._events:
                fd, events = self._events.popitem()
                try:
                    self._handlers[fd](fd, events)
                except (OSError, IOError) as e:
                    # EPIPE is deliberately ignored instead of logged.
                    # Guard against an exception carrying no args so the
                    # check itself cannot raise and kill the loop.
                    if not (e.args and e.args[0] == errno.EPIPE):
                        logging.error("Exception in I/O handler for fd %s",
                                      fd, exc_info=True)
                except Exception:
                    logging.error("Exception in I/O handler for fd %s",
                                  fd, exc_info=True)
        # Reset the flag so another start()/stop() pair can be issued.
        self._stopped = False
        if self._blocking_signal_threshold is not None:
            signal.setitimer(signal.ITIMER_REAL, 0, 0)

    def stop(self):
        """Stop the loop after the current event loop iteration is complete.
        If the event loop is not currently running, the next call to start()
        will return immediately.

        To use asynchronous methods from otherwise-synchronous code (such as
        unit tests), you can start and stop the event loop like this::

          ioloop = IOLoop()
          async_method(ioloop=ioloop, callback=ioloop.stop)
          ioloop.start()

        ioloop.start() will return after async_method has run its callback,
        whether that callback was invoked before or after ioloop.start.

        Note that even after `stop` has been called, the IOLoop is not
        completely stopped until `IOLoop.start` has also returned.
        """
        self._running = False
        self._stopped = True
        self._waker.wake()

    def running(self):
        """Returns true if this IOLoop is currently running."""
        return self._running

    def add_timeout(self, deadline, callback):
        """Calls the given callback at the time deadline from the I/O loop.

        Returns a handle that may be passed to remove_timeout to cancel.

        ``deadline`` may be a number denoting a unix timestamp (as returned
        by ``time.time()``) or a ``datetime.timedelta`` object for a deadline
        relative to the current time.

        Note that it is not safe to call `add_timeout` from other threads.
        Instead, you must use `add_callback` to transfer control to the
        IOLoop's thread, and then call `add_timeout` from there.
        """
        timeout = _Timeout(deadline, stack_context.wrap(callback))
        heapq.heappush(self._timeouts, timeout)
        return timeout

    def remove_timeout(self, timeout):
        """Cancels a pending timeout.

        The argument is a handle as returned by add_timeout.
        """
        # The entry is not removed from the heap here; clearing its
        # callback marks it as cancelled and start() discards it when it
        # reaches the front of the heap.
        timeout.callback = None

    def add_callback(self, callback):
        """Calls the given callback on the next I/O loop iteration.

        It is safe to call this method from any thread at any time.
        Note that this is the *only* method in IOLoop that makes this
        guarantee; all other interaction with the IOLoop must be done
        from that IOLoop's thread.  add_callback() may be used to transfer
        control from other threads to the IOLoop's thread.
        """
        with self._callback_lock:
            list_empty = not self._callbacks
            self._callbacks.append(stack_context.wrap(callback))
        if list_empty and thread.get_ident() != self._thread_ident:
            # Only wake the loop when called from a different thread and
            # the list was empty: on the loop's own thread it cannot be
            # blocked in poll(), and if the list already had entries an
            # earlier add_callback() has already requested the wake-up.
            self._waker.wake()

    def _run_callback(self, callback):
        """Runs ``callback``, routing any Exception to the error hook."""
        try:
            callback()
        except Exception:
            self.handle_callback_exception(callback)

    def handle_callback_exception(self, callback):
        """This method is called whenever a callback run by the IOLoop
        throws an exception.

        By default simply logs the exception as an error.  Subclasses
        may override this method to customize reporting of exceptions.

        The exception itself is not passed explicitly, but is available
        in sys.exc_info.
        """
        logging.error("Exception in callback %r", callback, exc_info=True)
00436
00437
00438 class _Timeout(object):
00439 """An IOLoop timeout, a UNIX timestamp and a callback"""
00440
00441
00442 __slots__ = ['deadline', 'callback']
00443
00444 def __init__(self, deadline, callback):
00445 if isinstance(deadline, (int, long, float)):
00446 self.deadline = deadline
00447 elif isinstance(deadline, datetime.timedelta):
00448 self.deadline = time.time() + _Timeout.timedelta_to_seconds(deadline)
00449 else:
00450 raise TypeError("Unsupported deadline %r" % deadline)
00451 self.callback = callback
00452
00453 @staticmethod
00454 def timedelta_to_seconds(td):
00455 """Equivalent to td.total_seconds() (introduced in python 2.7)."""
00456 return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) / float(10 ** 6)
00457
00458
00459
00460
00461
00462 def __lt__(self, other):
00463 return ((self.deadline, id(self)) <
00464 (other.deadline, id(other)))
00465
00466 def __le__(self, other):
00467 return ((self.deadline, id(self)) <=
00468 (other.deadline, id(other)))
00469
00470
class PeriodicCallback(object):
    """Schedules the given callback to be called periodically.

    The callback is called every callback_time milliseconds.

    `start` must be called after the PeriodicCallback is created.
    """
    def __init__(self, callback, callback_time, io_loop=None):
        """Prepares (but does not start) the periodic timer.

        ``callback`` is called with no arguments, ``callback_time`` is the
        period in milliseconds and ``io_loop`` defaults to
        ``IOLoop.instance()``.

        Raises ``ValueError`` if ``callback_time`` is not positive, since
        such a period can never be scheduled.
        """
        if callback_time <= 0:
            raise ValueError(
                "Periodic callback must have a positive callback_time")
        self.callback = callback
        self.callback_time = callback_time
        self.io_loop = io_loop or IOLoop.instance()
        self._running = False
        self._timeout = None

    def start(self):
        """Starts the timer."""
        self._running = True
        self._next_timeout = time.time()
        self._schedule_next()

    def stop(self):
        """Stops the timer."""
        self._running = False
        if self._timeout is not None:
            self.io_loop.remove_timeout(self._timeout)
            self._timeout = None

    def _run(self):
        """Timeout handler: runs the callback once, then re-arms the timer."""
        if not self._running:
            return
        try:
            self.callback()
        except Exception:
            # A failing callback must not stop the periodic schedule.
            logging.error("Error in periodic callback", exc_info=True)
        self._schedule_next()

    def _schedule_next(self):
        """Arms the timeout for the next period that lies in the future.

        Periods that were missed (slow callback, stalled loop, clock jump)
        are skipped rather than run back-to-back, and the original phase
        of the schedule is kept.
        """
        if not self._running:
            return
        period = self.callback_time / 1000.0
        current_time = time.time()
        if self._next_timeout <= current_time:
            # Jump over every missed period in one step instead of
            # looping once per period.
            missed = (current_time - self._next_timeout) // period
            self._next_timeout += (missed + 1) * period
            if self._next_timeout <= current_time:
                # Float rounding left the deadline on or behind "now".
                self._next_timeout += period
        self._timeout = self.io_loop.add_timeout(self._next_timeout, self._run)
00513
00514
00515 class _EPoll(object):
00516 """An epoll-based event loop using our C module for Python 2.5 systems"""
00517 _EPOLL_CTL_ADD = 1
00518 _EPOLL_CTL_DEL = 2
00519 _EPOLL_CTL_MOD = 3
00520
00521 def __init__(self):
00522 self._epoll_fd = epoll.epoll_create()
00523
00524 def fileno(self):
00525 return self._epoll_fd
00526
00527 def close(self):
00528 os.close(self._epoll_fd)
00529
00530 def register(self, fd, events):
00531 epoll.epoll_ctl(self._epoll_fd, self._EPOLL_CTL_ADD, fd, events)
00532
00533 def modify(self, fd, events):
00534 epoll.epoll_ctl(self._epoll_fd, self._EPOLL_CTL_MOD, fd, events)
00535
00536 def unregister(self, fd):
00537 epoll.epoll_ctl(self._epoll_fd, self._EPOLL_CTL_DEL, fd, 0)
00538
00539 def poll(self, timeout):
00540 return epoll.epoll_wait(self._epoll_fd, int(timeout * 1000))
00541
00542
class _KQueue(object):
    """A kqueue-based event loop for BSD/Mac systems."""

    def __init__(self):
        """Creates the kqueue and an empty registration table."""
        self._kqueue = select.kqueue()
        # fd -> event mask it was registered with; unregister() needs the
        # mask to know which filters to delete.
        self._active = {}

    def fileno(self):
        """Returns the descriptor of the underlying kqueue."""
        return self._kqueue.fileno()

    def close(self):
        """Closes the underlying kqueue."""
        self._kqueue.close()

    def register(self, fd, events):
        """Adds filters for ``fd`` according to the IOLoop mask ``events``."""
        self._control(fd, events, select.KQ_EV_ADD)
        self._active[fd] = events

    def modify(self, fd, events):
        """Changes the watched events by removing and re-adding ``fd``."""
        self.unregister(fd)
        self.register(fd, events)

    def unregister(self, fd):
        """Deletes the filters for ``fd``; KeyError if it was never added."""
        self._control(fd, self._active.pop(fd), select.KQ_EV_DELETE)

    def _control(self, fd, events, flags):
        """Applies ``flags`` to each kqueue filter implied by ``events``."""
        filters = []
        if events & IOLoop.WRITE:
            filters.append(select.KQ_FILTER_WRITE)
        if events & IOLoop.READ or not filters:
            # The read filter is also used when neither READ nor WRITE was
            # requested, so every fd gets at least one filter.
            filters.append(select.KQ_FILTER_READ)
        # Each change is submitted in its own control() call.
        for kq_filter in filters:
            change = select.kevent(fd, filter=kq_filter, flags=flags)
            self._kqueue.control([change], 0)

    def poll(self, timeout):
        """Waits up to ``timeout`` and returns ``(fd, event_mask)`` pairs."""
        ready = {}
        for kev in self._kqueue.control(None, 1000, timeout):
            fd = kev.ident
            if kev.filter == select.KQ_FILTER_READ:
                ready[fd] = ready.get(fd, 0) | IOLoop.READ
            elif kev.filter == select.KQ_FILTER_WRITE:
                if kev.flags & select.KQ_EV_EOF:
                    # EOF on the write filter is reported as ERROR only,
                    # replacing whatever was recorded for this fd so far.
                    ready[fd] = IOLoop.ERROR
                else:
                    ready[fd] = ready.get(fd, 0) | IOLoop.WRITE
            if kev.flags & select.KQ_EV_ERROR:
                ready[fd] = ready.get(fd, 0) | IOLoop.ERROR
        return ready.items()
00603
00604
00605 class _Select(object):
00606 """A simple, select()-based IOLoop implementation for non-Linux systems"""
00607 def __init__(self):
00608 self.read_fds = set()
00609 self.write_fds = set()
00610 self.error_fds = set()
00611 self.fd_sets = (self.read_fds, self.write_fds, self.error_fds)
00612
00613 def close(self):
00614 pass
00615
00616 def register(self, fd, events):
00617 if events & IOLoop.READ:
00618 self.read_fds.add(fd)
00619 if events & IOLoop.WRITE:
00620 self.write_fds.add(fd)
00621 if events & IOLoop.ERROR:
00622 self.error_fds.add(fd)
00623
00624
00625
00626 self.read_fds.add(fd)
00627
00628 def modify(self, fd, events):
00629 self.unregister(fd)
00630 self.register(fd, events)
00631
00632 def unregister(self, fd):
00633 self.read_fds.discard(fd)
00634 self.write_fds.discard(fd)
00635 self.error_fds.discard(fd)
00636
00637 def poll(self, timeout):
00638 readable, writeable, errors = select.select(
00639 self.read_fds, self.write_fds, self.error_fds, timeout)
00640 events = {}
00641 for fd in readable:
00642 events[fd] = events.get(fd, 0) | IOLoop.READ
00643 for fd in writeable:
00644 events[fd] = events.get(fd, 0) | IOLoop.WRITE
00645 for fd in errors:
00646 events[fd] = events.get(fd, 0) | IOLoop.ERROR
00647 return events.items()
00648
00649
00650
00651
# Choose the poll implementation that IOLoop() uses when no ``impl`` is
# passed.  Preference order: the standard library's epoll, then kqueue,
# then the bundled tornado.epoll C module, and finally plain select().
if hasattr(select, "epoll"):
    # The select module itself provides epoll.
    _poll = select.epoll
elif hasattr(select, "kqueue"):
    # kqueue is available (BSD and Mac OS X, python 2.6+).
    _poll = _KQueue
else:
    try:
        # Older Linux pythons: use our C module through the _EPoll wrapper.
        from tornado import epoll
        _poll = _EPoll
    except Exception:
        # Everything else falls back to select(); only on Linux is the
        # missing epoll module worth a warning.
        import sys
        if "linux" in sys.platform:
            logging.warning("epoll module not found; using select()")
        _poll = _Select