iostream_test.py
Go to the documentation of this file.
00001 from __future__ import absolute_import, division, with_statement
00002 from tornado import netutil
00003 from tornado.ioloop import IOLoop
00004 from tornado.iostream import IOStream
00005 from tornado.testing import AsyncHTTPTestCase, LogTrapTestCase, get_unused_port
00006 from tornado.util import b
00007 from tornado.web import RequestHandler, Application
00008 import errno
00009 import socket
00010 import sys
00011 import time
00012 
00013 
class HelloHandler(RequestHandler):
    """Minimal request handler: a GET writes the text ``Hello``."""

    def get(self):
        # Fixed body; the tests in this file look for it at the end of
        # the HTTP response.
        body = "Hello"
        self.write(body)
00017 
00018 
class TestIOStream(AsyncHTTPTestCase, LogTrapTestCase):
    """Tests for IOStream reads, writes, connects and close handling.

    Some tests connect to the HTTP port exposed by the test case (serving
    the application from get_app()); the others work on a connected
    (server, client) pair built by make_iostream_pair().
    """

    def get_app(self):
        """Return the application under test: HelloHandler mounted at '/'."""
        return Application([('/', HelloHandler)])

    def make_iostream_pair(self, **kwargs):
        """Return ``[server_stream, client_stream]`` connected over loopback.

        Any keyword arguments are forwarded to both IOStream constructors.
        The temporary listening socket is always unregistered and closed,
        even if establishing the pair fails.
        """
        port = get_unused_port()
        [listener] = netutil.bind_sockets(port, '127.0.0.1',
                                          family=socket.AF_INET)
        streams = [None, None]

        def accept_callback(connection, address):
            streams[0] = IOStream(connection, io_loop=self.io_loop, **kwargs)
            self.stop()

        def connect_callback():
            streams[1] = client_stream
            self.stop()
        netutil.add_accept_handler(listener, accept_callback,
                                   io_loop=self.io_loop)
        try:
            client_stream = IOStream(socket.socket(), io_loop=self.io_loop,
                                     **kwargs)
            client_stream.connect(('127.0.0.1', port),
                                  callback=connect_callback)
            # Each callback calls stop(); keep waiting until both sides of
            # the connection have been recorded.
            self.wait(condition=lambda: all(streams))
        finally:
            # The listener is only needed while the pair is being set up;
            # release it even when the wait above raises.
            self.io_loop.remove_handler(listener.fileno())
            listener.close()
        return streams

    def test_read_zero_bytes(self):
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        s.connect(("localhost", self.get_http_port()))
        self.stream = IOStream(s, io_loop=self.io_loop)
        try:
            self.stream.write(b("GET / HTTP/1.0\r\n\r\n"))

            # normal read
            self.stream.read_bytes(9, self.stop)
            data = self.wait()
            self.assertEqual(data, b("HTTP/1.0 "))

            # zero bytes
            self.stream.read_bytes(0, self.stop)
            data = self.wait()
            self.assertEqual(data, b(""))

            # another normal read
            self.stream.read_bytes(3, self.stop)
            data = self.wait()
            self.assertEqual(data, b("200"))
        finally:
            # Close through the IOStream (which owns the socket) rather
            # than the raw socket, and do so even when an assertion fails.
            self.stream.close()

    def test_write_zero_bytes(self):
        # Attempting to write zero bytes should run the callback without
        # going into an infinite loop.
        server, client = self.make_iostream_pair()
        try:
            server.write(b(''), callback=self.stop)
            self.wait()
            # As a side effect, the stream is now listening for connection
            # close (if it wasn't already), but is not listening for writes
            self.assertEqual(server._state, IOLoop.READ | IOLoop.ERROR)
        finally:
            server.close()
            client.close()

    def test_connection_refused(self):
        # When a connection is refused, the connect callback should not
        # be run.  (The kqueue IOLoop used to behave differently from the
        # epoll IOLoop in this respect)
        port = get_unused_port()
        stream = IOStream(socket.socket(), self.io_loop)
        self.connect_called = False

        def connect_callback():
            self.connect_called = True
        # The wait below is ended by the close callback, not by a
        # successful connect.
        stream.set_close_callback(self.stop)
        stream.connect(("localhost", port), connect_callback)
        self.wait()
        self.assertFalse(self.connect_called)
        self.assertTrue(isinstance(stream.error, socket.error), stream.error)
        if sys.platform != 'cygwin':
            # cygwin's errnos don't match those used on native windows python
            self.assertEqual(stream.error.args[0], errno.ECONNREFUSED)

    def test_gaierror(self):
        # Test that IOStream sets its exc_info on getaddrinfo error
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        stream = IOStream(s, io_loop=self.io_loop)
        stream.set_close_callback(self.stop)
        stream.connect(('adomainthatdoesntexist.asdf', 54321))
        # Checked immediately after connect() returns, without running the
        # IOLoop.
        self.assertTrue(isinstance(stream.error, socket.gaierror), stream.error)

    def test_connection_closed(self):
        # When a server sends a response and then closes the connection,
        # the client must be allowed to read the data before the IOStream
        # closes itself.  Epoll reports closed connections with a separate
        # EPOLLRDHUP event delivered at the same time as the read event,
        # while kqueue reports them as a second read/write event with an EOF
        # flag.
        response = self.fetch("/", headers={"Connection": "close"})
        response.rethrow()

    def test_read_until_close(self):
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        s.connect(("localhost", self.get_http_port()))
        stream = IOStream(s, io_loop=self.io_loop)
        stream.write(b("GET / HTTP/1.0\r\n\r\n"))

        # The callback receives everything read up to the point the
        # connection is closed: status line first, handler body last.
        stream.read_until_close(self.stop)
        data = self.wait()
        self.assertTrue(data.startswith(b("HTTP/1.0 200")))
        self.assertTrue(data.endswith(b("Hello")))

    def test_streaming_callback(self):
        server, client = self.make_iostream_pair()
        try:
            chunks = []
            final_called = []

            def streaming_callback(data):
                chunks.append(data)
                self.stop()

            def final_callback(data):
                # All data went to streaming_callback, so nothing is left
                # for the final callback.  Uses assertFalse rather than a
                # bare assert so the check survives ``python -O``.
                self.assertFalse(data)
                final_called.append(True)
                self.stop()
            server.read_bytes(6, callback=final_callback,
                              streaming_callback=streaming_callback)
            client.write(b("1234"))
            self.wait(condition=lambda: chunks)
            client.write(b("5678"))
            self.wait(condition=lambda: final_called)
            self.assertEqual(chunks, [b("1234"), b("56")])

            # the rest of the last chunk is still in the buffer
            server.read_bytes(2, callback=self.stop)
            data = self.wait()
            self.assertEqual(data, b("78"))
        finally:
            server.close()
            client.close()

    def test_streaming_until_close(self):
        server, client = self.make_iostream_pair()
        try:
            chunks = []

            def callback(data):
                chunks.append(data)
                self.stop()
            # The same function serves as both the streaming and the final
            # callback, so the final (empty) chunk is recorded too.
            client.read_until_close(callback=callback,
                                    streaming_callback=callback)
            server.write(b("1234"))
            self.wait()
            server.write(b("5678"))
            self.wait()
            server.close()
            self.wait()
            self.assertEqual(chunks, [b("1234"), b("5678"), b("")])
        finally:
            server.close()
            client.close()

    def test_delayed_close_callback(self):
        # The scenario:  Server closes the connection while there is a pending
        # read that can be served out of buffered data.  The client does not
        # run the close_callback as soon as it detects the close, but rather
        # defers it until after the buffered read has finished.
        server, client = self.make_iostream_pair()
        try:
            client.set_close_callback(self.stop)
            server.write(b("12"))
            chunks = []

            def callback1(data):
                chunks.append(data)
                client.read_bytes(1, callback2)
                server.close()

            def callback2(data):
                chunks.append(data)
            client.read_bytes(1, callback1)
            self.wait()  # stopped by close_callback
            self.assertEqual(chunks, [b("1"), b("2")])
        finally:
            server.close()
            client.close()

    def test_close_buffered_data(self):
        # Similar to the previous test, but with data stored in the OS's
        # socket buffers instead of the IOStream's read buffer.  Out-of-band
        # close notifications must be delayed until all data has been
        # drained into the IOStream buffer. (epoll used to use out-of-band
        # close events with EPOLLRDHUP, but no longer)
        #
        # This depends on the read_chunk_size being smaller than the
        # OS socket buffer, so make it small.
        server, client = self.make_iostream_pair(read_chunk_size=256)
        try:
            server.write(b("A") * 512)
            client.read_bytes(256, self.stop)
            data = self.wait()
            self.assertEqual(b("A") * 256, data)
            server.close()
            # Allow the close to propagate to the client side of the
            # connection.  Using add_callback instead of add_timeout
            # doesn't seem to work, even with multiple iterations
            self.io_loop.add_timeout(time.time() + 0.01, self.stop)
            self.wait()
            client.read_bytes(256, self.stop)
            data = self.wait()
            self.assertEqual(b("A") * 256, data)
        finally:
            server.close()
            client.close()

    def test_large_read_until(self):
        # Performance test: read_until used to have a quadratic component
        # so a read_until of 4MB would take 8 seconds; now it takes 0.25
        # seconds.
        server, client = self.make_iostream_pair()
        try:
            NUM_KB = 4096
            # range() instead of xrange() so the test also runs unmodified
            # on Python 3; the list built on Python 2 is only 4096 ints.
            for _ in range(NUM_KB):
                client.write(b("A") * 1024)
            client.write(b("\r\n"))
            server.read_until(b("\r\n"), self.stop)
            data = self.wait()
            self.assertEqual(len(data), NUM_KB * 1024 + 2)
        finally:
            server.close()
            client.close()


rosbridge_server
Author(s): Jonathan Mace
autogenerated on Mon Oct 6 2014 06:58:14