tcpclient_test.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #
00003 # Copyright 2014 Facebook
00004 #
00005 # Licensed under the Apache License, Version 2.0 (the "License"); you may
00006 # not use this file except in compliance with the License. You may obtain
00007 # a copy of the License at
00008 #
00009 #     http://www.apache.org/licenses/LICENSE-2.0
00010 #
00011 # Unless required by applicable law or agreed to in writing, software
00012 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
00013 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
00014 # License for the specific language governing permissions and limitations
00015 # under the License.
00016 
00017 from __future__ import absolute_import, division, print_function, with_statement
00018 
00019 from contextlib import closing
00020 import os
00021 import socket
00022 
00023 from tornado.concurrent import Future
00024 from tornado.netutil import bind_sockets, Resolver
00025 from tornado.tcpclient import TCPClient, _Connector
00026 from tornado.tcpserver import TCPServer
00027 from tornado.testing import AsyncTestCase, bind_unused_port, gen_test
00028 from tornado.test.util import skipIfNoIPv6, unittest
00029 
00030 # Fake address families for testing.  Used in place of AF_INET
00031 # and AF_INET6 because some installations do not have AF_INET6.
00032 AF1, AF2 = 1, 2
00033 
00034 
00035 class TestTCPServer(TCPServer):
00036     def __init__(self, family):
00037         super(TestTCPServer, self).__init__()
00038         self.streams = []
00039         sockets = bind_sockets(None, 'localhost', family)
00040         self.add_sockets(sockets)
00041         self.port = sockets[0].getsockname()[1]
00042 
00043     def handle_stream(self, stream, address):
00044         self.streams.append(stream)
00045 
00046     def stop(self):
00047         super(TestTCPServer, self).stop()
00048         for stream in self.streams:
00049             stream.close()
00050 
00051 
00052 class TCPClientTest(AsyncTestCase):
00053     def setUp(self):
00054         super(TCPClientTest, self).setUp()
00055         self.server = None
00056         self.client = TCPClient()
00057 
00058     def start_server(self, family):
00059         if family == socket.AF_UNSPEC and 'TRAVIS' in os.environ:
00060             self.skipTest("dual-stack servers often have port conflicts on travis")
00061         self.server = TestTCPServer(family)
00062         return self.server.port
00063 
00064     def stop_server(self):
00065         if self.server is not None:
00066             self.server.stop()
00067             self.server = None
00068 
00069     def tearDown(self):
00070         self.client.close()
00071         self.stop_server()
00072         super(TCPClientTest, self).tearDown()
00073 
00074     def skipIfLocalhostV4(self):
00075         Resolver().resolve('localhost', 0, callback=self.stop)
00076         addrinfo = self.wait()
00077         families = set(addr[0] for addr in addrinfo)
00078         if socket.AF_INET6 not in families:
00079             self.skipTest("localhost does not resolve to ipv6")
00080 
00081     @gen_test
00082     def do_test_connect(self, family, host):
00083         port = self.start_server(family)
00084         stream = yield self.client.connect(host, port)
00085         with closing(stream):
00086             stream.write(b"hello")
00087             data = yield self.server.streams[0].read_bytes(5)
00088             self.assertEqual(data, b"hello")
00089 
00090     def test_connect_ipv4_ipv4(self):
00091         self.do_test_connect(socket.AF_INET, '127.0.0.1')
00092 
00093     def test_connect_ipv4_dual(self):
00094         self.do_test_connect(socket.AF_INET, 'localhost')
00095 
00096     @skipIfNoIPv6
00097     def test_connect_ipv6_ipv6(self):
00098         self.skipIfLocalhostV4()
00099         self.do_test_connect(socket.AF_INET6, '::1')
00100 
00101     @skipIfNoIPv6
00102     def test_connect_ipv6_dual(self):
00103         self.skipIfLocalhostV4()
00104         if Resolver.configured_class().__name__.endswith('TwistedResolver'):
00105             self.skipTest('TwistedResolver does not support multiple addresses')
00106         self.do_test_connect(socket.AF_INET6, 'localhost')
00107 
00108     def test_connect_unspec_ipv4(self):
00109         self.do_test_connect(socket.AF_UNSPEC, '127.0.0.1')
00110 
00111     @skipIfNoIPv6
00112     def test_connect_unspec_ipv6(self):
00113         self.skipIfLocalhostV4()
00114         self.do_test_connect(socket.AF_UNSPEC, '::1')
00115 
00116     def test_connect_unspec_dual(self):
00117         self.do_test_connect(socket.AF_UNSPEC, 'localhost')
00118 
00119     @gen_test
00120     def test_refused_ipv4(self):
00121         sock, port = bind_unused_port()
00122         sock.close()
00123         with self.assertRaises(IOError):
00124             yield self.client.connect('127.0.0.1', port)
00125 
00126 
00127 class TestConnectorSplit(unittest.TestCase):
00128     def test_one_family(self):
00129         # These addresses aren't in the right format, but split doesn't care.
00130         primary, secondary = _Connector.split(
00131             [(AF1, 'a'),
00132              (AF1, 'b')])
00133         self.assertEqual(primary, [(AF1, 'a'),
00134                                    (AF1, 'b')])
00135         self.assertEqual(secondary, [])
00136 
00137     def test_mixed(self):
00138         primary, secondary = _Connector.split(
00139             [(AF1, 'a'),
00140              (AF2, 'b'),
00141              (AF1, 'c'),
00142              (AF2, 'd')])
00143         self.assertEqual(primary, [(AF1, 'a'), (AF1, 'c')])
00144         self.assertEqual(secondary, [(AF2, 'b'), (AF2, 'd')])
00145 
00146 
00147 class ConnectorTest(AsyncTestCase):
00148     class FakeStream(object):
00149         def __init__(self):
00150             self.closed = False
00151 
00152         def close(self):
00153             self.closed = True
00154 
00155     def setUp(self):
00156         super(ConnectorTest, self).setUp()
00157         self.connect_futures = {}
00158         self.streams = {}
00159         self.addrinfo = [(AF1, 'a'), (AF1, 'b'),
00160                          (AF2, 'c'), (AF2, 'd')]
00161 
00162     def tearDown(self):
00163         # Unless explicitly checked (and popped) in the test, we shouldn't
00164         # be closing any streams
00165         for stream in self.streams.values():
00166             self.assertFalse(stream.closed)
00167         super(ConnectorTest, self).tearDown()
00168 
00169     def create_stream(self, af, addr):
00170         future = Future()
00171         self.connect_futures[(af, addr)] = future
00172         return future
00173 
00174     def assert_pending(self, *keys):
00175         self.assertEqual(sorted(self.connect_futures.keys()), sorted(keys))
00176 
00177     def resolve_connect(self, af, addr, success):
00178         future = self.connect_futures.pop((af, addr))
00179         if success:
00180             self.streams[addr] = ConnectorTest.FakeStream()
00181             future.set_result(self.streams[addr])
00182         else:
00183             future.set_exception(IOError())
00184 
00185     def start_connect(self, addrinfo):
00186         conn = _Connector(addrinfo, self.io_loop, self.create_stream)
00187         # Give it a huge timeout; we'll trigger timeouts manually.
00188         future = conn.start(3600)
00189         return conn, future
00190 
00191     def test_immediate_success(self):
00192         conn, future = self.start_connect(self.addrinfo)
00193         self.assertEqual(list(self.connect_futures.keys()),
00194                          [(AF1, 'a')])
00195         self.resolve_connect(AF1, 'a', True)
00196         self.assertEqual(future.result(), (AF1, 'a', self.streams['a']))
00197 
00198     def test_immediate_failure(self):
00199         # Fail with just one address.
00200         conn, future = self.start_connect([(AF1, 'a')])
00201         self.assert_pending((AF1, 'a'))
00202         self.resolve_connect(AF1, 'a', False)
00203         self.assertRaises(IOError, future.result)
00204 
00205     def test_one_family_second_try(self):
00206         conn, future = self.start_connect([(AF1, 'a'), (AF1, 'b')])
00207         self.assert_pending((AF1, 'a'))
00208         self.resolve_connect(AF1, 'a', False)
00209         self.assert_pending((AF1, 'b'))
00210         self.resolve_connect(AF1, 'b', True)
00211         self.assertEqual(future.result(), (AF1, 'b', self.streams['b']))
00212 
00213     def test_one_family_second_try_failure(self):
00214         conn, future = self.start_connect([(AF1, 'a'), (AF1, 'b')])
00215         self.assert_pending((AF1, 'a'))
00216         self.resolve_connect(AF1, 'a', False)
00217         self.assert_pending((AF1, 'b'))
00218         self.resolve_connect(AF1, 'b', False)
00219         self.assertRaises(IOError, future.result)
00220 
00221     def test_one_family_second_try_timeout(self):
00222         conn, future = self.start_connect([(AF1, 'a'), (AF1, 'b')])
00223         self.assert_pending((AF1, 'a'))
00224         # trigger the timeout while the first lookup is pending;
00225         # nothing happens.
00226         conn.on_timeout()
00227         self.assert_pending((AF1, 'a'))
00228         self.resolve_connect(AF1, 'a', False)
00229         self.assert_pending((AF1, 'b'))
00230         self.resolve_connect(AF1, 'b', True)
00231         self.assertEqual(future.result(), (AF1, 'b', self.streams['b']))
00232 
00233     def test_two_families_immediate_failure(self):
00234         conn, future = self.start_connect(self.addrinfo)
00235         self.assert_pending((AF1, 'a'))
00236         self.resolve_connect(AF1, 'a', False)
00237         self.assert_pending((AF1, 'b'), (AF2, 'c'))
00238         self.resolve_connect(AF1, 'b', False)
00239         self.resolve_connect(AF2, 'c', True)
00240         self.assertEqual(future.result(), (AF2, 'c', self.streams['c']))
00241 
00242     def test_two_families_timeout(self):
00243         conn, future = self.start_connect(self.addrinfo)
00244         self.assert_pending((AF1, 'a'))
00245         conn.on_timeout()
00246         self.assert_pending((AF1, 'a'), (AF2, 'c'))
00247         self.resolve_connect(AF2, 'c', True)
00248         self.assertEqual(future.result(), (AF2, 'c', self.streams['c']))
00249         # resolving 'a' after the connection has completed doesn't start 'b'
00250         self.resolve_connect(AF1, 'a', False)
00251         self.assert_pending()
00252 
00253     def test_success_after_timeout(self):
00254         conn, future = self.start_connect(self.addrinfo)
00255         self.assert_pending((AF1, 'a'))
00256         conn.on_timeout()
00257         self.assert_pending((AF1, 'a'), (AF2, 'c'))
00258         self.resolve_connect(AF1, 'a', True)
00259         self.assertEqual(future.result(), (AF1, 'a', self.streams['a']))
00260         # resolving 'c' after completion closes the connection.
00261         self.resolve_connect(AF2, 'c', True)
00262         self.assertTrue(self.streams.pop('c').closed)
00263 
00264     def test_all_fail(self):
00265         conn, future = self.start_connect(self.addrinfo)
00266         self.assert_pending((AF1, 'a'))
00267         conn.on_timeout()
00268         self.assert_pending((AF1, 'a'), (AF2, 'c'))
00269         self.resolve_connect(AF2, 'c', False)
00270         self.assert_pending((AF1, 'a'), (AF2, 'd'))
00271         self.resolve_connect(AF2, 'd', False)
00272         # one queue is now empty
00273         self.assert_pending((AF1, 'a'))
00274         self.resolve_connect(AF1, 'a', False)
00275         self.assert_pending((AF1, 'b'))
00276         self.assertFalse(future.done())
00277         self.resolve_connect(AF1, 'b', False)
00278         self.assertRaises(IOError, future.result)


rosbridge_server
Author(s): Jonathan Mace
autogenerated on Thu Jun 6 2019 21:51:50