00001 from __future__ import absolute_import, division, print_function, with_statement
00002 from tornado.concurrent import Future
00003 from tornado import gen
00004 from tornado import netutil
00005 from tornado.iostream import IOStream, SSLIOStream, PipeIOStream, StreamClosedError
00006 from tornado.httputil import HTTPHeaders
00007 from tornado.log import gen_log, app_log
00008 from tornado.netutil import ssl_wrap_socket
00009 from tornado.stack_context import NullContext
00010 from tornado.testing import AsyncHTTPTestCase, AsyncHTTPSTestCase, AsyncTestCase, bind_unused_port, ExpectLog, gen_test
00011 from tornado.test.util import unittest, skipIfNonUnix
00012 from tornado.web import RequestHandler, Application
00013 import certifi
00014 import errno
00015 import logging
00016 import os
00017 import platform
00018 import socket
00019 import ssl
00020 import sys
00021
00022
def _server_ssl_options():
    """Return the ``ssl_options`` dict used by the server side of the tests.

    The certificate and private key are the ``test.crt`` and ``test.key``
    files that live in the same directory as this module.
    """
    here = os.path.dirname(__file__)
    return {
        'certfile': os.path.join(here, 'test.crt'),
        'keyfile': os.path.join(here, 'test.key'),
    }
00028
00029
class HelloHandler(RequestHandler):
    """Minimal handler: a GET request is answered with the body ``Hello``."""

    def get(self):
        body = "Hello"
        self.write(body)
00033
00034
class TestIOStreamWebMixin(object):
    """Client-side IOStream tests that run against a real HTTP server.

    The server is the one-route application returned by `get_app`.
    Concrete subclasses combine this mixin with an HTTP(S) test case and
    implement `_make_client_iostream` to choose the stream class.
    """

    def _make_client_iostream(self):
        """Return a new, not-yet-connected client stream (subclass hook)."""
        raise NotImplementedError()

    def get_app(self):
        routes = [('/', HelloHandler)]
        return Application(routes)

    def test_connection_closed(self):
        # Ask the server to close the connection after responding; the
        # response must still be delivered intact (rethrow() raises if the
        # fetch failed).
        response = self.fetch("/", headers={"Connection": "close"})
        response.rethrow()

    def test_read_until_close(self):
        # Send an HTTP/1.0 request and read everything the server sends
        # until the stream is closed.
        client = self._make_client_iostream()
        address = ('localhost', self.get_http_port())
        client.connect(address, callback=self.stop)
        self.wait()
        client.write(b"GET / HTTP/1.0\r\n\r\n")

        client.read_until_close(self.stop)
        response = self.wait()
        self.assertTrue(response.startswith(b"HTTP/1.0 200"))
        self.assertTrue(response.endswith(b"Hello"))

    def test_read_zero_bytes(self):
        self.stream = self._make_client_iostream()
        address = ("localhost", self.get_http_port())
        self.stream.connect(address, callback=self.stop)
        self.wait()
        self.stream.write(b"GET / HTTP/1.0\r\n\r\n")

        # A normal read, then a zero-byte read (which must yield an empty
        # result), then another normal read that picks up exactly where
        # the first one stopped.
        expected_reads = [
            (9, b"HTTP/1.0 "),
            (0, b""),
            (3, b"200"),
        ]
        for num_bytes, expected in expected_reads:
            self.stream.read_bytes(num_bytes, self.stop)
            self.assertEqual(self.wait(), expected)

        self.stream.close()

    def test_write_while_connecting(self):
        client = self._make_client_iostream()
        connected = [False]
        written = [False]

        def on_connect():
            connected[0] = True
            self.stop()

        def on_write():
            written[0] = True
            self.stop()

        address = ("localhost", self.get_http_port())
        client.connect(address, callback=on_connect)

        # Issue the write before the connection has been established.
        client.write(b"GET / HTTP/1.0\r\nConnection: close\r\n\r\n",
                     callback=on_write)
        self.assertFalse(connected[0])

        # Wait until both the connect callback and the write callback
        # have run; log the flags whether or not the wait succeeds.
        try:
            self.wait(lambda: connected[0] and written[0])
        finally:
            logging.debug((connected, written))

        client.read_until_close(self.stop)
        response = self.wait()
        self.assertTrue(response.endswith(b"Hello"))

        client.close()

    @gen_test
    def test_future_interface(self):
        """Basic test of IOStream's ability to return Futures."""
        client = self._make_client_iostream()
        address = ("localhost", self.get_http_port())
        connect_result = yield client.connect(address)
        self.assertIs(connect_result, client)
        yield client.write(b"GET / HTTP/1.0\r\n\r\n")
        status_line = yield client.read_until(b"\r\n")
        self.assertEqual(status_line, b"HTTP/1.0 200 OK\r\n")
        # Passing callback=None explicitly must still return a Future.
        raw_headers = yield client.read_until(b"\r\n\r\n", callback=None)
        parsed = HTTPHeaders.parse(raw_headers.decode('latin1'))
        body_length = int(parsed['Content-Length'])
        body = yield client.read_bytes(body_length)
        self.assertEqual(body, b'Hello')
        client.close()

    @gen_test
    def test_future_close_while_reading(self):
        # Request far more data than the response contains; the read's
        # Future must fail with StreamClosedError.
        client = self._make_client_iostream()
        address = ("localhost", self.get_http_port())
        yield client.connect(address)
        yield client.write(b"GET / HTTP/1.0\r\n\r\n")
        oversized_read = 1024 * 1024
        with self.assertRaises(StreamClosedError):
            yield client.read_bytes(oversized_read)
        client.close()

    @gen_test
    def test_future_read_until_close(self):
        # Skip past the headers, then read the body up to the close.
        client = self._make_client_iostream()
        address = ("localhost", self.get_http_port())
        yield client.connect(address)
        yield client.write(b"GET / HTTP/1.0\r\nConnection: close\r\n\r\n")
        yield client.read_until(b"\r\n\r\n")
        body = yield client.read_until_close()
        self.assertEqual(body, b"Hello")

        # Nothing is left to read, so the error is raised by the call
        # itself; no yield is needed to observe it.
        with self.assertRaises(StreamClosedError):
            client.read_bytes(1)
00160
00161
class TestIOStreamMixin(object):
    """Stream-to-stream tests shared by every IOStream flavour.

    Subclasses provide `_make_server_iostream` and `_make_client_iostream`;
    the tests run over a connected ``(server, client)`` pair built by
    `make_iostream_pair`.  Every test closes both streams in a ``finally``
    clause so that a failing assertion does not leak sockets.
    """

    def _make_server_iostream(self, connection, **kwargs):
        """Wrap an accepted socket in the server-side stream (subclass hook)."""
        raise NotImplementedError()

    def _make_client_iostream(self, connection, **kwargs):
        """Wrap an unconnected socket in the client-side stream (subclass hook)."""
        raise NotImplementedError()

    def make_iostream_pair(self, **kwargs):
        """Return a connected ``[server_stream, client_stream]`` pair.

        ``kwargs`` are forwarded to both stream factory hooks.  The
        temporary listening socket is unregistered and closed before
        returning.
        """
        listener, port = bind_unused_port()
        streams = [None, None]

        def accept_callback(connection, address):
            streams[0] = self._make_server_iostream(connection, **kwargs)
            self.stop()

        def connect_callback():
            streams[1] = client_stream
            self.stop()
        netutil.add_accept_handler(listener, accept_callback,
                                   io_loop=self.io_loop)
        client_stream = self._make_client_iostream(socket.socket(), **kwargs)
        client_stream.connect(('127.0.0.1', port),
                              callback=connect_callback)
        # Both callbacks call stop(); keep waiting until both slots are set.
        self.wait(condition=lambda: all(streams))
        self.io_loop.remove_handler(listener.fileno())
        listener.close()
        return streams

    def test_streaming_callback_with_data_in_buffer(self):
        server, client = self.make_iostream_pair()
        try:
            client.write(b"abcd\r\nefgh")
            server.read_until(b"\r\n", self.stop)
            data = self.wait()
            self.assertEqual(data, b"abcd\r\n")

            def closed_callback(chunk):
                self.fail()
            # The remainder of the first write is delivered through the
            # streaming callback; the final callback must not run here.
            server.read_until_close(callback=closed_callback,
                                    streaming_callback=self.stop)
            data = self.wait()
            self.assertEqual(data, b"efgh")
        finally:
            server.close()
            client.close()

    def test_write_zero_bytes(self):
        # Writing an empty byte string must still run the callback
        # (otherwise wait() times out).
        server, client = self.make_iostream_pair()
        try:
            server.write(b'', callback=self.stop)
            self.wait()
        finally:
            server.close()
            client.close()

    def test_connection_refused(self):
        # When a connection is refused, the connect callback must not run;
        # the stream is closed with a socket error instead.
        server_socket, port = bind_unused_port()
        server_socket.close()
        stream = IOStream(socket.socket(), self.io_loop)
        self.connect_called = False

        def connect_callback():
            self.connect_called = True
        stream.set_close_callback(self.stop)
        # Any log output is tolerated here but none is required.
        with ExpectLog(gen_log, ".*", required=False):
            stream.connect(("localhost", port), connect_callback)
            self.wait()
        self.assertFalse(self.connect_called)
        self.assertIsInstance(stream.error, socket.error, stream.error)
        # The errno check is skipped on cygwin.
        if sys.platform != 'cygwin':
            _ERRNO_CONNREFUSED = (errno.ECONNREFUSED,)
            if hasattr(errno, "WSAECONNREFUSED"):
                _ERRNO_CONNREFUSED += (errno.WSAECONNREFUSED,)
            self.assertIn(stream.error.args[0], _ERRNO_CONNREFUSED)

    def test_gaierror(self):
        # A name-resolution failure must be recorded on stream.error.
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        stream = IOStream(s, io_loop=self.io_loop)
        stream.set_close_callback(self.stop)
        # A malformed host name (it contains spaces) is used to provoke
        # the resolution error.
        with ExpectLog(gen_log, "Connect error"):
            stream.connect(('an invalid domain', 54321))
            self.assertIsInstance(stream.error, socket.gaierror,
                                  stream.error)

    def test_read_callback_error(self):
        # An exception raised by a read callback must close the stream
        # and be recorded on stream.error.
        server, client = self.make_iostream_pair()
        try:
            server.set_close_callback(self.stop)
            with ExpectLog(
                app_log, "(Uncaught exception|Exception in callback)"
            ):
                # Register the read under NullContext so the test case's
                # own stack context does not intercept the error
                # (presumed intent -- confirm against tornado.testing).
                with NullContext():
                    server.read_bytes(1, callback=lambda data: 1 / 0)
                client.write(b"1")
                self.wait()
            self.assertIsInstance(server.error, ZeroDivisionError)
        finally:
            server.close()
            client.close()

    def test_streaming_callback(self):
        server, client = self.make_iostream_pair()
        try:
            chunks = []
            final_called = []

            def streaming_callback(data):
                chunks.append(data)
                self.stop()

            def final_callback(data):
                # All data went through the streaming callback, so the
                # final callback receives nothing.
                self.assertFalse(data)
                final_called.append(True)
                self.stop()
            server.read_bytes(6, callback=final_callback,
                              streaming_callback=streaming_callback)
            client.write(b"1234")
            self.wait(condition=lambda: chunks)
            client.write(b"5678")
            self.wait(condition=lambda: final_called)
            self.assertEqual(chunks, [b"1234", b"56"])

            # The bytes beyond the 6 requested stay buffered for the
            # next read.
            server.read_bytes(2, callback=self.stop)
            data = self.wait()
            self.assertEqual(data, b"78")
        finally:
            server.close()
            client.close()

    def test_streaming_until_close(self):
        server, client = self.make_iostream_pair()
        try:
            chunks = []
            closed = [False]

            def streaming_callback(data):
                chunks.append(data)
                self.stop()

            def close_callback(data):
                # Everything was already streamed; nothing is left over.
                self.assertFalse(data, data)
                closed[0] = True
                self.stop()
            client.read_until_close(callback=close_callback,
                                    streaming_callback=streaming_callback)
            server.write(b"1234")
            self.wait(condition=lambda: len(chunks) == 1)
            server.write(b"5678", self.stop)
            self.wait()
            server.close()
            self.wait(condition=lambda: closed[0])
            self.assertEqual(chunks, [b"1234", b"5678"])
        finally:
            server.close()
            client.close()

    def test_delayed_close_callback(self):
        # The server closes while the client still has a pending read
        # that can be served from data already sent.  The close callback
        # (self.stop) must only fire after that read has been delivered.
        server, client = self.make_iostream_pair()
        try:
            client.set_close_callback(self.stop)
            server.write(b"12")
            chunks = []

            def callback1(data):
                chunks.append(data)
                client.read_bytes(1, callback2)
                server.close()

            def callback2(data):
                chunks.append(data)
            client.read_bytes(1, callback1)
            self.wait()  # stopped by the close callback
            self.assertEqual(chunks, [b"1", b"2"])
        finally:
            server.close()
            client.close()

    def test_future_delayed_close_callback(self):
        # Same scenario as test_delayed_close_callback, using Futures.
        # The stream pair is built out here and only the body runs under
        # gen_test (make_iostream_pair drives the IOLoop via wait()).
        server, client = self.make_iostream_pair()

        @gen_test
        def f(self):
            server.write(b"12")
            chunks = []
            chunks.append((yield client.read_bytes(1)))
            server.close()
            chunks.append((yield client.read_bytes(1)))
            self.assertEqual(chunks, [b"1", b"2"])
        try:
            f(self)
        finally:
            server.close()
            client.close()

    def test_close_buffered_data(self):
        # Like test_delayed_close_callback, but part of the data has not
        # yet been pulled into the client stream when the server closes:
        # read_chunk_size is 256 while 512 bytes are written.  The second
        # half must still be readable after the close.
        server, client = self.make_iostream_pair(read_chunk_size=256)
        try:
            server.write(b"A" * 512)
            client.read_bytes(256, self.stop)
            data = self.wait()
            self.assertEqual(b"A" * 256, data)
            server.close()
            # Run the IOLoop briefly so the close can reach the client
            # before the second read is issued.
            self.io_loop.add_timeout(self.io_loop.time() + 0.01, self.stop)
            self.wait()
            client.read_bytes(256, self.stop)
            data = self.wait()
            self.assertEqual(b"A" * 256, data)
        finally:
            server.close()
            client.close()

    def test_read_until_close_after_close(self):
        # read_until_close issued after the peer has already closed must
        # still return the data that was sent before the close.
        server, client = self.make_iostream_pair()
        try:
            server.write(b"1234")
            server.close()
            # Read one byte first so the client is known to have
            # received the data.
            client.read_bytes(1, self.stop)
            data = self.wait()
            self.assertEqual(data, b"1")
            client.read_until_close(self.stop)
            data = self.wait()
            self.assertEqual(data, b"234")
        finally:
            server.close()
            client.close()

    def test_streaming_read_until_close_after_close(self):
        # Same as test_read_until_close_after_close but with a
        # streaming_callback: the data arrives through the streaming
        # callback and the final callback receives an empty result.
        server, client = self.make_iostream_pair()
        try:
            server.write(b"1234")
            server.close()
            client.read_bytes(1, self.stop)
            data = self.wait()
            self.assertEqual(data, b"1")
            streaming_data = []
            client.read_until_close(self.stop,
                                    streaming_callback=streaming_data.append)
            data = self.wait()
            self.assertEqual(b'', data)
            self.assertEqual(b''.join(streaming_data), b"234")
        finally:
            server.close()
            client.close()

    def test_large_read_until(self):
        # A single read_until spanning 4MB written in 1KB pieces.
        server, client = self.make_iostream_pair()
        try:
            # Skipped for SSL streams on PyPy (see the skip message).
            if (isinstance(server, SSLIOStream) and
                    platform.python_implementation() == 'PyPy'):
                raise unittest.SkipTest(
                    "pypy gc causes problems with openssl")
            NUM_KB = 4096
            for _ in range(NUM_KB):
                client.write(b"A" * 1024)
            client.write(b"\r\n")
            server.read_until(b"\r\n", self.stop)
            data = self.wait()
            self.assertEqual(len(data), NUM_KB * 1024 + 2)
        finally:
            server.close()
            client.close()

    def test_close_callback_with_pending_read(self):
        # The close callback must run even though a read that can never
        # be satisfied is pending when the peer closes.
        OK = b"OK\r\n"
        server, client = self.make_iostream_pair()
        client.set_close_callback(self.stop)
        try:
            server.write(OK)
            client.read_until(b"\r\n", self.stop)
            res = self.wait()
            self.assertEqual(res, OK)

            server.close()
            client.read_until(b"\r\n", lambda x: x)
            # wait() is released by the close callback, which passes no
            # argument, hence None.  If it never ran, wait() would time
            # out instead.
            res = self.wait()
            self.assertIsNone(res)
        finally:
            server.close()
            client.close()

    @skipIfNonUnix
    def test_inline_read_error(self):
        # An error hit while the read is attempted inline must be raised
        # to the caller.  The descriptor is closed with os.close() behind
        # the socket object's back so that the socket does not know it is
        # closed and the read really reaches the OS.
        server, client = self.make_iostream_pair()
        try:
            os.close(server.socket.fileno())
            with self.assertRaises(socket.error):
                server.read_bytes(1, lambda data: None)
        finally:
            server.close()
            client.close()

    def test_async_read_error_logging(self):
        # A socket error on an asynchronous read must be logged and must
        # close the stream.
        server, client = self.make_iostream_pair()
        server.set_close_callback(self.stop)
        try:
            # Start a read that can only complete asynchronously.
            server.read_bytes(1, lambda data: None)
            client.write(b'a')

            # Replace read_from_fd with a version that closes the
            # descriptor first, so the real read fails.
            def fake_read_from_fd():
                os.close(server.socket.fileno())
                server.__class__.read_from_fd(server)
            server.read_from_fd = fake_read_from_fd

            with ExpectLog(gen_log, "error on read"):
                self.wait()
        finally:
            server.close()
            client.close()

    def test_future_close_callback(self):
        # A Future-based read must not trigger the close callback early;
        # it fires only once the client really closes.
        server, client = self.make_iostream_pair()
        closed = [False]

        def close_callback():
            closed[0] = True
            self.stop()
        server.set_close_callback(close_callback)
        try:
            client.write(b'a')
            future = server.read_bytes(1)
            self.io_loop.add_future(future, self.stop)
            self.assertEqual(self.wait().result(), b'a')
            self.assertFalse(closed[0])
            client.close()
            self.wait()
            self.assertTrue(closed[0])
        finally:
            server.close()
            client.close()

    def test_read_bytes_partial(self):
        server, client = self.make_iostream_pair()
        try:
            # Ask for more than is available: partial=True returns what
            # there is.
            client.read_bytes(50, self.stop, partial=True)
            server.write(b"hello")
            data = self.wait()
            self.assertEqual(data, b"hello")

            # Ask for less than is available: num_bytes is still an
            # upper bound.
            client.read_bytes(3, self.stop, partial=True)
            server.write(b"world")
            data = self.wait()
            self.assertEqual(data, b"wor")

            # A zero-byte read returns an empty result even with
            # partial=True.
            client.read_bytes(0, self.stop, partial=True)
            data = self.wait()
            self.assertEqual(data, b'')
        finally:
            server.close()
            client.close()

    def test_read_until_max_bytes(self):
        server, client = self.make_iostream_pair()
        client.set_close_callback(lambda: self.stop("closed"))
        try:
            # Plenty of room under the limit.
            client.read_until(b"def", self.stop, max_bytes=50)
            server.write(b"abcdef")
            data = self.wait()
            self.assertEqual(data, b"abcdef")

            # Exactly enough room.
            client.read_until(b"def", self.stop, max_bytes=6)
            server.write(b"abcdef")
            data = self.wait()
            self.assertEqual(data, b"abcdef")

            # Not enough room: the stream logs a warning and closes, so
            # wait() returns the close callback's marker value.
            with ExpectLog(gen_log, "Unsatisfiable read"):
                client.read_until(b"def", self.stop, max_bytes=5)
                server.write(b"123456")
                data = self.wait()
            self.assertEqual(data, "closed")
        finally:
            server.close()
            client.close()

    def test_read_until_max_bytes_inline(self):
        server, client = self.make_iostream_pair()
        client.set_close_callback(lambda: self.stop("closed"))
        try:
            # Same failure as in test_read_until_max_bytes, but the
            # server writes before the read is issued.  The failure is
            # still reported through the close callback, not raised by
            # read_until itself.
            server.write(b"123456")
            with ExpectLog(gen_log, "Unsatisfiable read"):
                client.read_until(b"def", self.stop, max_bytes=5)
                data = self.wait()
            self.assertEqual(data, "closed")
        finally:
            server.close()
            client.close()

    def test_read_until_max_bytes_ignores_extra(self):
        server, client = self.make_iostream_pair()
        client.set_close_callback(lambda: self.stop("closed"))
        try:
            # The delimiter does arrive, but only beyond max_bytes, so
            # the read still fails.
            server.write(b"abcdef")
            with ExpectLog(gen_log, "Unsatisfiable read"):
                client.read_until(b"def", self.stop, max_bytes=5)
                data = self.wait()
            self.assertEqual(data, "closed")
        finally:
            server.close()
            client.close()

    def test_read_until_regex_max_bytes(self):
        server, client = self.make_iostream_pair()
        client.set_close_callback(lambda: self.stop("closed"))
        try:
            # Plenty of room under the limit.
            client.read_until_regex(b"def", self.stop, max_bytes=50)
            server.write(b"abcdef")
            data = self.wait()
            self.assertEqual(data, b"abcdef")

            # Exactly enough room.
            client.read_until_regex(b"def", self.stop, max_bytes=6)
            server.write(b"abcdef")
            data = self.wait()
            self.assertEqual(data, b"abcdef")

            # Not enough room: the stream logs a warning and closes.
            with ExpectLog(gen_log, "Unsatisfiable read"):
                client.read_until_regex(b"def", self.stop, max_bytes=5)
                server.write(b"123456")
                data = self.wait()
            self.assertEqual(data, "closed")
        finally:
            server.close()
            client.close()

    def test_read_until_regex_max_bytes_inline(self):
        server, client = self.make_iostream_pair()
        client.set_close_callback(lambda: self.stop("closed"))
        try:
            # Same failure as in test_read_until_regex_max_bytes, but the
            # server writes before the read is issued.  The failure is
            # still reported through the close callback.
            server.write(b"123456")
            with ExpectLog(gen_log, "Unsatisfiable read"):
                client.read_until_regex(b"def", self.stop, max_bytes=5)
                data = self.wait()
            self.assertEqual(data, "closed")
        finally:
            server.close()
            client.close()

    def test_read_until_regex_max_bytes_ignores_extra(self):
        server, client = self.make_iostream_pair()
        client.set_close_callback(lambda: self.stop("closed"))
        try:
            # The pattern does match, but only beyond max_bytes, so the
            # read still fails.
            server.write(b"abcdef")
            with ExpectLog(gen_log, "Unsatisfiable read"):
                client.read_until_regex(b"def", self.stop, max_bytes=5)
                data = self.wait()
            self.assertEqual(data, "closed")
        finally:
            server.close()
            client.close()

    def test_small_reads_from_large_buffer(self):
        # 100KB is available but max_buffer_size is only 10KB; reading
        # 1KB at a time must succeed for all of it.
        server, client = self.make_iostream_pair(max_buffer_size=10 * 1024)
        try:
            server.write(b"a" * 1024 * 100)
            for _ in range(100):
                client.read_bytes(1024, self.stop)
                data = self.wait()
                self.assertEqual(data, b"a" * 1024)
        finally:
            server.close()
            client.close()

    def test_small_read_untils_from_large_buffer(self):
        # Same as test_small_reads_from_large_buffer, using read_until
        # with a max_bytes limit on 1KB newline-terminated records.
        server, client = self.make_iostream_pair(max_buffer_size=10 * 1024)
        try:
            server.write((b"a" * 1023 + b"\n") * 100)
            for _ in range(100):
                client.read_until(b"\n", self.stop, max_bytes=4096)
                data = self.wait()
                self.assertEqual(data, b"a" * 1023 + b"\n")
        finally:
            server.close()
            client.close()
00726
00727
class TestIOStreamWebHTTP(TestIOStreamWebMixin, AsyncHTTPTestCase):
    """Runs the web mixin tests over plain HTTP with a regular IOStream."""

    def _make_client_iostream(self):
        sock = socket.socket()
        return IOStream(sock, io_loop=self.io_loop)
00731
00732
class TestIOStreamWebHTTPS(TestIOStreamWebMixin, AsyncHTTPSTestCase):
    """Runs the web mixin tests over HTTPS with an SSLIOStream client."""

    def _make_client_iostream(self):
        sock = socket.socket()
        return SSLIOStream(sock, io_loop=self.io_loop)
00736
00737
class TestIOStream(TestIOStreamMixin, AsyncTestCase):
    """Runs the stream-pair tests with plain IOStream on both ends."""

    def _make_server_iostream(self, connection, **kwargs):
        server_stream = IOStream(connection, **kwargs)
        return server_stream

    def _make_client_iostream(self, connection, **kwargs):
        client_stream = IOStream(connection, **kwargs)
        return client_stream
00744
00745
class TestIOStreamSSL(TestIOStreamMixin, AsyncTestCase):
    """Runs the stream-pair tests with SSLIOStream on both ends.

    The server socket is wrapped with ``ssl.wrap_socket`` using the test
    certificate; the handshake is deferred (not performed on connect).
    """

    def _make_server_iostream(self, connection, **kwargs):
        wrap_options = dict(_server_ssl_options(),
                            server_side=True,
                            do_handshake_on_connect=False)
        tls_connection = ssl.wrap_socket(connection, **wrap_options)
        return SSLIOStream(tls_connection, io_loop=self.io_loop, **kwargs)

    def _make_client_iostream(self, connection, **kwargs):
        stream_options = dict(kwargs, io_loop=self.io_loop)
        return SSLIOStream(connection, **stream_options)
00756
00757
00758
00759
00760
@unittest.skipIf(not hasattr(ssl, 'SSLContext'), 'ssl.SSLContext not present')
class TestIOStreamSSLContext(TestIOStreamMixin, AsyncTestCase):
    """Runs the stream-pair tests with SSL configured via ``ssl.SSLContext``.

    The client passes its context as ``ssl_options``; the server wraps
    the accepted socket with a context that has the test certificate
    loaded.  Skipped when the ``ssl`` module has no ``SSLContext``.
    """

    def _make_server_iostream(self, connection, **kwargs):
        test_dir = os.path.dirname(__file__)
        certfile = os.path.join(test_dir, 'test.crt')
        keyfile = os.path.join(test_dir, 'test.key')
        server_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        server_context.load_cert_chain(certfile, keyfile)
        tls_connection = ssl_wrap_socket(connection, server_context,
                                         server_side=True,
                                         do_handshake_on_connect=False)
        return SSLIOStream(tls_connection, io_loop=self.io_loop, **kwargs)

    def _make_client_iostream(self, connection, **kwargs):
        client_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        return SSLIOStream(connection, io_loop=self.io_loop,
                           ssl_options=client_context, **kwargs)
00777
00778
class TestIOStreamStartTLS(AsyncTestCase):
    """Tests for upgrading an established plain IOStream with start_tls().

    `setUp` connects a plain client stream to a plain server stream;
    each test then exchanges some lines and/or upgrades the streams.
    """

    def setUp(self):
        # Errors are left to propagate: unittest already reports an
        # exception raised from setUp, so nothing is printed here.
        super(TestIOStreamStartTLS, self).setUp()
        self.listener, self.port = bind_unused_port()
        self.server_stream = None
        self.server_accepted = Future()
        netutil.add_accept_handler(self.listener, self.accept)
        self.client_stream = IOStream(socket.socket())
        # Wait for the client's connect to complete, then for the server
        # side to have accepted the connection.
        self.io_loop.add_future(self.client_stream.connect(
            ('127.0.0.1', self.port)), self.stop)
        self.wait()
        self.io_loop.add_future(self.server_accepted, self.stop)
        self.wait()

    def tearDown(self):
        # Either stream may be None if a test handed it to start_tls()
        # and never stored the upgraded stream back.
        if self.server_stream is not None:
            self.server_stream.close()
        if self.client_stream is not None:
            self.client_stream.close()
        self.listener.close()
        super(TestIOStreamStartTLS, self).tearDown()

    def accept(self, connection, address):
        """Accept handler: wrap the single expected connection."""
        if self.server_stream is not None:
            self.fail("should only get one connection")
        self.server_stream = IOStream(connection)
        self.server_accepted.set_result(None)

    @gen.coroutine
    def client_send_line(self, line):
        """Write ``line`` from the client and check the server receives it."""
        self.client_stream.write(line)
        recv_line = yield self.server_stream.read_until(b"\r\n")
        self.assertEqual(line, recv_line)

    @gen.coroutine
    def server_send_line(self, line):
        """Write ``line`` from the server and check the client receives it."""
        self.server_stream.write(line)
        recv_line = yield self.client_stream.read_until(b"\r\n")
        self.assertEqual(line, recv_line)

    def client_start_tls(self, ssl_options=None):
        """Start the TLS upgrade on the client stream; return its Future.

        ``self.client_stream`` is reset to None, so the caller has to
        store the upgraded stream back if `tearDown` should close it.
        """
        client_stream = self.client_stream
        self.client_stream = None
        return client_stream.start_tls(False, ssl_options)

    def server_start_tls(self, ssl_options=None):
        """Start the TLS upgrade on the server stream; return its Future.

        ``self.server_stream`` is reset to None, so the caller has to
        store the upgraded stream back if `tearDown` should close it.
        """
        server_stream = self.server_stream
        self.server_stream = None
        return server_stream.start_tls(True, ssl_options)

    @gen_test
    def test_start_tls_smtp(self):
        # An SMTP-style STARTTLS conversation: plaintext traffic in both
        # directions, the upgrade, then more traffic over the new streams.
        yield self.server_send_line(b"220 mail.example.com ready\r\n")
        yield self.client_send_line(b"EHLO mail.example.com\r\n")
        yield self.server_send_line(b"250-mail.example.com welcome\r\n")
        yield self.server_send_line(b"250 STARTTLS\r\n")
        yield self.client_send_line(b"STARTTLS\r\n")
        yield self.server_send_line(b"220 Go ahead\r\n")
        # Start both sides before waiting on either.
        client_future = self.client_start_tls()
        server_future = self.server_start_tls(_server_ssl_options())
        self.client_stream = yield client_future
        self.server_stream = yield server_future
        self.assertIsInstance(self.client_stream, SSLIOStream)
        self.assertIsInstance(self.server_stream, SSLIOStream)
        yield self.client_send_line(b"EHLO mail.example.com\r\n")
        yield self.server_send_line(b"250 mail.example.com welcome\r\n")

    @gen_test
    def test_handshake_fail(self):
        # The client requires a certificate that validates against the
        # certifi CA bundle; the handshake against the test certificate
        # is expected to fail with an SSLError.
        self.server_start_tls(_server_ssl_options())
        client_future = self.client_start_tls(
            dict(cert_reqs=ssl.CERT_REQUIRED, ca_certs=certifi.where()))
        with ExpectLog(gen_log, "SSL Error"):
            with self.assertRaises(ssl.SSLError):
                yield client_future
00862
00863
@skipIfNonUnix
class TestPipeIOStream(AsyncTestCase):
    """Tests for PipeIOStream over an ``os.pipe()`` pair (Unix only).

    Both pipe ends are closed in a ``finally`` clause so that a failing
    assertion or a timeout does not leak file descriptors.
    """

    def test_pipe_iostream(self):
        r, w = os.pipe()

        rs = PipeIOStream(r, io_loop=self.io_loop)
        ws = PipeIOStream(w, io_loop=self.io_loop)
        try:
            # Two separate writes form one continuous byte stream.
            ws.write(b"hel")
            ws.write(b"lo world")

            rs.read_until(b' ', callback=self.stop)
            data = self.wait()
            self.assertEqual(data, b"hello ")

            rs.read_bytes(3, self.stop)
            data = self.wait()
            self.assertEqual(data, b"wor")

            # Close the write end; the reader then gets the remaining
            # bytes from read_until_close.
            ws.close()

            rs.read_until_close(self.stop)
            data = self.wait()
            self.assertEqual(data, b"ld")
        finally:
            ws.close()
            rs.close()

    def test_pipe_iostream_big_write(self):
        r, w = os.pipe()

        rs = PipeIOStream(r, io_loop=self.io_loop)
        ws = PipeIOStream(w, io_loop=self.io_loop)
        try:
            NUM_BYTES = 1048576

            # 1MB in one write call -- presumably more than the OS pipe
            # buffer holds, so the write has to complete in several steps
            # (TODO confirm the buffer size on the target platforms).
            ws.write(b"1" * NUM_BYTES)

            rs.read_bytes(NUM_BYTES, self.stop)
            data = self.wait()
            self.assertEqual(data, b"1" * NUM_BYTES)
        finally:
            ws.close()
            rs.close()