00001 """Bridges between the `asyncio` module and Tornado IOLoop.
00002
00003 This is a work in progress and interfaces are subject to change.
00004
00005 To test:
00006 python3.4 -m tornado.test.runtests --ioloop=tornado.platform.asyncio.AsyncIOLoop
00007 python3.4 -m tornado.test.runtests --ioloop=tornado.platform.asyncio.AsyncIOMainLoop
00008 (the tests log a few warnings with AsyncIOMainLoop because they leave some
00009 unfinished callbacks on the event loop that fail when it resumes)
00010 """
00011
00012 from __future__ import absolute_import, division, print_function, with_statement
00013 import datetime
00014 import functools
00015
00016 from tornado.ioloop import IOLoop
00017 from tornado import stack_context
00018 from tornado.util import timedelta_to_seconds
00019
00020 try:
00021
00022
00023 import asyncio
00024 except ImportError as e:
00025
00026 try:
00027 import trollius as asyncio
00028 except ImportError:
00029
00030 raise e
00031
class BaseAsyncIOLoop(IOLoop):
    """Tornado ``IOLoop`` whose work is scheduled on an asyncio event loop.

    File-descriptor handlers, timeouts and callbacks registered through the
    ``IOLoop`` interface are forwarded to ``self.asyncio_loop``.
    """

    def initialize(self, asyncio_loop, close_loop=False):
        """Attach this IOLoop to ``asyncio_loop``.

        If ``close_loop`` is true, `close` also closes ``asyncio_loop``.
        """
        self.asyncio_loop = asyncio_loop
        self.close_loop = close_loop
        # Schedule make_current as a callback on the asyncio loop.
        self.asyncio_loop.call_soon(self.make_current)
        # Maps fd -> (fileobj, wrapped handler), as stored by add_handler.
        self.handlers = {}
        # fds currently registered with the asyncio loop for reads / writes.
        self.readers = set()
        self.writers = set()
        # Set by close(); add_callback refuses new callbacks once it is true.
        self.closing = False

    def close(self, all_fds=False):
        """Remove every registered handler and optionally close resources.

        With ``all_fds`` true each handler's file object is passed to
        ``close_fd``.  The asyncio loop itself is closed only when this
        IOLoop was initialized with ``close_loop=True``.
        """
        self.closing = True
        # Iterate over a snapshot: remove_handler deletes from self.handlers.
        for fd in tuple(self.handlers):
            fileobj = self.handlers[fd][0]
            self.remove_handler(fd)
            if all_fds:
                self.close_fd(fileobj)
        if self.close_loop:
            self.asyncio_loop.close()

    def add_handler(self, fd, handler, events):
        """Register ``handler`` for ``events`` (READ and/or WRITE) on ``fd``.

        Raises ``ValueError`` if the fd already has a handler.
        """
        fd, fileobj = self.split_fd(fd)
        if fd in self.handlers:
            raise ValueError("fd %s added twice" % fd)
        self.handlers[fd] = (fileobj, stack_context.wrap(handler))
        loop = self.asyncio_loop
        if events & IOLoop.READ:
            loop.add_reader(fd, self._handle_events, fd, IOLoop.READ)
            self.readers.add(fd)
        if events & IOLoop.WRITE:
            loop.add_writer(fd, self._handle_events, fd, IOLoop.WRITE)
            self.writers.add(fd)

    def update_handler(self, fd, events):
        """Change which events ``fd`` is watched for.

        Reader/writer registrations on the asyncio loop are added or removed
        so that they match ``events``; registrations that already match are
        left alone.
        """
        fd, _ = self.split_fd(fd)
        loop = self.asyncio_loop

        watching_read = fd in self.readers
        if events & IOLoop.READ:
            if not watching_read:
                loop.add_reader(fd, self._handle_events, fd, IOLoop.READ)
                self.readers.add(fd)
        elif watching_read:
            loop.remove_reader(fd)
            self.readers.remove(fd)

        watching_write = fd in self.writers
        if events & IOLoop.WRITE:
            if not watching_write:
                loop.add_writer(fd, self._handle_events, fd, IOLoop.WRITE)
                self.writers.add(fd)
        elif watching_write:
            loop.remove_writer(fd)
            self.writers.remove(fd)

    def remove_handler(self, fd):
        """Stop watching ``fd``; a no-op if it has no registered handler."""
        fd, _ = self.split_fd(fd)
        if fd not in self.handlers:
            return
        loop = self.asyncio_loop
        if fd in self.readers:
            loop.remove_reader(fd)
            self.readers.remove(fd)
        if fd in self.writers:
            loop.remove_writer(fd)
            self.writers.remove(fd)
        del self.handlers[fd]

    def _handle_events(self, fd, events):
        """Invoke the handler registered for ``fd`` with its file object."""
        target, callback = self.handlers[fd]
        callback(target, events)

    def start(self):
        """Set up logging, then run the asyncio loop until it is stopped."""
        self._setup_logging()
        self.asyncio_loop.run_forever()

    def stop(self):
        """Stop the underlying asyncio loop."""
        self.asyncio_loop.stop()

    def call_at(self, when, callback, *args, **kwargs):
        """Schedule ``callback(*args, **kwargs)`` for absolute time ``when``.

        Returns the handle produced by ``asyncio_loop.call_later``; pass it
        to `remove_timeout` to cancel.
        """
        # ``when`` is compared against self.time(), so turn it into a
        # non-negative relative delay before handing it to call_later.
        delay = max(0, when - self.time())
        # Arguments (including keyword arguments) are bound up front with
        # functools.partial, so only a single callable is handed over.
        bound = functools.partial(
            stack_context.wrap(callback), *args, **kwargs)
        return self.asyncio_loop.call_later(delay, self._run_callback, bound)

    def remove_timeout(self, timeout):
        """Cancel a timeout handle previously returned by `call_at`."""
        timeout.cancel()

    def add_callback(self, callback, *args, **kwargs):
        """Schedule ``callback(*args, **kwargs)`` on the asyncio loop.

        Uses ``call_soon_threadsafe``.  Raises ``RuntimeError`` once `close`
        has been called.
        """
        if self.closing:
            raise RuntimeError("IOLoop is closing")
        bound = functools.partial(
            stack_context.wrap(callback), *args, **kwargs)
        self.asyncio_loop.call_soon_threadsafe(self._run_callback, bound)

    # Callbacks added from signal handlers take the same path as add_callback.
    add_callback_from_signal = add_callback
00131
00132
class AsyncIOMainLoop(BaseAsyncIOLoop):
    """IOLoop bound to the loop returned by ``asyncio.get_event_loop()``.

    The asyncio loop is not closed when this IOLoop is closed
    (``close_loop=False``).
    """

    def initialize(self):
        """Attach to the event loop returned by ``asyncio.get_event_loop()``."""
        shared_loop = asyncio.get_event_loop()
        super(AsyncIOMainLoop, self).initialize(shared_loop, close_loop=False)
00137
00138
class AsyncIOLoop(BaseAsyncIOLoop):
    """IOLoop that runs on its own newly created asyncio event loop.

    The asyncio loop is closed together with this IOLoop
    (``close_loop=True``).
    """

    def initialize(self):
        """Create a new asyncio event loop and attach to it."""
        private_loop = asyncio.new_event_loop()
        super(AsyncIOLoop, self).initialize(private_loop, close_loop=True)