tcpclient.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #
00003 # Copyright 2014 Facebook
00004 #
00005 # Licensed under the Apache License, Version 2.0 (the "License"); you may
00006 # not use this file except in compliance with the License. You may obtain
00007 # a copy of the License at
00008 #
00009 #     http://www.apache.org/licenses/LICENSE-2.0
00010 #
00011 # Unless required by applicable law or agreed to in writing, software
00012 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
00013 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
00014 # License for the specific language governing permissions and limitations
00015 # under the License.
00016 
00017 """A non-blocking TCP connection factory.
00018 """
00019 from __future__ import absolute_import, division, print_function, with_statement
00020 
00021 import functools
00022 import socket
00023 
00024 from tornado.concurrent import Future
00025 from tornado.ioloop import IOLoop
00026 from tornado.iostream import IOStream
00027 from tornado import gen
00028 from tornado.netutil import Resolver
00029 
# Default delay used by ``_Connector.start``: how long the first connection
# attempt gets before a parallel attempt to the other address family is
# started.  The value is added to ``io_loop.time()``, so it is presumably
# expressed in seconds.
_INITIAL_CONNECT_TIMEOUT = 0.3
00031 
00032 
class _Connector(object):
    """An implementation of the "Happy Eyeballs" algorithm.

    "Happy Eyeballs" is documented in RFC6555 as the recommended practice
    for when both IPv4 and IPv6 addresses are available.

    In this implementation, we partition the addresses by family, and
    make the first connection attempt to whichever address was
    returned first by ``getaddrinfo``.  If that connection fails or
    times out, we begin a connection in parallel to the first address
    of the other family.  If there are additional failures we retry
    with other addresses, keeping one connection attempt per family
    in flight at a time.

    http://tools.ietf.org/html/rfc6555

    Attributes set by ``__init__``:

    * ``future``: resolved with ``(af, addr, stream)`` for the first
      successful attempt, or with an exception once every address has
      failed.
    * ``timeout``: handle of the pending IOLoop timeout that will start
      the secondary queue, or None when no such timeout is pending.
    * ``last_error``: the most recent connection exception, re-used as
      the final error.
    * ``remaining``: number of addresses whose attempt has not yet
      completed.
    """
    def __init__(self, addrinfo, io_loop, connect):
        """Prepare a connector for ``addrinfo``.

        ``addrinfo`` is a sequence of ``(family, address)`` pairs,
        ``io_loop`` supplies ``time``/``add_timeout``/``remove_timeout``,
        and ``connect`` is called as ``connect(af, addr)`` and must return
        a future whose result is the connected stream.
        """
        self.io_loop = io_loop
        self.connect = connect

        self.future = Future()
        self.timeout = None
        self.last_error = None
        self.remaining = len(addrinfo)
        self.primary_addrs, self.secondary_addrs = self.split(addrinfo)

    @staticmethod
    def split(addrinfo):
        """Partition the ``addrinfo`` list by address family.

        Returns two lists.  The first list contains the first entry from
        ``addrinfo`` and all others with the same family, and the
        second list contains all other addresses (normally one list will
        be AF_INET and the other AF_INET6, although non-standard resolvers
        may return additional families).

        An empty ``addrinfo`` yields two empty lists, so that ``start``
        reports the failure through the future instead of raising
        ``IndexError`` here.
        """
        primary = []
        secondary = []
        if not addrinfo:
            return primary, secondary
        primary_af = addrinfo[0][0]
        for af, addr in addrinfo:
            if af == primary_af:
                primary.append((af, addr))
            else:
                secondary.append((af, addr))
        return primary, secondary

    def start(self, timeout=_INITIAL_CONNECT_TIMEOUT):
        """Begin connecting and return the future for the final outcome.

        The primary queue is started immediately; the secondary queue is
        started after ``timeout`` unless it is triggered earlier by a
        failure.
        """
        self.try_connect(iter(self.primary_addrs))
        self.set_timout(timeout)
        return self.future

    def try_connect(self, addrs):
        """Start a connection attempt to the next address in ``addrs``.

        ``addrs`` is an iterator over one of the two queues; it is handed
        on to `on_connect_done` so a failure continues with the same queue.
        """
        try:
            af, addr = next(addrs)
        except StopIteration:
            # We've reached the end of our queue, but the other queue
            # might still be working.  Send a final error on the future
            # only when both queues are finished.
            if self.remaining == 0 and not self.future.done():
                self.future.set_exception(self.last_error or
                                          IOError("connection failed"))
            return
        future = self.connect(af, addr)
        future.add_done_callback(functools.partial(self.on_connect_done,
                                                   addrs, af, addr))

    def on_connect_done(self, addrs, af, addr, future):
        """Handle completion of one connection attempt.

        On failure the next address of the same queue is tried; on
        success the result is published, or the stream is closed if
        another attempt already won.
        """
        self.remaining -= 1
        try:
            stream = future.result()
        except Exception as e:
            if self.future.done():
                return
            # Error: try again (but remember what happened so we have an
            # error to raise in the end)
            self.last_error = e
            self.try_connect(addrs)
            if self.timeout is not None:
                # If the first attempt failed, don't wait for the
                # timeout to try an address from the secondary queue.
                # Cancel the scheduled callback first; otherwise it would
                # still fire later and start the secondary queue a second
                # time, duplicating connection attempts.
                self.clear_timeout()
                self.on_timeout()
            return
        self.clear_timeout()
        if self.future.done():
            # This is a late arrival; just drop it.
            stream.close()
        else:
            self.future.set_result((af, addr, stream))

    def set_timout(self, timeout):
        """Schedule `on_timeout` to run ``timeout`` after the current time.

        The misspelled name is kept so existing callers keep working.
        """
        self.timeout = self.io_loop.add_timeout(self.io_loop.time() + timeout,
                                                self.on_timeout)

    def on_timeout(self):
        """Start working through the secondary address queue."""
        self.timeout = None
        self.try_connect(iter(self.secondary_addrs))

    def clear_timeout(self):
        """Cancel the pending timeout, if any, and forget its handle."""
        if self.timeout is not None:
            self.io_loop.remove_timeout(self.timeout)
            # Drop the stale handle so it is never removed twice and so
            # ``timeout is not None`` keeps meaning "a timeout is pending".
            self.timeout = None
00134 
00135 
class TCPClient(object):
    """Factory producing non-blocking TCP connections.
    """
    def __init__(self, resolver=None, io_loop=None):
        """Store the IOLoop and resolver to use.

        When no ``resolver`` is given one is created here, and only in
        that case will `close` shut it down.
        """
        self.io_loop = io_loop or IOLoop.current()
        self._own_resolver = resolver is None
        if self._own_resolver:
            resolver = Resolver(io_loop=io_loop)
        self.resolver = resolver

    def close(self):
        """Close the resolver, but only if this client created it."""
        if not self._own_resolver:
            return
        self.resolver.close()

    @gen.coroutine
    def connect(self, host, port, af=socket.AF_UNSPEC, ssl_options=None,
                max_buffer_size=None):
        """Connect to the given host and port.

        Asynchronously returns an `.IOStream` (or `.SSLIOStream` if
        ``ssl_options`` is not None).
        """
        addrinfo = yield self.resolver.resolve(host, port, af)
        stream_factory = functools.partial(self._create_stream,
                                           max_buffer_size)
        connector = _Connector(addrinfo, self.io_loop, stream_factory)
        af, addr, stream = yield connector.start()
        # TODO: For better performance we could cache the (af, addr)
        # information here and re-use it on subsequent connections to
        # the same host. (http://tools.ietf.org/html/rfc6555#section-4.2)
        if ssl_options is None:
            raise gen.Return(stream)
        tls_stream = yield stream.start_tls(False, ssl_options=ssl_options,
                                            server_hostname=host)
        raise gen.Return(tls_stream)

    def _create_stream(self, max_buffer_size, af, addr):
        """Open a plaintext stream to ``addr`` and return its connect future."""
        # The connection is always made in plaintext; `connect` upgrades
        # the winning stream to TLS afterwards when that was requested.
        sock = socket.socket(af)
        plain_stream = IOStream(sock,
                                io_loop=self.io_loop,
                                max_buffer_size=max_buffer_size)
        return plain_stream.connect(addr)


rosbridge_server
Author(s): Jonathan Mace
autogenerated on Thu Aug 27 2015 14:50:40