00001 from __future__ import absolute_import, division, with_statement
00002 from tornado import netutil
00003 from tornado.ioloop import IOLoop
00004 from tornado.iostream import IOStream
00005 from tornado.testing import AsyncHTTPTestCase, LogTrapTestCase, get_unused_port
00006 from tornado.util import b
00007 from tornado.web import RequestHandler, Application
00008 import errno
00009 import socket
00010 import sys
00011 import time
00012
00013
class HelloHandler(RequestHandler):
    """Minimal request handler whose GET response body is ``Hello``."""

    def get(self):
        """Write the fixed greeting as the response body."""
        greeting = "Hello"
        self.write(greeting)
00017
00018
class TestIOStream(AsyncHTTPTestCase, LogTrapTestCase):
    """Tests for ``IOStream`` reading, writing, connecting and closing.

    Some tests talk to the HTTP server that ``AsyncHTTPTestCase`` starts for
    the application returned by ``get_app``; the others exchange bytes over a
    directly connected pair of streams built by ``make_iostream_pair``.
    Every test closes the streams it creates in a ``finally`` clause so a
    failing assertion does not leave handlers registered on the IOLoop.
    """

    def get_app(self):
        """Return the application under test: ``/`` served by HelloHandler."""
        return Application([('/', HelloHandler)])

    def make_iostream_pair(self, **kwargs):
        """Return ``[server_stream, client_stream]`` connected over loopback.

        A temporary listener is bound on 127.0.0.1, a client stream connects
        to it, and the IOLoop is run until both the accept callback and the
        connect callback have fired.  ``kwargs`` are forwarded to both
        ``IOStream`` constructors.

        The listener is always unregistered and closed before returning,
        including when the wait fails; in that case any stream that was
        already created is closed as well and the error is re-raised.
        """
        port = get_unused_port()
        [listener] = netutil.bind_sockets(port, '127.0.0.1',
                                          family=socket.AF_INET)
        # Index 0 is filled by the accepting side, index 1 by the client.
        streams = [None, None]

        def accept_callback(connection, address):
            streams[0] = IOStream(connection, io_loop=self.io_loop, **kwargs)
            self.stop()

        def connect_callback():
            streams[1] = client_stream
            self.stop()

        netutil.add_accept_handler(listener, accept_callback,
                                   io_loop=self.io_loop)
        client_stream = IOStream(socket.socket(), io_loop=self.io_loop,
                                 **kwargs)
        try:
            client_stream.connect(('127.0.0.1', port),
                                  callback=connect_callback)
            # Both callbacks call stop(); keep waiting until both ran.
            self.wait(condition=lambda: all(streams))
        except Exception:
            # Do not leak half-built streams when the pair never completes.
            client_stream.close()
            if streams[0] is not None:
                streams[0].close()
            raise
        finally:
            # The listener is only needed to establish this one connection.
            self.io_loop.remove_handler(listener.fileno())
            listener.close()
        return streams

    def test_read_zero_bytes(self):
        """A zero-byte read returns an empty string and consumes nothing."""
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        s.connect(("localhost", self.get_http_port()))
        self.stream = IOStream(s, io_loop=self.io_loop)
        try:
            self.stream.write(b("GET / HTTP/1.0\r\n\r\n"))

            # Ordinary read of the start of the status line.
            self.stream.read_bytes(9, self.stop)
            data = self.wait()
            self.assertEqual(data, b("HTTP/1.0 "))

            # Zero bytes requested: the callback still runs, with no data.
            self.stream.read_bytes(0, self.stop)
            data = self.wait()
            self.assertEqual(data, b(""))

            # The next read continues exactly where the first one stopped.
            self.stream.read_bytes(3, self.stop)
            data = self.wait()
            self.assertEqual(data, b("200"))
        finally:
            # Close through the stream (not the raw socket) so that its
            # IOLoop handler is removed along with the file descriptor.
            self.stream.close()

    def test_write_zero_bytes(self):
        """Writing zero bytes still runs the write callback."""
        server, client = self.make_iostream_pair()
        try:
            server.write(b(''), callback=self.stop)
            self.wait()
            # After the empty write the stream is registered for
            # READ | ERROR only, i.e. it is not waiting for writability.
            self.assertEqual(server._state, IOLoop.READ | IOLoop.ERROR)
        finally:
            server.close()
            client.close()

    def test_connection_refused(self):
        """A refused connection closes the stream without calling back.

        The connect callback must never run; the close callback stops the
        wait, and ``stream.error`` must then hold the ``socket.error``.
        """
        port = get_unused_port()
        stream = IOStream(socket.socket(), self.io_loop)
        self.connect_called = False

        def connect_callback():
            self.connect_called = True

        try:
            stream.set_close_callback(self.stop)
            stream.connect(("localhost", port), connect_callback)
            self.wait()
            self.assertFalse(self.connect_called)
            self.assertTrue(isinstance(stream.error, socket.error),
                            stream.error)
            # The errno comparison is skipped on cygwin.
            # NOTE(review): presumably its errno values differ - confirm.
            if sys.platform != 'cygwin':
                self.assertEqual(stream.error.args[0], errno.ECONNREFUSED)
        finally:
            stream.close()

    def test_gaierror(self):
        """A name-resolution failure is recorded in ``stream.error``.

        The host name used is expected not to resolve.  No ``wait()`` is
        involved: the error is checked right after ``connect`` returns.
        """
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        stream = IOStream(s, io_loop=self.io_loop)
        try:
            stream.set_close_callback(self.stop)
            stream.connect(('adomainthatdoesntexist.asdf', 54321))
            self.assertTrue(isinstance(stream.error, socket.gaierror),
                            stream.error)
        finally:
            stream.close()

    def test_connection_closed(self):
        """Fetch ``/`` with ``Connection: close`` and re-raise any error.

        NOTE(review): presumably this guards against losing response data
        when the server closes right after responding - confirm intent.
        """
        response = self.fetch("/", headers={"Connection": "close"})
        response.rethrow()

    def test_read_until_close(self):
        """``read_until_close`` delivers the whole HTTP/1.0 response."""
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        s.connect(("localhost", self.get_http_port()))
        stream = IOStream(s, io_loop=self.io_loop)
        try:
            stream.write(b("GET / HTTP/1.0\r\n\r\n"))

            stream.read_until_close(self.stop)
            data = self.wait()
            self.assertTrue(data.startswith(b("HTTP/1.0 200")))
            self.assertTrue(data.endswith(b("Hello")))
        finally:
            stream.close()

    def test_streaming_callback(self):
        """``read_bytes`` with a streaming callback delivers data in chunks.

        All six requested bytes go to the streaming callback, so the final
        callback receives no data; bytes beyond the request stay buffered
        for the next read.
        """
        server, client = self.make_iostream_pair()
        try:
            chunks = []
            final_called = []

            def streaming_callback(data):
                chunks.append(data)
                self.stop()

            def final_callback(data):
                # Everything was already handed to streaming_callback.
                self.assertFalse(data)
                final_called.append(True)
                self.stop()

            server.read_bytes(6, callback=final_callback,
                              streaming_callback=streaming_callback)
            client.write(b("1234"))
            self.wait(condition=lambda: chunks)
            client.write(b("5678"))
            self.wait(condition=lambda: final_called)
            self.assertEqual(chunks, [b("1234"), b("56")])

            # The two bytes past the six requested are still readable.
            server.read_bytes(2, callback=self.stop)
            data = self.wait()
            self.assertEqual(data, b("78"))
        finally:
            server.close()
            client.close()

    def test_streaming_until_close(self):
        """``read_until_close`` streams each write, then a final empty chunk.

        The same function is used as streaming and final callback, so the
        closing call shows up as a trailing empty string in ``chunks``.
        """
        server, client = self.make_iostream_pair()
        try:
            chunks = []

            def callback(data):
                chunks.append(data)
                self.stop()

            client.read_until_close(callback=callback,
                                    streaming_callback=callback)
            server.write(b("1234"))
            self.wait()
            server.write(b("5678"))
            self.wait()
            server.close()
            self.wait()
            self.assertEqual(chunks, [b("1234"), b("5678"), b("")])
        finally:
            server.close()
            client.close()

    def test_delayed_close_callback(self):
        """Both pending one-byte reads complete before the close callback.

        The server sends two bytes and is closed from inside the first read
        callback, while a second one-byte read is outstanding.  The wait is
        stopped by the client's close callback, and by then both bytes must
        have been delivered.
        """
        server, client = self.make_iostream_pair()
        try:
            client.set_close_callback(self.stop)
            server.write(b("12"))
            chunks = []

            def callback1(data):
                chunks.append(data)
                client.read_bytes(1, callback2)
                server.close()

            def callback2(data):
                chunks.append(data)

            client.read_bytes(1, callback1)
            self.wait()  # stopped by the client's close callback
            self.assertEqual(chunks, [b("1"), b("2")])
        finally:
            server.close()
            client.close()

    def test_close_buffered_data(self):
        """Data sent before the peer closed can still be read afterwards.

        ``read_chunk_size`` is set to 256 and 512 bytes are written; the
        first 256 are read, the server is closed, and after a short pause
        the remaining 256 bytes must still be readable.
        NOTE(review): presumably the second half sits in the OS socket
        buffer rather than in the IOStream buffer at close time - confirm.
        """
        server, client = self.make_iostream_pair(read_chunk_size=256)
        try:
            server.write(b("A") * 512)
            client.read_bytes(256, self.stop)
            data = self.wait()
            self.assertEqual(b("A") * 256, data)
            server.close()
            # Run the IOLoop briefly so the client side can observe the
            # close before the second read is issued.
            self.io_loop.add_timeout(time.time() + 0.01, self.stop)
            self.wait()
            client.read_bytes(256, self.stop)
            data = self.wait()
            self.assertEqual(b("A") * 256, data)
        finally:
            server.close()
            client.close()

    def test_large_read_until(self):
        """``read_until`` copes with 4 MB of data ahead of the delimiter.

        NOTE(review): presumably a performance regression test for
        ``read_until`` on large inputs - confirm.
        """
        server, client = self.make_iostream_pair()
        try:
            NUM_KB = 4096
            for _ in range(NUM_KB):
                client.write(b("A") * 1024)
            client.write(b("\r\n"))
            server.read_until(b("\r\n"), self.stop)
            data = self.wait()
            self.assertEqual(len(data), NUM_KB * 1024 + 2)
        finally:
            server.close()
            client.close()