00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 """A non-blocking TCP connection factory.
00018 """
00019 from __future__ import absolute_import, division, print_function, with_statement
00020
00021 import functools
00022 import socket
00023
00024 from tornado.concurrent import Future
00025 from tornado.ioloop import IOLoop
00026 from tornado.iostream import IOStream
00027 from tornado import gen
00028 from tornado.netutil import Resolver
00029
00030 _INITIAL_CONNECT_TIMEOUT = 0.3
00031
00032
00033 class _Connector(object):
00034 """A stateless implementation of the "Happy Eyeballs" algorithm.
00035
00036 "Happy Eyeballs" is documented in RFC6555 as the recommended practice
00037 for when both IPv4 and IPv6 addresses are available.
00038
00039 In this implementation, we partition the addresses by family, and
00040 make the first connection attempt to whichever address was
00041 returned first by ``getaddrinfo``. If that connection fails or
00042 times out, we begin a connection in parallel to the first address
00043 of the other family. If there are additional failures we retry
00044 with other addresses, keeping one connection attempt per family
00045 in flight at a time.
00046
00047 http://tools.ietf.org/html/rfc6555
00048
00049 """
00050 def __init__(self, addrinfo, io_loop, connect):
00051 self.io_loop = io_loop
00052 self.connect = connect
00053
00054 self.future = Future()
00055 self.timeout = None
00056 self.last_error = None
00057 self.remaining = len(addrinfo)
00058 self.primary_addrs, self.secondary_addrs = self.split(addrinfo)
00059
00060 @staticmethod
00061 def split(addrinfo):
00062 """Partition the ``addrinfo`` list by address family.
00063
00064 Returns two lists. The first list contains the first entry from
00065 ``addrinfo`` and all others with the same family, and the
00066 second list contains all other addresses (normally one list will
00067 be AF_INET and the other AF_INET6, although non-standard resolvers
00068 may return additional families).
00069 """
00070 primary = []
00071 secondary = []
00072 primary_af = addrinfo[0][0]
00073 for af, addr in addrinfo:
00074 if af == primary_af:
00075 primary.append((af, addr))
00076 else:
00077 secondary.append((af, addr))
00078 return primary, secondary
00079
00080 def start(self, timeout=_INITIAL_CONNECT_TIMEOUT):
00081 self.try_connect(iter(self.primary_addrs))
00082 self.set_timout(timeout)
00083 return self.future
00084
00085 def try_connect(self, addrs):
00086 try:
00087 af, addr = next(addrs)
00088 except StopIteration:
00089
00090
00091
00092 if self.remaining == 0 and not self.future.done():
00093 self.future.set_exception(self.last_error or
00094 IOError("connection failed"))
00095 return
00096 future = self.connect(af, addr)
00097 future.add_done_callback(functools.partial(self.on_connect_done,
00098 addrs, af, addr))
00099
00100 def on_connect_done(self, addrs, af, addr, future):
00101 self.remaining -= 1
00102 try:
00103 stream = future.result()
00104 except Exception as e:
00105 if self.future.done():
00106 return
00107
00108
00109 self.last_error = e
00110 self.try_connect(addrs)
00111 if self.timeout is not None:
00112
00113
00114 self.on_timeout()
00115 return
00116 self.clear_timeout()
00117 if self.future.done():
00118
00119 stream.close()
00120 else:
00121 self.future.set_result((af, addr, stream))
00122
00123 def set_timout(self, timeout):
00124 self.timeout = self.io_loop.add_timeout(self.io_loop.time() + timeout,
00125 self.on_timeout)
00126
00127 def on_timeout(self):
00128 self.timeout = None
00129 self.try_connect(iter(self.secondary_addrs))
00130
00131 def clear_timeout(self):
00132 if self.timeout is not None:
00133 self.io_loop.remove_timeout(self.timeout)
00134
00135
00136 class TCPClient(object):
00137 """A non-blocking TCP connection factory.
00138 """
00139 def __init__(self, resolver=None, io_loop=None):
00140 self.io_loop = io_loop or IOLoop.current()
00141 if resolver is not None:
00142 self.resolver = resolver
00143 self._own_resolver = False
00144 else:
00145 self.resolver = Resolver(io_loop=io_loop)
00146 self._own_resolver = True
00147
00148 def close(self):
00149 if self._own_resolver:
00150 self.resolver.close()
00151
00152 @gen.coroutine
00153 def connect(self, host, port, af=socket.AF_UNSPEC, ssl_options=None,
00154 max_buffer_size=None):
00155 """Connect to the given host and port.
00156
00157 Asynchronously returns an `.IOStream` (or `.SSLIOStream` if
00158 ``ssl_options`` is not None).
00159 """
00160 addrinfo = yield self.resolver.resolve(host, port, af)
00161 connector = _Connector(
00162 addrinfo, self.io_loop,
00163 functools.partial(self._create_stream, max_buffer_size))
00164 af, addr, stream = yield connector.start()
00165
00166
00167
00168 if ssl_options is not None:
00169 stream = yield stream.start_tls(False, ssl_options=ssl_options,
00170 server_hostname=host)
00171 raise gen.Return(stream)
00172
00173 def _create_stream(self, max_buffer_size, af, addr):
00174
00175
00176 stream = IOStream(socket.socket(af),
00177 io_loop=self.io_loop,
00178 max_buffer_size=max_buffer_size)
00179 return stream.connect(addr)