iostream_test.py
Go to the documentation of this file.
00001 from __future__ import absolute_import, division, print_function, with_statement
00002 from tornado.concurrent import Future
00003 from tornado import gen
00004 from tornado import netutil
00005 from tornado.iostream import IOStream, SSLIOStream, PipeIOStream, StreamClosedError
00006 from tornado.httputil import HTTPHeaders
00007 from tornado.log import gen_log, app_log
00008 from tornado.netutil import ssl_wrap_socket
00009 from tornado.stack_context import NullContext
00010 from tornado.testing import AsyncHTTPTestCase, AsyncHTTPSTestCase, AsyncTestCase, bind_unused_port, ExpectLog, gen_test
00011 from tornado.test.util import unittest, skipIfNonUnix
00012 from tornado.web import RequestHandler, Application
00013 import certifi
00014 import errno
00015 import logging
00016 import os
00017 import platform
00018 import socket
00019 import ssl
00020 import sys
00021 
00022 
00023 def _server_ssl_options():
00024     return dict(
00025         certfile=os.path.join(os.path.dirname(__file__), 'test.crt'),
00026         keyfile=os.path.join(os.path.dirname(__file__), 'test.key'),
00027     )
00028 
00029 
00030 class HelloHandler(RequestHandler):
00031     def get(self):
00032         self.write("Hello")
00033 
00034 
00035 class TestIOStreamWebMixin(object):
00036     def _make_client_iostream(self):
00037         raise NotImplementedError()
00038 
00039     def get_app(self):
00040         return Application([('/', HelloHandler)])
00041 
00042     def test_connection_closed(self):
00043         # When a server sends a response and then closes the connection,
00044         # the client must be allowed to read the data before the IOStream
00045         # closes itself.  Epoll reports closed connections with a separate
00046         # EPOLLRDHUP event delivered at the same time as the read event,
00047         # while kqueue reports them as a second read/write event with an EOF
00048         # flag.
00049         response = self.fetch("/", headers={"Connection": "close"})
00050         response.rethrow()
00051 
00052     def test_read_until_close(self):
00053         stream = self._make_client_iostream()
00054         stream.connect(('localhost', self.get_http_port()), callback=self.stop)
00055         self.wait()
00056         stream.write(b"GET / HTTP/1.0\r\n\r\n")
00057 
00058         stream.read_until_close(self.stop)
00059         data = self.wait()
00060         self.assertTrue(data.startswith(b"HTTP/1.0 200"))
00061         self.assertTrue(data.endswith(b"Hello"))
00062 
00063     def test_read_zero_bytes(self):
00064         self.stream = self._make_client_iostream()
00065         self.stream.connect(("localhost", self.get_http_port()),
00066                             callback=self.stop)
00067         self.wait()
00068         self.stream.write(b"GET / HTTP/1.0\r\n\r\n")
00069 
00070         # normal read
00071         self.stream.read_bytes(9, self.stop)
00072         data = self.wait()
00073         self.assertEqual(data, b"HTTP/1.0 ")
00074 
00075         # zero bytes
00076         self.stream.read_bytes(0, self.stop)
00077         data = self.wait()
00078         self.assertEqual(data, b"")
00079 
00080         # another normal read
00081         self.stream.read_bytes(3, self.stop)
00082         data = self.wait()
00083         self.assertEqual(data, b"200")
00084 
00085         self.stream.close()
00086 
00087     def test_write_while_connecting(self):
00088         stream = self._make_client_iostream()
00089         connected = [False]
00090 
00091         def connected_callback():
00092             connected[0] = True
00093             self.stop()
00094         stream.connect(("localhost", self.get_http_port()),
00095                        callback=connected_callback)
00096         # unlike the previous tests, try to write before the connection
00097         # is complete.
00098         written = [False]
00099 
00100         def write_callback():
00101             written[0] = True
00102             self.stop()
00103         stream.write(b"GET / HTTP/1.0\r\nConnection: close\r\n\r\n",
00104                      callback=write_callback)
00105         self.assertTrue(not connected[0])
00106         # by the time the write has flushed, the connection callback has
00107         # also run
00108         try:
00109             self.wait(lambda: connected[0] and written[0])
00110         finally:
00111             logging.debug((connected, written))
00112 
00113         stream.read_until_close(self.stop)
00114         data = self.wait()
00115         self.assertTrue(data.endswith(b"Hello"))
00116 
00117         stream.close()
00118 
00119     @gen_test
00120     def test_future_interface(self):
00121         """Basic test of IOStream's ability to return Futures."""
00122         stream = self._make_client_iostream()
00123         connect_result = yield stream.connect(
00124             ("localhost", self.get_http_port()))
00125         self.assertIs(connect_result, stream)
00126         yield stream.write(b"GET / HTTP/1.0\r\n\r\n")
00127         first_line = yield stream.read_until(b"\r\n")
00128         self.assertEqual(first_line, b"HTTP/1.0 200 OK\r\n")
00129         # callback=None is equivalent to no callback.
00130         header_data = yield stream.read_until(b"\r\n\r\n", callback=None)
00131         headers = HTTPHeaders.parse(header_data.decode('latin1'))
00132         content_length = int(headers['Content-Length'])
00133         body = yield stream.read_bytes(content_length)
00134         self.assertEqual(body, b'Hello')
00135         stream.close()
00136 
00137     @gen_test
00138     def test_future_close_while_reading(self):
00139         stream = self._make_client_iostream()
00140         yield stream.connect(("localhost", self.get_http_port()))
00141         yield stream.write(b"GET / HTTP/1.0\r\n\r\n")
00142         with self.assertRaises(StreamClosedError):
00143             yield stream.read_bytes(1024 * 1024)
00144         stream.close()
00145 
00146     @gen_test
00147     def test_future_read_until_close(self):
00148         # Ensure that the data comes through before the StreamClosedError.
00149         stream = self._make_client_iostream()
00150         yield stream.connect(("localhost", self.get_http_port()))
00151         yield stream.write(b"GET / HTTP/1.0\r\nConnection: close\r\n\r\n")
00152         yield stream.read_until(b"\r\n\r\n")
00153         body = yield stream.read_until_close()
00154         self.assertEqual(body, b"Hello")
00155 
00156         # Nothing else to read; the error comes immediately without waiting
00157         # for yield.
00158         with self.assertRaises(StreamClosedError):
00159             stream.read_bytes(1)
00160 
00161 
00162 class TestIOStreamMixin(object):
00163     def _make_server_iostream(self, connection, **kwargs):
00164         raise NotImplementedError()
00165 
00166     def _make_client_iostream(self, connection, **kwargs):
00167         raise NotImplementedError()
00168 
00169     def make_iostream_pair(self, **kwargs):
00170         listener, port = bind_unused_port()
00171         streams = [None, None]
00172 
00173         def accept_callback(connection, address):
00174             streams[0] = self._make_server_iostream(connection, **kwargs)
00175             self.stop()
00176 
00177         def connect_callback():
00178             streams[1] = client_stream
00179             self.stop()
00180         netutil.add_accept_handler(listener, accept_callback,
00181                                    io_loop=self.io_loop)
00182         client_stream = self._make_client_iostream(socket.socket(), **kwargs)
00183         client_stream.connect(('127.0.0.1', port),
00184                               callback=connect_callback)
00185         self.wait(condition=lambda: all(streams))
00186         self.io_loop.remove_handler(listener.fileno())
00187         listener.close()
00188         return streams
00189 
00190     def test_streaming_callback_with_data_in_buffer(self):
00191         server, client = self.make_iostream_pair()
00192         client.write(b"abcd\r\nefgh")
00193         server.read_until(b"\r\n", self.stop)
00194         data = self.wait()
00195         self.assertEqual(data, b"abcd\r\n")
00196 
00197         def closed_callback(chunk):
00198             self.fail()
00199         server.read_until_close(callback=closed_callback,
00200                                 streaming_callback=self.stop)
00201         # self.io_loop.add_timeout(self.io_loop.time() + 0.01, self.stop)
00202         data = self.wait()
00203         self.assertEqual(data, b"efgh")
00204         server.close()
00205         client.close()
00206 
00207     def test_write_zero_bytes(self):
00208         # Attempting to write zero bytes should run the callback without
00209         # going into an infinite loop.
00210         server, client = self.make_iostream_pair()
00211         server.write(b'', callback=self.stop)
00212         self.wait()
00213         server.close()
00214         client.close()
00215 
00216     def test_connection_refused(self):
00217         # When a connection is refused, the connect callback should not
00218         # be run.  (The kqueue IOLoop used to behave differently from the
00219         # epoll IOLoop in this respect)
00220         server_socket, port = bind_unused_port()
00221         server_socket.close()
00222         stream = IOStream(socket.socket(), self.io_loop)
00223         self.connect_called = False
00224 
00225         def connect_callback():
00226             self.connect_called = True
00227         stream.set_close_callback(self.stop)
00228         # log messages vary by platform and ioloop implementation
00229         with ExpectLog(gen_log, ".*", required=False):
00230             stream.connect(("localhost", port), connect_callback)
00231             self.wait()
00232         self.assertFalse(self.connect_called)
00233         self.assertTrue(isinstance(stream.error, socket.error), stream.error)
00234         if sys.platform != 'cygwin':
00235             _ERRNO_CONNREFUSED = (errno.ECONNREFUSED,)
00236             if hasattr(errno, "WSAECONNREFUSED"):
00237                 _ERRNO_CONNREFUSED += (errno.WSAECONNREFUSED,)
00238             # cygwin's errnos don't match those used on native windows python
00239             self.assertTrue(stream.error.args[0] in _ERRNO_CONNREFUSED)
00240 
00241     def test_gaierror(self):
00242         # Test that IOStream sets its exc_info on getaddrinfo error
00243         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
00244         stream = IOStream(s, io_loop=self.io_loop)
00245         stream.set_close_callback(self.stop)
00246         # To reliably generate a gaierror we use a malformed domain name
00247         # instead of a name that's simply unlikely to exist (since
00248         # opendns and some ISPs return bogus addresses for nonexistent
00249         # domains instead of the proper error codes).
00250         with ExpectLog(gen_log, "Connect error"):
00251             stream.connect(('an invalid domain', 54321))
00252             self.assertTrue(isinstance(stream.error, socket.gaierror), stream.error)
00253 
00254     def test_read_callback_error(self):
00255         # Test that IOStream sets its exc_info when a read callback throws
00256         server, client = self.make_iostream_pair()
00257         try:
00258             server.set_close_callback(self.stop)
00259             with ExpectLog(
00260                 app_log, "(Uncaught exception|Exception in callback)"
00261             ):
00262                 # Clear ExceptionStackContext so IOStream catches error
00263                 with NullContext():
00264                     server.read_bytes(1, callback=lambda data: 1 / 0)
00265                 client.write(b"1")
00266                 self.wait()
00267             self.assertTrue(isinstance(server.error, ZeroDivisionError))
00268         finally:
00269             server.close()
00270             client.close()
00271 
00272     def test_streaming_callback(self):
00273         server, client = self.make_iostream_pair()
00274         try:
00275             chunks = []
00276             final_called = []
00277 
00278             def streaming_callback(data):
00279                 chunks.append(data)
00280                 self.stop()
00281 
00282             def final_callback(data):
00283                 self.assertFalse(data)
00284                 final_called.append(True)
00285                 self.stop()
00286             server.read_bytes(6, callback=final_callback,
00287                               streaming_callback=streaming_callback)
00288             client.write(b"1234")
00289             self.wait(condition=lambda: chunks)
00290             client.write(b"5678")
00291             self.wait(condition=lambda: final_called)
00292             self.assertEqual(chunks, [b"1234", b"56"])
00293 
00294             # the rest of the last chunk is still in the buffer
00295             server.read_bytes(2, callback=self.stop)
00296             data = self.wait()
00297             self.assertEqual(data, b"78")
00298         finally:
00299             server.close()
00300             client.close()
00301 
00302     def test_streaming_until_close(self):
00303         server, client = self.make_iostream_pair()
00304         try:
00305             chunks = []
00306             closed = [False]
00307 
00308             def streaming_callback(data):
00309                 chunks.append(data)
00310                 self.stop()
00311             def close_callback(data):
00312                 assert not data, data
00313                 closed[0] = True
00314                 self.stop()
00315             client.read_until_close(callback=close_callback,
00316                                     streaming_callback=streaming_callback)
00317             server.write(b"1234")
00318             self.wait(condition=lambda: len(chunks) == 1)
00319             server.write(b"5678", self.stop)
00320             self.wait()
00321             server.close()
00322             self.wait(condition=lambda: closed[0])
00323             self.assertEqual(chunks, [b"1234", b"5678"])
00324         finally:
00325             server.close()
00326             client.close()
00327 
00328     def test_delayed_close_callback(self):
00329         # The scenario:  Server closes the connection while there is a pending
00330         # read that can be served out of buffered data.  The client does not
00331         # run the close_callback as soon as it detects the close, but rather
00332         # defers it until after the buffered read has finished.
00333         server, client = self.make_iostream_pair()
00334         try:
00335             client.set_close_callback(self.stop)
00336             server.write(b"12")
00337             chunks = []
00338 
00339             def callback1(data):
00340                 chunks.append(data)
00341                 client.read_bytes(1, callback2)
00342                 server.close()
00343 
00344             def callback2(data):
00345                 chunks.append(data)
00346             client.read_bytes(1, callback1)
00347             self.wait()  # stopped by close_callback
00348             self.assertEqual(chunks, [b"1", b"2"])
00349         finally:
00350             server.close()
00351             client.close()
00352 
00353     def test_future_delayed_close_callback(self):
00354         # Same as test_delayed_close_callback, but with the future interface.
00355         server, client = self.make_iostream_pair()
00356         # We can't call make_iostream_pair inside a gen_test function
00357         # because the ioloop is not reentrant.
00358         @gen_test
00359         def f(self):
00360             server.write(b"12")
00361             chunks = []
00362             chunks.append((yield client.read_bytes(1)))
00363             server.close()
00364             chunks.append((yield client.read_bytes(1)))
00365             self.assertEqual(chunks, [b"1", b"2"])
00366         try:
00367             f(self)
00368         finally:
00369             server.close()
00370             client.close()
00371 
00372     def test_close_buffered_data(self):
00373         # Similar to the previous test, but with data stored in the OS's
00374         # socket buffers instead of the IOStream's read buffer.  Out-of-band
00375         # close notifications must be delayed until all data has been
00376         # drained into the IOStream buffer. (epoll used to use out-of-band
00377         # close events with EPOLLRDHUP, but no longer)
00378         #
00379         # This depends on the read_chunk_size being smaller than the
00380         # OS socket buffer, so make it small.
00381         server, client = self.make_iostream_pair(read_chunk_size=256)
00382         try:
00383             server.write(b"A" * 512)
00384             client.read_bytes(256, self.stop)
00385             data = self.wait()
00386             self.assertEqual(b"A" * 256, data)
00387             server.close()
00388             # Allow the close to propagate to the client side of the
00389             # connection.  Using add_callback instead of add_timeout
00390             # doesn't seem to work, even with multiple iterations
00391             self.io_loop.add_timeout(self.io_loop.time() + 0.01, self.stop)
00392             self.wait()
00393             client.read_bytes(256, self.stop)
00394             data = self.wait()
00395             self.assertEqual(b"A" * 256, data)
00396         finally:
00397             server.close()
00398             client.close()
00399 
00400     def test_read_until_close_after_close(self):
00401         # Similar to test_delayed_close_callback, but read_until_close takes
00402         # a separate code path so test it separately.
00403         server, client = self.make_iostream_pair()
00404         try:
00405             server.write(b"1234")
00406             server.close()
00407             # Read one byte to make sure the client has received the data.
00408             # It won't run the close callback as long as there is more buffered
00409             # data that could satisfy a later read.
00410             client.read_bytes(1, self.stop)
00411             data = self.wait()
00412             self.assertEqual(data, b"1")
00413             client.read_until_close(self.stop)
00414             data = self.wait()
00415             self.assertEqual(data, b"234")
00416         finally:
00417             server.close()
00418             client.close()
00419 
00420     def test_streaming_read_until_close_after_close(self):
00421         # Same as the preceding test but with a streaming_callback.
00422         # All data should go through the streaming callback,
00423         # and the final read callback just gets an empty string.
00424         server, client = self.make_iostream_pair()
00425         try:
00426             server.write(b"1234")
00427             server.close()
00428             client.read_bytes(1, self.stop)
00429             data = self.wait()
00430             self.assertEqual(data, b"1")
00431             streaming_data = []
00432             client.read_until_close(self.stop,
00433                                     streaming_callback=streaming_data.append)
00434             data = self.wait()
00435             self.assertEqual(b'', data)
00436             self.assertEqual(b''.join(streaming_data), b"234")
00437         finally:
00438             server.close()
00439             client.close()
00440 
00441     def test_large_read_until(self):
00442         # Performance test: read_until used to have a quadratic component
00443         # so a read_until of 4MB would take 8 seconds; now it takes 0.25
00444         # seconds.
00445         server, client = self.make_iostream_pair()
00446         try:
00447             # This test fails on pypy with ssl.  I think it's because
00448             # pypy's gc defeats moves objects, breaking the
00449             # "frozen write buffer" assumption.
00450             if (isinstance(server, SSLIOStream) and
00451                     platform.python_implementation() == 'PyPy'):
00452                 raise unittest.SkipTest(
00453                     "pypy gc causes problems with openssl")
00454             NUM_KB = 4096
00455             for i in range(NUM_KB):
00456                 client.write(b"A" * 1024)
00457             client.write(b"\r\n")
00458             server.read_until(b"\r\n", self.stop)
00459             data = self.wait()
00460             self.assertEqual(len(data), NUM_KB * 1024 + 2)
00461         finally:
00462             server.close()
00463             client.close()
00464 
00465     def test_close_callback_with_pending_read(self):
00466         # Regression test for a bug that was introduced in 2.3
00467         # where the IOStream._close_callback would never be called
00468         # if there were pending reads.
00469         OK = b"OK\r\n"
00470         server, client = self.make_iostream_pair()
00471         client.set_close_callback(self.stop)
00472         try:
00473             server.write(OK)
00474             client.read_until(b"\r\n", self.stop)
00475             res = self.wait()
00476             self.assertEqual(res, OK)
00477 
00478             server.close()
00479             client.read_until(b"\r\n", lambda x: x)
00480             # If _close_callback (self.stop) is not called,
00481             # an AssertionError: Async operation timed out after 5 seconds
00482             # will be raised.
00483             res = self.wait()
00484             self.assertTrue(res is None)
00485         finally:
00486             server.close()
00487             client.close()
00488 
00489     @skipIfNonUnix
00490     def test_inline_read_error(self):
00491         # An error on an inline read is raised without logging (on the
00492         # assumption that it will eventually be noticed or logged further
00493         # up the stack).
00494         #
00495         # This test is posix-only because windows os.close() doesn't work
00496         # on socket FDs, but we can't close the socket object normally
00497         # because we won't get the error we want if the socket knows
00498         # it's closed.
00499         server, client = self.make_iostream_pair()
00500         try:
00501             os.close(server.socket.fileno())
00502             with self.assertRaises(socket.error):
00503                 server.read_bytes(1, lambda data: None)
00504         finally:
00505             server.close()
00506             client.close()
00507 
00508     def test_async_read_error_logging(self):
00509         # Socket errors on asynchronous reads should be logged (but only
00510         # once).
00511         server, client = self.make_iostream_pair()
00512         server.set_close_callback(self.stop)
00513         try:
00514             # Start a read that will be fullfilled asynchronously.
00515             server.read_bytes(1, lambda data: None)
00516             client.write(b'a')
00517             # Stub out read_from_fd to make it fail.
00518 
00519             def fake_read_from_fd():
00520                 os.close(server.socket.fileno())
00521                 server.__class__.read_from_fd(server)
00522             server.read_from_fd = fake_read_from_fd
00523             # This log message is from _handle_read (not read_from_fd).
00524             with ExpectLog(gen_log, "error on read"):
00525                 self.wait()
00526         finally:
00527             server.close()
00528             client.close()
00529 
00530     def test_future_close_callback(self):
00531         # Regression test for interaction between the Future read interfaces
00532         # and IOStream._maybe_add_error_listener.
00533         server, client = self.make_iostream_pair()
00534         closed = [False]
00535         def close_callback():
00536             closed[0] = True
00537             self.stop()
00538         server.set_close_callback(close_callback)
00539         try:
00540             client.write(b'a')
00541             future = server.read_bytes(1)
00542             self.io_loop.add_future(future, self.stop)
00543             self.assertEqual(self.wait().result(), b'a')
00544             self.assertFalse(closed[0])
00545             client.close()
00546             self.wait()
00547             self.assertTrue(closed[0])
00548         finally:
00549             server.close()
00550             client.close()
00551 
00552     def test_read_bytes_partial(self):
00553         server, client = self.make_iostream_pair()
00554         try:
00555             # Ask for more than is available with partial=True
00556             client.read_bytes(50, self.stop, partial=True)
00557             server.write(b"hello")
00558             data = self.wait()
00559             self.assertEqual(data, b"hello")
00560 
00561             # Ask for less than what is available; num_bytes is still
00562             # respected.
00563             client.read_bytes(3, self.stop, partial=True)
00564             server.write(b"world")
00565             data = self.wait()
00566             self.assertEqual(data, b"wor")
00567 
00568             # Partial reads won't return an empty string, but read_bytes(0)
00569             # will.
00570             client.read_bytes(0, self.stop, partial=True)
00571             data = self.wait()
00572             self.assertEqual(data, b'')
00573         finally:
00574             server.close()
00575             client.close()
00576 
00577     def test_read_until_max_bytes(self):
00578         server, client = self.make_iostream_pair()
00579         client.set_close_callback(lambda: self.stop("closed"))
00580         try:
00581             # Extra room under the limit
00582             client.read_until(b"def", self.stop, max_bytes=50)
00583             server.write(b"abcdef")
00584             data = self.wait()
00585             self.assertEqual(data, b"abcdef")
00586 
00587             # Just enough space
00588             client.read_until(b"def", self.stop, max_bytes=6)
00589             server.write(b"abcdef")
00590             data = self.wait()
00591             self.assertEqual(data, b"abcdef")
00592 
00593             # Not enough space, but we don't know it until all we can do is
00594             # log a warning and close the connection.
00595             with ExpectLog(gen_log, "Unsatisfiable read"):
00596                 client.read_until(b"def", self.stop, max_bytes=5)
00597                 server.write(b"123456")
00598                 data = self.wait()
00599             self.assertEqual(data, "closed")
00600         finally:
00601             server.close()
00602             client.close()
00603 
00604     def test_read_until_max_bytes_inline(self):
00605         server, client = self.make_iostream_pair()
00606         client.set_close_callback(lambda: self.stop("closed"))
00607         try:
00608             # Similar to the error case in the previous test, but the
00609             # server writes first so client reads are satisfied
00610             # inline.  For consistency with the out-of-line case, we
00611             # do not raise the error synchronously.
00612             server.write(b"123456")
00613             with ExpectLog(gen_log, "Unsatisfiable read"):
00614                 client.read_until(b"def", self.stop, max_bytes=5)
00615                 data = self.wait()
00616             self.assertEqual(data, "closed")
00617         finally:
00618             server.close()
00619             client.close()
00620 
00621     def test_read_until_max_bytes_ignores_extra(self):
00622         server, client = self.make_iostream_pair()
00623         client.set_close_callback(lambda: self.stop("closed"))
00624         try:
00625             # Even though data that matches arrives the same packet that
00626             # puts us over the limit, we fail the request because it was not
00627             # found within the limit.
00628             server.write(b"abcdef")
00629             with ExpectLog(gen_log, "Unsatisfiable read"):
00630                 client.read_until(b"def", self.stop, max_bytes=5)
00631                 data = self.wait()
00632             self.assertEqual(data, "closed")
00633         finally:
00634             server.close()
00635             client.close()
00636 
00637     def test_read_until_regex_max_bytes(self):
00638         server, client = self.make_iostream_pair()
00639         client.set_close_callback(lambda: self.stop("closed"))
00640         try:
00641             # Extra room under the limit
00642             client.read_until_regex(b"def", self.stop, max_bytes=50)
00643             server.write(b"abcdef")
00644             data = self.wait()
00645             self.assertEqual(data, b"abcdef")
00646 
00647             # Just enough space
00648             client.read_until_regex(b"def", self.stop, max_bytes=6)
00649             server.write(b"abcdef")
00650             data = self.wait()
00651             self.assertEqual(data, b"abcdef")
00652 
00653             # Not enough space, but we don't know it until all we can do is
00654             # log a warning and close the connection.
00655             with ExpectLog(gen_log, "Unsatisfiable read"):
00656                 client.read_until_regex(b"def", self.stop, max_bytes=5)
00657                 server.write(b"123456")
00658                 data = self.wait()
00659             self.assertEqual(data, "closed")
00660         finally:
00661             server.close()
00662             client.close()
00663 
00664     def test_read_until_regex_max_bytes_inline(self):
00665         server, client = self.make_iostream_pair()
00666         client.set_close_callback(lambda: self.stop("closed"))
00667         try:
00668             # Similar to the error case in the previous test, but the
00669             # server writes first so client reads are satisfied
00670             # inline.  For consistency with the out-of-line case, we
00671             # do not raise the error synchronously.
00672             server.write(b"123456")
00673             with ExpectLog(gen_log, "Unsatisfiable read"):
00674                 client.read_until_regex(b"def", self.stop, max_bytes=5)
00675                 data = self.wait()
00676             self.assertEqual(data, "closed")
00677         finally:
00678             server.close()
00679             client.close()
00680 
00681     def test_read_until_regex_max_bytes_ignores_extra(self):
00682         server, client = self.make_iostream_pair()
00683         client.set_close_callback(lambda: self.stop("closed"))
00684         try:
00685             # Even though data that matches arrives the same packet that
00686             # puts us over the limit, we fail the request because it was not
00687             # found within the limit.
00688             server.write(b"abcdef")
00689             with ExpectLog(gen_log, "Unsatisfiable read"):
00690                 client.read_until_regex(b"def", self.stop, max_bytes=5)
00691                 data = self.wait()
00692             self.assertEqual(data, "closed")
00693         finally:
00694             server.close()
00695             client.close()
00696 
00697     def test_small_reads_from_large_buffer(self):
00698         # 10KB buffer size, 100KB available to read.
00699         # Read 1KB at a time and make sure that the buffer is not eagerly
00700         # filled.
00701         server, client = self.make_iostream_pair(max_buffer_size=10 * 1024)
00702         try:
00703             server.write(b"a" * 1024 * 100)
00704             for i in range(100):
00705                 client.read_bytes(1024, self.stop)
00706                 data = self.wait()
00707                 self.assertEqual(data, b"a" * 1024)
00708         finally:
00709             server.close()
00710             client.close()
00711 
00712     def test_small_read_untils_from_large_buffer(self):
00713         # 10KB buffer size, 100KB available to read.
00714         # Read 1KB at a time and make sure that the buffer is not eagerly
00715         # filled.
00716         server, client = self.make_iostream_pair(max_buffer_size=10 * 1024)
00717         try:
00718             server.write((b"a" * 1023 + b"\n") * 100)
00719             for i in range(100):
00720                 client.read_until(b"\n", self.stop, max_bytes=4096)
00721                 data = self.wait()
00722                 self.assertEqual(data, b"a" * 1023 + b"\n")
00723         finally:
00724             server.close()
00725             client.close()
00726 
00727 
00728 class TestIOStreamWebHTTP(TestIOStreamWebMixin, AsyncHTTPTestCase):
00729     def _make_client_iostream(self):
00730         return IOStream(socket.socket(), io_loop=self.io_loop)
00731 
00732 
00733 class TestIOStreamWebHTTPS(TestIOStreamWebMixin, AsyncHTTPSTestCase):
00734     def _make_client_iostream(self):
00735         return SSLIOStream(socket.socket(), io_loop=self.io_loop)
00736 
00737 
00738 class TestIOStream(TestIOStreamMixin, AsyncTestCase):
00739     def _make_server_iostream(self, connection, **kwargs):
00740         return IOStream(connection, **kwargs)
00741 
00742     def _make_client_iostream(self, connection, **kwargs):
00743         return IOStream(connection, **kwargs)
00744 
00745 
00746 class TestIOStreamSSL(TestIOStreamMixin, AsyncTestCase):
00747     def _make_server_iostream(self, connection, **kwargs):
00748         connection = ssl.wrap_socket(connection,
00749                                      server_side=True,
00750                                      do_handshake_on_connect=False,
00751                                      **_server_ssl_options())
00752         return SSLIOStream(connection, io_loop=self.io_loop, **kwargs)
00753 
00754     def _make_client_iostream(self, connection, **kwargs):
00755         return SSLIOStream(connection, io_loop=self.io_loop, **kwargs)
00756 
00757 
00758 # This will run some tests that are basically redundant but it's the
00759 # simplest way to make sure that it works to pass an SSLContext
00760 # instead of an ssl_options dict to the SSLIOStream constructor.
00761 @unittest.skipIf(not hasattr(ssl, 'SSLContext'), 'ssl.SSLContext not present')
00762 class TestIOStreamSSLContext(TestIOStreamMixin, AsyncTestCase):
00763     def _make_server_iostream(self, connection, **kwargs):
00764         context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
00765         context.load_cert_chain(
00766             os.path.join(os.path.dirname(__file__), 'test.crt'),
00767             os.path.join(os.path.dirname(__file__), 'test.key'))
00768         connection = ssl_wrap_socket(connection, context,
00769                                      server_side=True,
00770                                      do_handshake_on_connect=False)
00771         return SSLIOStream(connection, io_loop=self.io_loop, **kwargs)
00772 
00773     def _make_client_iostream(self, connection, **kwargs):
00774         context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
00775         return SSLIOStream(connection, io_loop=self.io_loop,
00776                            ssl_options=context, **kwargs)
00777 
00778 
00779 class TestIOStreamStartTLS(AsyncTestCase):
00780     def setUp(self):
00781         try:
00782             super(TestIOStreamStartTLS, self).setUp()
00783             self.listener, self.port = bind_unused_port()
00784             self.server_stream = None
00785             self.server_accepted = Future()
00786             netutil.add_accept_handler(self.listener, self.accept)
00787             self.client_stream = IOStream(socket.socket())
00788             self.io_loop.add_future(self.client_stream.connect(
00789                 ('127.0.0.1', self.port)), self.stop)
00790             self.wait()
00791             self.io_loop.add_future(self.server_accepted, self.stop)
00792             self.wait()
00793         except Exception as e:
00794             print(e)
00795             raise
00796 
00797     def tearDown(self):
00798         if self.server_stream is not None:
00799             self.server_stream.close()
00800         if self.client_stream is not None:
00801             self.client_stream.close()
00802         self.listener.close()
00803         super(TestIOStreamStartTLS, self).tearDown()
00804 
00805     def accept(self, connection, address):
00806         if self.server_stream is not None:
00807             self.fail("should only get one connection")
00808         self.server_stream = IOStream(connection)
00809         self.server_accepted.set_result(None)
00810 
00811     @gen.coroutine
00812     def client_send_line(self, line):
00813         self.client_stream.write(line)
00814         recv_line = yield self.server_stream.read_until(b"\r\n")
00815         self.assertEqual(line, recv_line)
00816 
00817     @gen.coroutine
00818     def server_send_line(self, line):
00819         self.server_stream.write(line)
00820         recv_line = yield self.client_stream.read_until(b"\r\n")
00821         self.assertEqual(line, recv_line)
00822 
00823     def client_start_tls(self, ssl_options=None):
00824         client_stream = self.client_stream
00825         self.client_stream = None
00826         return client_stream.start_tls(False, ssl_options)
00827 
00828     def server_start_tls(self, ssl_options=None):
00829         server_stream = self.server_stream
00830         self.server_stream = None
00831         return server_stream.start_tls(True, ssl_options)
00832 
00833     @gen_test
00834     def test_start_tls_smtp(self):
00835         # This flow is simplified from RFC 3207 section 5.
00836         # We don't really need all of this, but it helps to make sure
00837         # that after realistic back-and-forth traffic the buffers end up
00838         # in a sane state.
00839         yield self.server_send_line(b"220 mail.example.com ready\r\n")
00840         yield self.client_send_line(b"EHLO mail.example.com\r\n")
00841         yield self.server_send_line(b"250-mail.example.com welcome\r\n")
00842         yield self.server_send_line(b"250 STARTTLS\r\n")
00843         yield self.client_send_line(b"STARTTLS\r\n")
00844         yield self.server_send_line(b"220 Go ahead\r\n")
00845         client_future = self.client_start_tls()
00846         server_future = self.server_start_tls(_server_ssl_options())
00847         self.client_stream = yield client_future
00848         self.server_stream = yield server_future
00849         self.assertTrue(isinstance(self.client_stream, SSLIOStream))
00850         self.assertTrue(isinstance(self.server_stream, SSLIOStream))
00851         yield self.client_send_line(b"EHLO mail.example.com\r\n")
00852         yield self.server_send_line(b"250 mail.example.com welcome\r\n")
00853 
00854     @gen_test
00855     def test_handshake_fail(self):
00856         self.server_start_tls(_server_ssl_options())
00857         client_future = self.client_start_tls(
00858             dict(cert_reqs=ssl.CERT_REQUIRED, ca_certs=certifi.where()))
00859         with ExpectLog(gen_log, "SSL Error"):
00860             with self.assertRaises(ssl.SSLError):
00861                 yield client_future
00862 
00863 
00864 @skipIfNonUnix
00865 class TestPipeIOStream(AsyncTestCase):
00866     def test_pipe_iostream(self):
00867         r, w = os.pipe()
00868 
00869         rs = PipeIOStream(r, io_loop=self.io_loop)
00870         ws = PipeIOStream(w, io_loop=self.io_loop)
00871 
00872         ws.write(b"hel")
00873         ws.write(b"lo world")
00874 
00875         rs.read_until(b' ', callback=self.stop)
00876         data = self.wait()
00877         self.assertEqual(data, b"hello ")
00878 
00879         rs.read_bytes(3, self.stop)
00880         data = self.wait()
00881         self.assertEqual(data, b"wor")
00882 
00883         ws.close()
00884 
00885         rs.read_until_close(self.stop)
00886         data = self.wait()
00887         self.assertEqual(data, b"ld")
00888 
00889         rs.close()
00890 
00891     def test_pipe_iostream_big_write(self):
00892         r, w = os.pipe()
00893 
00894         rs = PipeIOStream(r, io_loop=self.io_loop)
00895         ws = PipeIOStream(w, io_loop=self.io_loop)
00896 
00897         NUM_BYTES = 1048576
00898 
00899         # Write 1MB of data, which should fill the buffer
00900         ws.write(b"1" * NUM_BYTES)
00901 
00902         rs.read_bytes(NUM_BYTES, self.stop)
00903         data = self.wait()
00904         self.assertEqual(data, b"1" * NUM_BYTES)
00905 
00906         ws.close()
00907         rs.close()


rosbridge_server
Author(s): Jonathan Mace
autogenerated on Thu Jun 6 2019 21:51:50