00001 from __future__ import absolute_import, division, with_statement
00002 from tornado import netutil
00003 from tornado.ioloop import IOLoop
00004 from tornado.iostream import IOStream
00005 from tornado.testing import AsyncHTTPTestCase, LogTrapTestCase, get_unused_port
00006 from tornado.util import b
00007 from tornado.web import RequestHandler, Application
00008 import errno
00009 import socket
00010 import sys
00011 import time
00012 
00013 
class HelloHandler(RequestHandler):
    """Minimal request handler whose GET response body is ``Hello``."""

    def get(self):
        """Write the fixed greeting as the response body."""
        greeting = "Hello"
        self.write(greeting)
00017 
00018 
00019 class TestIOStream(AsyncHTTPTestCase, LogTrapTestCase):
00020     def get_app(self):
00021         return Application([('/', HelloHandler)])
00022 
00023     def make_iostream_pair(self, **kwargs):
00024         port = get_unused_port()
00025         [listener] = netutil.bind_sockets(port, '127.0.0.1',
00026                                           family=socket.AF_INET)
00027         streams = [None, None]
00028 
00029         def accept_callback(connection, address):
00030             streams[0] = IOStream(connection, io_loop=self.io_loop, **kwargs)
00031             self.stop()
00032 
00033         def connect_callback():
00034             streams[1] = client_stream
00035             self.stop()
00036         netutil.add_accept_handler(listener, accept_callback,
00037                                    io_loop=self.io_loop)
00038         client_stream = IOStream(socket.socket(), io_loop=self.io_loop,
00039                                  **kwargs)
00040         client_stream.connect(('127.0.0.1', port),
00041                               callback=connect_callback)
00042         self.wait(condition=lambda: all(streams))
00043         self.io_loop.remove_handler(listener.fileno())
00044         listener.close()
00045         return streams
00046 
00047     def test_read_zero_bytes(self):
00048         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
00049         s.connect(("localhost", self.get_http_port()))
00050         self.stream = IOStream(s, io_loop=self.io_loop)
00051         self.stream.write(b("GET / HTTP/1.0\r\n\r\n"))
00052 
00053         
00054         self.stream.read_bytes(9, self.stop)
00055         data = self.wait()
00056         self.assertEqual(data, b("HTTP/1.0 "))
00057 
00058         
00059         self.stream.read_bytes(0, self.stop)
00060         data = self.wait()
00061         self.assertEqual(data, b(""))
00062 
00063         
00064         self.stream.read_bytes(3, self.stop)
00065         data = self.wait()
00066         self.assertEqual(data, b("200"))
00067 
00068         s.close()
00069 
00070     def test_write_zero_bytes(self):
00071         
00072         
00073         server, client = self.make_iostream_pair()
00074         server.write(b(''), callback=self.stop)
00075         self.wait()
00076         
00077         
00078         self.assertEqual(server._state, IOLoop.READ | IOLoop.ERROR)
00079         server.close()
00080         client.close()
00081 
00082     def test_connection_refused(self):
00083         
00084         
00085         
00086         port = get_unused_port()
00087         stream = IOStream(socket.socket(), self.io_loop)
00088         self.connect_called = False
00089 
00090         def connect_callback():
00091             self.connect_called = True
00092         stream.set_close_callback(self.stop)
00093         stream.connect(("localhost", port), connect_callback)
00094         self.wait()
00095         self.assertFalse(self.connect_called)
00096         self.assertTrue(isinstance(stream.error, socket.error), stream.error)
00097         if sys.platform != 'cygwin':
00098             
00099             self.assertEqual(stream.error.args[0], errno.ECONNREFUSED)
00100 
00101     def test_gaierror(self):
00102         
00103         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
00104         stream = IOStream(s, io_loop=self.io_loop)
00105         stream.set_close_callback(self.stop)
00106         stream.connect(('adomainthatdoesntexist.asdf', 54321))
00107         self.assertTrue(isinstance(stream.error, socket.gaierror), stream.error)
00108 
00109     def test_connection_closed(self):
00110         
00111         
00112         
00113         
00114         
00115         
00116         response = self.fetch("/", headers={"Connection": "close"})
00117         response.rethrow()
00118 
00119     def test_read_until_close(self):
00120         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
00121         s.connect(("localhost", self.get_http_port()))
00122         stream = IOStream(s, io_loop=self.io_loop)
00123         stream.write(b("GET / HTTP/1.0\r\n\r\n"))
00124 
00125         stream.read_until_close(self.stop)
00126         data = self.wait()
00127         self.assertTrue(data.startswith(b("HTTP/1.0 200")))
00128         self.assertTrue(data.endswith(b("Hello")))
00129 
00130     def test_streaming_callback(self):
00131         server, client = self.make_iostream_pair()
00132         try:
00133             chunks = []
00134             final_called = []
00135 
00136             def streaming_callback(data):
00137                 chunks.append(data)
00138                 self.stop()
00139 
00140             def final_callback(data):
00141                 assert not data
00142                 final_called.append(True)
00143                 self.stop()
00144             server.read_bytes(6, callback=final_callback,
00145                               streaming_callback=streaming_callback)
00146             client.write(b("1234"))
00147             self.wait(condition=lambda: chunks)
00148             client.write(b("5678"))
00149             self.wait(condition=lambda: final_called)
00150             self.assertEqual(chunks, [b("1234"), b("56")])
00151 
00152             
00153             server.read_bytes(2, callback=self.stop)
00154             data = self.wait()
00155             self.assertEqual(data, b("78"))
00156         finally:
00157             server.close()
00158             client.close()
00159 
00160     def test_streaming_until_close(self):
00161         server, client = self.make_iostream_pair()
00162         try:
00163             chunks = []
00164 
00165             def callback(data):
00166                 chunks.append(data)
00167                 self.stop()
00168             client.read_until_close(callback=callback,
00169                                     streaming_callback=callback)
00170             server.write(b("1234"))
00171             self.wait()
00172             server.write(b("5678"))
00173             self.wait()
00174             server.close()
00175             self.wait()
00176             self.assertEqual(chunks, [b("1234"), b("5678"), b("")])
00177         finally:
00178             server.close()
00179             client.close()
00180 
00181     def test_delayed_close_callback(self):
00182         
00183         
00184         
00185         
00186         server, client = self.make_iostream_pair()
00187         try:
00188             client.set_close_callback(self.stop)
00189             server.write(b("12"))
00190             chunks = []
00191 
00192             def callback1(data):
00193                 chunks.append(data)
00194                 client.read_bytes(1, callback2)
00195                 server.close()
00196 
00197             def callback2(data):
00198                 chunks.append(data)
00199             client.read_bytes(1, callback1)
00200             self.wait()  
00201             self.assertEqual(chunks, [b("1"), b("2")])
00202         finally:
00203             server.close()
00204             client.close()
00205 
00206     def test_close_buffered_data(self):
00207         
00208         
00209         
00210         
00211         
00212         
00213         
00214         
00215         server, client = self.make_iostream_pair(read_chunk_size=256)
00216         try:
00217             server.write(b("A") * 512)
00218             client.read_bytes(256, self.stop)
00219             data = self.wait()
00220             self.assertEqual(b("A") * 256, data)
00221             server.close()
00222             
00223             
00224             
00225             self.io_loop.add_timeout(time.time() + 0.01, self.stop)
00226             self.wait()
00227             client.read_bytes(256, self.stop)
00228             data = self.wait()
00229             self.assertEqual(b("A") * 256, data)
00230         finally:
00231             server.close()
00232             client.close()
00233 
00234     def test_large_read_until(self):
00235         
00236         
00237         
00238         server, client = self.make_iostream_pair()
00239         try:
00240             NUM_KB = 4096
00241             for i in xrange(NUM_KB):
00242                 client.write(b("A") * 1024)
00243             client.write(b("\r\n"))
00244             server.read_until(b("\r\n"), self.stop)
00245             data = self.wait()
00246             self.assertEqual(len(data), NUM_KB * 1024 + 2)
00247         finally:
00248             server.close()
00249             client.close()