00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 from __future__ import absolute_import, division, print_function, with_statement
00018
00019 from contextlib import closing
00020 import os
00021 import socket
00022
00023 from tornado.concurrent import Future
00024 from tornado.netutil import bind_sockets, Resolver
00025 from tornado.tcpclient import TCPClient, _Connector
00026 from tornado.tcpserver import TCPServer
00027 from tornado.testing import AsyncTestCase, bind_unused_port, gen_test
00028 from tornado.test.util import skipIfNoIPv6, unittest
00029
00030
00031
# Placeholder address-family values used by the connector tests in this
# file in place of real ``socket.AF_*`` constants.
AF1 = 1
AF2 = 2
00033
00034
class TestTCPServer(TCPServer):
    """TCP server for tests that keeps every accepted stream.

    The server binds to 'localhost' for the requested address family as
    soon as it is constructed.  ``port`` holds the port number reported
    by the first bound socket, and ``streams`` collects the streams
    passed to `handle_stream`, in the order they arrive.
    """

    def __init__(self, family):
        super(TestTCPServer, self).__init__()
        self.streams = []
        # No explicit port is requested (None); the port actually in use
        # is read back from the first listening socket.
        listeners = bind_sockets(None, 'localhost', family)
        self.add_sockets(listeners)
        bound_address = listeners[0].getsockname()
        self.port = bound_address[1]

    def handle_stream(self, stream, address):
        """Remember ``stream`` so the test can read from it later."""
        self.streams.append(stream)

    def stop(self):
        """Stop the server, then close every stream it has accepted."""
        super(TestTCPServer, self).stop()
        for accepted in self.streams:
            accepted.close()
00050
00051
class TCPClientTest(AsyncTestCase):
    """Connect a real `TCPClient` to a local `TestTCPServer`.

    Each ``test_connect_<server>_<client>`` case starts a server bound
    with one address family and connects to it using a host string that
    is an IPv4 literal, an IPv6 literal, or the name 'localhost'.
    """

    def setUp(self):
        super(TCPClientTest, self).setUp()
        self.server = None
        self.client = TCPClient()

    def start_server(self, family):
        """Start a `TestTCPServer` for ``family`` and return its port.

        The test is skipped when ``family`` is ``AF_UNSPEC`` and the
        ``TRAVIS`` environment variable is set.
        """
        if family == socket.AF_UNSPEC and 'TRAVIS' in os.environ:
            self.skipTest("dual-stack servers often have port conflicts on travis")
        self.server = TestTCPServer(family)
        return self.server.port

    def stop_server(self):
        """Stop the running server, if any, and forget it."""
        if self.server is not None:
            self.server.stop()
            self.server = None

    def tearDown(self):
        self.client.close()
        self.stop_server()
        super(TCPClientTest, self).tearDown()

    def skipIfLocalhostV4(self):
        """Skip the current test unless 'localhost' resolves to IPv6."""
        # The port is irrelevant to this lookup, but it must be non-zero:
        # some systems reject port 0 in getaddrinfo unless AI_PASSIVE is
        # also passed.
        Resolver().resolve('localhost', 80, callback=self.stop)
        addrinfo = self.wait()
        families = set(addr[0] for addr in addrinfo)
        if socket.AF_INET6 not in families:
            self.skipTest("localhost does not resolve to ipv6")

    @gen_test
    def do_test_connect(self, family, host):
        """Start a ``family`` server, connect to ``host``, echo-check 5 bytes.

        The client writes b"hello" and the test asserts that the first
        stream accepted by the server yields exactly those bytes.
        """
        port = self.start_server(family)
        stream = yield self.client.connect(host, port)
        with closing(stream):
            stream.write(b"hello")
            # NOTE(review): this assumes the server has already accepted
            # the connection by the time connect() resolves -- confirm.
            data = yield self.server.streams[0].read_bytes(5)
            self.assertEqual(data, b"hello")

    def test_connect_ipv4_ipv4(self):
        self.do_test_connect(socket.AF_INET, '127.0.0.1')

    def test_connect_ipv4_dual(self):
        self.do_test_connect(socket.AF_INET, 'localhost')

    @skipIfNoIPv6
    def test_connect_ipv6_ipv6(self):
        self.skipIfLocalhostV4()
        self.do_test_connect(socket.AF_INET6, '::1')

    @skipIfNoIPv6
    def test_connect_ipv6_dual(self):
        self.skipIfLocalhostV4()
        if Resolver.configured_class().__name__.endswith('TwistedResolver'):
            self.skipTest('TwistedResolver does not support multiple addresses')
        self.do_test_connect(socket.AF_INET6, 'localhost')

    def test_connect_unspec_ipv4(self):
        self.do_test_connect(socket.AF_UNSPEC, '127.0.0.1')

    @skipIfNoIPv6
    def test_connect_unspec_ipv6(self):
        self.skipIfLocalhostV4()
        self.do_test_connect(socket.AF_UNSPEC, '::1')

    def test_connect_unspec_dual(self):
        self.do_test_connect(socket.AF_UNSPEC, 'localhost')

    @gen_test
    def test_refused_ipv4(self):
        """Connecting to a port that was just released raises IOError."""
        # Bind a port to learn a free port number, then close the socket
        # so nothing is listening on it when the client connects.
        sock, port = bind_unused_port()
        sock.close()
        with self.assertRaises(IOError):
            yield self.client.connect('127.0.0.1', port)
00125
00126
class TestConnectorSplit(unittest.TestCase):
    """Check how ``_Connector.split`` partitions an addrinfo list."""

    def test_one_family(self):
        """With a single family, everything is primary; secondary is empty."""
        # The entries are simple (family, label) pairs, not real socket
        # addresses; only the family element is varied in these tests.
        addrinfo = [(AF1, 'a'), (AF1, 'b')]
        primary, secondary = _Connector.split(addrinfo)
        self.assertEqual(primary, [(AF1, 'a'), (AF1, 'b')])
        self.assertEqual(secondary, [])

    def test_mixed(self):
        """Interleaved families are separated, keeping relative order."""
        addrinfo = [
            (AF1, 'a'),
            (AF2, 'b'),
            (AF1, 'c'),
            (AF2, 'd'),
        ]
        primary, secondary = _Connector.split(addrinfo)
        # The family of the first entry (AF1) ends up in ``primary``.
        self.assertEqual(primary, [(AF1, 'a'), (AF1, 'c')])
        self.assertEqual(secondary, [(AF2, 'b'), (AF2, 'd')])
00145
00146
class ConnectorTest(AsyncTestCase):
    """Exercise `_Connector` with connect attempts controlled by the test.

    `create_stream` is handed to the connector as its connect function.
    It never touches the network: it records a `Future` per attempt in
    ``connect_futures`` so each test can decide when, and how, every
    attempt finishes via `resolve_connect`.
    """

    class FakeStream(object):
        """Stand-in stream that only records whether it was closed."""

        def __init__(self):
            self.closed = False

        def close(self):
            self.closed = True

    def setUp(self):
        super(ConnectorTest, self).setUp()
        # (af, addr) -> Future for each connect attempt still outstanding.
        self.connect_futures = {}
        # addr -> FakeStream for each attempt resolved successfully.
        self.streams = {}
        self.addrinfo = [
            (AF1, 'a'), (AF1, 'b'),
            (AF2, 'c'), (AF2, 'd'),
        ]

    def tearDown(self):
        # Every stream still registered here must be open; a test that
        # expects a stream to be closed pops it from ``streams`` itself.
        for fake in self.streams.values():
            self.assertFalse(fake.closed)
        super(ConnectorTest, self).tearDown()

    def create_stream(self, af, addr):
        """Record and return a new unresolved Future for ``(af, addr)``."""
        pending = self.connect_futures[(af, addr)] = Future()
        return pending

    def assert_pending(self, *keys):
        """Assert the outstanding attempts are exactly ``keys`` (any order)."""
        outstanding = sorted(self.connect_futures.keys())
        self.assertEqual(outstanding, sorted(keys))

    def resolve_connect(self, af, addr, success):
        """Finish the outstanding attempt for ``(af, addr)``.

        On success a new `FakeStream` is stored under ``addr`` in
        ``streams`` and set as the future's result; otherwise the future
        fails with `IOError`.
        """
        pending = self.connect_futures.pop((af, addr))
        if not success:
            pending.set_exception(IOError())
            return
        fake = ConnectorTest.FakeStream()
        self.streams[addr] = fake
        pending.set_result(fake)

    def start_connect(self, addrinfo):
        """Create a connector for ``addrinfo``, start it, return both.

        Returns a ``(connector, future)`` pair where ``future`` is what
        ``start`` returned.
        """
        connector = _Connector(addrinfo, self.io_loop, self.create_stream)
        # A large timeout value is passed; tests that care about the
        # timeout invoke on_timeout() on the connector directly.
        outcome = connector.start(3600)
        return connector, outcome

    def test_immediate_success(self):
        connector, outcome = self.start_connect(self.addrinfo)
        outstanding = list(self.connect_futures.keys())
        self.assertEqual(outstanding, [(AF1, 'a')])
        self.resolve_connect(AF1, 'a', True)
        expected = (AF1, 'a', self.streams['a'])
        self.assertEqual(outcome.result(), expected)

    def test_immediate_failure(self):
        # A single address that fails: the overall future fails too.
        connector, outcome = self.start_connect([(AF1, 'a')])
        self.assert_pending((AF1, 'a'))
        self.resolve_connect(AF1, 'a', False)
        with self.assertRaises(IOError):
            outcome.result()

    def test_one_family_second_try(self):
        connector, outcome = self.start_connect([(AF1, 'a'), (AF1, 'b')])
        self.assert_pending((AF1, 'a'))
        self.resolve_connect(AF1, 'a', False)
        # The next address of the same family is attempted after a failure.
        self.assert_pending((AF1, 'b'))
        self.resolve_connect(AF1, 'b', True)
        expected = (AF1, 'b', self.streams['b'])
        self.assertEqual(outcome.result(), expected)

    def test_one_family_second_try_failure(self):
        connector, outcome = self.start_connect([(AF1, 'a'), (AF1, 'b')])
        self.assert_pending((AF1, 'a'))
        self.resolve_connect(AF1, 'a', False)
        self.assert_pending((AF1, 'b'))
        self.resolve_connect(AF1, 'b', False)
        with self.assertRaises(IOError):
            outcome.result()

    def test_one_family_second_try_timeout(self):
        connector, outcome = self.start_connect([(AF1, 'a'), (AF1, 'b')])
        self.assert_pending((AF1, 'a'))
        # With only one family present, the timeout starts nothing new:
        # the same single attempt is still the only one outstanding.
        connector.on_timeout()
        self.assert_pending((AF1, 'a'))
        self.resolve_connect(AF1, 'a', False)
        self.assert_pending((AF1, 'b'))
        self.resolve_connect(AF1, 'b', True)
        expected = (AF1, 'b', self.streams['b'])
        self.assertEqual(outcome.result(), expected)

    def test_two_families_immediate_failure(self):
        connector, outcome = self.start_connect(self.addrinfo)
        self.assert_pending((AF1, 'a'))
        self.resolve_connect(AF1, 'a', False)
        # After the first failure, one attempt per family is outstanding.
        self.assert_pending((AF1, 'b'), (AF2, 'c'))
        self.resolve_connect(AF1, 'b', False)
        self.resolve_connect(AF2, 'c', True)
        expected = (AF2, 'c', self.streams['c'])
        self.assertEqual(outcome.result(), expected)

    def test_two_families_timeout(self):
        connector, outcome = self.start_connect(self.addrinfo)
        self.assert_pending((AF1, 'a'))
        connector.on_timeout()
        # The timeout adds an attempt for the second family.
        self.assert_pending((AF1, 'a'), (AF2, 'c'))
        self.resolve_connect(AF2, 'c', True)
        expected = (AF2, 'c', self.streams['c'])
        self.assertEqual(outcome.result(), expected)
        # A late failure of the first attempt starts no further attempts.
        self.resolve_connect(AF1, 'a', False)
        self.assert_pending()

    def test_success_after_timeout(self):
        connector, outcome = self.start_connect(self.addrinfo)
        self.assert_pending((AF1, 'a'))
        connector.on_timeout()
        self.assert_pending((AF1, 'a'), (AF2, 'c'))
        self.resolve_connect(AF1, 'a', True)
        expected = (AF1, 'a', self.streams['a'])
        self.assertEqual(outcome.result(), expected)
        # The other attempt succeeds after a winner was chosen; its
        # stream is expected to be closed.  Pop it so tearDown does not
        # check it again.
        self.resolve_connect(AF2, 'c', True)
        late_stream = self.streams.pop('c')
        self.assertTrue(late_stream.closed)

    def test_all_fail(self):
        connector, outcome = self.start_connect(self.addrinfo)
        self.assert_pending((AF1, 'a'))
        connector.on_timeout()
        self.assert_pending((AF1, 'a'), (AF2, 'c'))
        self.resolve_connect(AF2, 'c', False)
        self.assert_pending((AF1, 'a'), (AF2, 'd'))
        self.resolve_connect(AF2, 'd', False)
        # The second family is exhausted; only the first family's
        # attempts remain.
        self.assert_pending((AF1, 'a'))
        self.resolve_connect(AF1, 'a', False)
        self.assert_pending((AF1, 'b'))
        self.assertFalse(outcome.done())
        self.resolve_connect(AF1, 'b', False)
        with self.assertRaises(IOError):
            outcome.result()