asyncio.py
Go to the documentation of this file.
00001 """Bridges between the `asyncio` module and Tornado IOLoop.
00002 
00003 This is a work in progress and interfaces are subject to change.
00004 
00005 To test:
00006 python3.4 -m tornado.test.runtests --ioloop=tornado.platform.asyncio.AsyncIOLoop
00007 python3.4 -m tornado.test.runtests --ioloop=tornado.platform.asyncio.AsyncIOMainLoop
00008 (the tests log a few warnings with AsyncIOMainLoop because they leave some
00009 unfinished callbacks on the event loop that fail when it resumes)
00010 """
00011 
00012 from __future__ import absolute_import, division, print_function, with_statement
00013 import datetime
00014 import functools
00015 
00016 from tornado.ioloop import IOLoop
00017 from tornado import stack_context
00018 from tornado.util import timedelta_to_seconds
00019 
00020 try:
00021     # Import the real asyncio module for py33+ first.  Older versions of the
00022     # trollius backport also use this name.
00023     import asyncio
00024 except ImportError as e:
00025     # Asyncio itself isn't available; see if trollius is (backport to py26+).
00026     try:
00027         import trollius as asyncio
00028     except ImportError:
00029         # Re-raise the original asyncio error, not the trollius one.
00030         raise e
00031 
00032 class BaseAsyncIOLoop(IOLoop):
00033     def initialize(self, asyncio_loop, close_loop=False):
00034         self.asyncio_loop = asyncio_loop
00035         self.close_loop = close_loop
00036         self.asyncio_loop.call_soon(self.make_current)
00037         # Maps fd to (fileobj, handler function) pair (as in IOLoop.add_handler)
00038         self.handlers = {}
00039         # Set of fds listening for reads/writes
00040         self.readers = set()
00041         self.writers = set()
00042         self.closing = False
00043 
00044     def close(self, all_fds=False):
00045         self.closing = True
00046         for fd in list(self.handlers):
00047             fileobj, handler_func = self.handlers[fd]
00048             self.remove_handler(fd)
00049             if all_fds:
00050                 self.close_fd(fileobj)
00051         if self.close_loop:
00052             self.asyncio_loop.close()
00053 
00054     def add_handler(self, fd, handler, events):
00055         fd, fileobj = self.split_fd(fd)
00056         if fd in self.handlers:
00057             raise ValueError("fd %s added twice" % fd)
00058         self.handlers[fd] = (fileobj, stack_context.wrap(handler))
00059         if events & IOLoop.READ:
00060             self.asyncio_loop.add_reader(
00061                 fd, self._handle_events, fd, IOLoop.READ)
00062             self.readers.add(fd)
00063         if events & IOLoop.WRITE:
00064             self.asyncio_loop.add_writer(
00065                 fd, self._handle_events, fd, IOLoop.WRITE)
00066             self.writers.add(fd)
00067 
00068     def update_handler(self, fd, events):
00069         fd, fileobj = self.split_fd(fd)
00070         if events & IOLoop.READ:
00071             if fd not in self.readers:
00072                 self.asyncio_loop.add_reader(
00073                     fd, self._handle_events, fd, IOLoop.READ)
00074                 self.readers.add(fd)
00075         else:
00076             if fd in self.readers:
00077                 self.asyncio_loop.remove_reader(fd)
00078                 self.readers.remove(fd)
00079         if events & IOLoop.WRITE:
00080             if fd not in self.writers:
00081                 self.asyncio_loop.add_writer(
00082                     fd, self._handle_events, fd, IOLoop.WRITE)
00083                 self.writers.add(fd)
00084         else:
00085             if fd in self.writers:
00086                 self.asyncio_loop.remove_writer(fd)
00087                 self.writers.remove(fd)
00088 
00089     def remove_handler(self, fd):
00090         fd, fileobj = self.split_fd(fd)
00091         if fd not in self.handlers:
00092             return
00093         if fd in self.readers:
00094             self.asyncio_loop.remove_reader(fd)
00095             self.readers.remove(fd)
00096         if fd in self.writers:
00097             self.asyncio_loop.remove_writer(fd)
00098             self.writers.remove(fd)
00099         del self.handlers[fd]
00100 
00101     def _handle_events(self, fd, events):
00102         fileobj, handler_func = self.handlers[fd]
00103         handler_func(fileobj, events)
00104 
00105     def start(self):
00106         self._setup_logging()
00107         self.asyncio_loop.run_forever()
00108 
00109     def stop(self):
00110         self.asyncio_loop.stop()
00111 
00112     def call_at(self, when, callback, *args, **kwargs):
00113         # asyncio.call_at supports *args but not **kwargs, so bind them here.
00114         # We do not synchronize self.time and asyncio_loop.time, so
00115         # convert from absolute to relative.
00116         return self.asyncio_loop.call_later(
00117             max(0, when - self.time()), self._run_callback,
00118             functools.partial(stack_context.wrap(callback), *args, **kwargs))
00119 
00120     def remove_timeout(self, timeout):
00121         timeout.cancel()
00122 
00123     def add_callback(self, callback, *args, **kwargs):
00124         if self.closing:
00125             raise RuntimeError("IOLoop is closing")
00126         self.asyncio_loop.call_soon_threadsafe(
00127             self._run_callback,
00128             functools.partial(stack_context.wrap(callback), *args, **kwargs))
00129 
00130     add_callback_from_signal = add_callback
00131 
00132 
00133 class AsyncIOMainLoop(BaseAsyncIOLoop):
00134     def initialize(self):
00135         super(AsyncIOMainLoop, self).initialize(asyncio.get_event_loop(),
00136                                                 close_loop=False)
00137 
00138 
00139 class AsyncIOLoop(BaseAsyncIOLoop):
00140     def initialize(self):
00141         super(AsyncIOLoop, self).initialize(asyncio.new_event_loop(),
00142                                             close_loop=True)


rosbridge_server
Author(s): Jonathan Mace
autogenerated on Thu Jun 6 2019 21:51:50