httpserver_test.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 
00003 
00004 from __future__ import absolute_import, division, print_function, with_statement
00005 from tornado import netutil
00006 from tornado.escape import json_decode, json_encode, utf8, _unicode, recursive_unicode, native_str
00007 from tornado import gen
00008 from tornado.http1connection import HTTP1Connection
00009 from tornado.httpserver import HTTPServer
00010 from tornado.httputil import HTTPHeaders, HTTPMessageDelegate, HTTPServerConnectionDelegate, ResponseStartLine
00011 from tornado.iostream import IOStream
00012 from tornado.log import gen_log, app_log
00013 from tornado.netutil import ssl_options_to_context
00014 from tornado.simple_httpclient import SimpleAsyncHTTPClient
00015 from tornado.testing import AsyncHTTPTestCase, AsyncHTTPSTestCase, AsyncTestCase, ExpectLog, gen_test
00016 from tornado.test.util import unittest, skipOnTravis
00017 from tornado.util import u, bytes_type
00018 from tornado.web import Application, RequestHandler, asynchronous, stream_request_body
00019 from contextlib import closing
00020 import datetime
00021 import gzip
00022 import os
00023 import shutil
00024 import socket
00025 import ssl
00026 import sys
00027 import tempfile
00028 
00029 try:
00030     from io import BytesIO  # python 3
00031 except ImportError:
00032     from cStringIO import StringIO as BytesIO  # python 2
00033 
00034 
def read_stream_body(stream, callback):
    """Read one HTTP response from `stream` and pass it to `callback`.

    `callback` is invoked with a single ``(headers, body)`` tuple when the
    message is finished; ``body`` is every received chunk joined together.
    """
    received = []

    class _Collector(HTTPMessageDelegate):
        def headers_received(self, start_line, headers):
            # Kept on the delegate so finish() can hand them to the callback.
            self.headers = headers

        def data_received(self, chunk):
            received.append(chunk)

        def finish(self):
            body = b''.join(received)
            callback((self.headers, body))

    # NOTE(review): the positional True is presumably HTTP1Connection's
    # "is client" flag -- confirm against its signature.
    connection = HTTP1Connection(stream, True)
    connection.read_response(_Collector())
00050 
00051 
class HandlerBaseTestCase(AsyncHTTPTestCase):
    """Base test case that serves the subclass's nested ``Handler`` at '/'.

    Subclasses must define a class attribute named ``Handler``.
    """
    def get_app(self):
        handler_class = self.__class__.Handler
        routes = [('/', handler_class)]
        return Application(routes)

    def fetch_json(self, *args, **kwargs):
        """Fetch with ``self.fetch``, call ``rethrow()`` on the response and
        return its body decoded as JSON."""
        reply = self.fetch(*args, **kwargs)
        reply.rethrow()
        decoded = json_decode(reply.body)
        return decoded
00060 
00061 
class HelloWorldRequestHandler(RequestHandler):
    """Handler that greets on GET and reports the body size on POST.

    GET additionally verifies that the request arrived over the protocol
    given to ``initialize`` (``"http"`` unless overridden).
    """
    def initialize(self, protocol="http"):
        self.expected_protocol = protocol

    def get(self):
        if self.request.protocol == self.expected_protocol:
            self.finish("Hello world")
            return
        raise Exception("unexpected protocol")

    def post(self):
        body_length = len(self.request.body)
        self.finish("Got %d bytes in POST" % body_length)
00073 
00074 
# SSLv23 clients built against pre-1.0 versions of openssl always send
# SSLv2 ClientHello messages, and SSLv3 and TLSv1 servers reject those.
# OPENSSL_VERSION_INFO was formally introduced in python3.2 but was
# already present (undocumented) in python 2.7; when the attribute is
# missing the version is treated as (0, 0), i.e. "old".
skipIfOldSSL = unittest.skipIf(
    not getattr(ssl, 'OPENSSL_VERSION_INFO', (0, 0)) >= (1, 0),
    "old version of ssl module and/or openssl")
00083 
00084 
class BaseSSLTest(AsyncHTTPSTestCase):
    """HTTPS test case serving HelloWorldRequestHandler at '/', configured
    to expect the "https" protocol."""
    def get_app(self):
        handler_kwargs = dict(protocol="https")
        routes = [('/', HelloWorldRequestHandler, handler_kwargs)]
        return Application(routes)
00089 
00090 
class SSLTestMixin(object):
    """SSL tests shared by the protocol-specific test cases.

    Meant to be mixed into a test case that provides ``fetch``,
    ``http_client``, ``get_url``, ``stop`` and ``wait``.  Subclasses must
    override ``get_ssl_version``.
    """
    def get_ssl_options(self):
        """Return the base SSL options extended with ``ssl_version``."""
        # NOTE(review): the concrete classes in this file list BaseSSLTest
        # before this mixin, so this method is presumably shadowed by
        # AsyncHTTPSTestCase.get_ssl_options in the MRO -- confirm.
        version = self.get_ssl_version()
        # The method is looked up on the class, so the instance must be
        # passed explicitly (SSLContextTest does the same); calling it with
        # no arguments fails when get_ssl_options is an instance method.
        base_options = AsyncHTTPSTestCase.get_ssl_options(self)
        return dict(ssl_version=version, **base_options)

    def get_ssl_version(self):
        """Return the ``ssl.PROTOCOL_*`` constant to test; must be overridden."""
        raise NotImplementedError()

    def test_ssl(self):
        response = self.fetch('/')
        self.assertEqual(response.body, b"Hello world")

    def test_large_post(self):
        response = self.fetch('/',
                              method='POST',
                              body='A' * 5000)
        self.assertEqual(response.body, b"Got 5000 bytes in POST")

    def test_non_ssl_request(self):
        # Make sure the server closes the connection when it gets a non-ssl
        # connection, rather than waiting for a timeout or otherwise
        # misbehaving.  The very long timeouts ensure that the request can
        # only end because the server closed the connection.
        plain_url = self.get_url("/").replace('https:', 'http:')
        with ExpectLog(gen_log, '(SSL Error|uncaught exception)'):
            with ExpectLog(gen_log, 'Uncaught exception', required=False):
                self.http_client.fetch(
                    plain_url,
                    self.stop,
                    request_timeout=3600,
                    connect_timeout=3600)
                response = self.wait()
        self.assertEqual(response.code, 599)
00122 
00123 # Python's SSL implementation differs significantly between versions.
00124 # For example, SSLv3 and TLSv1 throw an exception if you try to read
00125 # from the socket before the handshake is complete, but the default
00126 # of SSLv23 allows it.
00127 
00128 
class SSLv23Test(BaseSSLTest, SSLTestMixin):
    """Runs the shared SSL tests with ``ssl.PROTOCOL_SSLv23``."""
    def get_ssl_version(self):
        protocol = ssl.PROTOCOL_SSLv23
        return protocol
00132 
00133 
@skipIfOldSSL
class SSLv3Test(BaseSSLTest, SSLTestMixin):
    """Runs the shared SSL tests with ``ssl.PROTOCOL_SSLv3``."""
    def get_ssl_version(self):
        protocol = ssl.PROTOCOL_SSLv3
        return protocol
00138 
00139 
@skipIfOldSSL
class TLSv1Test(BaseSSLTest, SSLTestMixin):
    """Runs the shared SSL tests with ``ssl.PROTOCOL_TLSv1``."""
    def get_ssl_version(self):
        protocol = ssl.PROTOCOL_TLSv1
        return protocol
00144 
00145 
@unittest.skipIf(not hasattr(ssl, 'SSLContext'), 'ssl.SSLContext not present')
class SSLContextTest(BaseSSLTest, SSLTestMixin):
    """Runs the shared SSL tests with an ``ssl.SSLContext`` object as the
    server's SSL options instead of a dict."""
    def get_ssl_options(self):
        dict_options = AsyncHTTPSTestCase.get_ssl_options(self)
        context = ssl_options_to_context(dict_options)
        # Guard against the conversion silently handing back something
        # other than a real context object.
        assert isinstance(context, ssl.SSLContext)
        return context
00153 
00154 
class BadSSLOptionsTest(unittest.TestCase):
    """Checks that HTTPServer rejects unusable ``ssl_options`` at
    construction time."""
    def test_missing_arguments(self):
        app = Application()
        # A keyfile without a certfile is expected to raise KeyError.
        key_only = {
            "keyfile": "/__missing__.crt",
        }
        self.assertRaises(KeyError, HTTPServer, app, ssl_options=key_only)

    def test_missing_key(self):
        """A missing SSL key should cause an immediate exception."""

        app = Application()
        here = os.path.dirname(__file__)
        real_cert = os.path.join(here, 'test.crt')

        nonexistent_cert = {
            "certfile": "/__mising__.crt",
        }
        self.assertRaises(ValueError, HTTPServer, app,
                          ssl_options=nonexistent_cert)

        nonexistent_key = {
            "certfile": real_cert,
            "keyfile": "/__missing__.key"
        }
        self.assertRaises(ValueError, HTTPServer, app,
                          ssl_options=nonexistent_key)

        # This actually works because both files exist
        both_exist = {
            "certfile": real_cert,
            "keyfile": real_cert
        }
        HTTPServer(app, ssl_options=both_exist)
00182 
00183 
class MultipartTestHandler(RequestHandler):
    """Echoes selected parts of a multipart POST back as a JSON object."""
    def post(self):
        request = self.request
        reply = {
            "header": request.headers["X-Header-Encoding-Test"],
            "argument": self.get_argument("argument"),
            # The uploaded file is read once via attribute access and once
            # via item access, exactly as the two lookups were written.
            "filename": request.files["files"][0].filename,
            "filebody": _unicode(request.files["files"][0]["body"]),
        }
        self.finish(reply)
00191 
00192 
00193 # This test is also called from wsgi_test
class HTTPConnectionTest(AsyncHTTPTestCase):
    """Drives the server with hand-written requests over a raw IOStream."""
    def get_handlers(self):
        return [("/multipart", MultipartTestHandler),
                ("/hello", HelloWorldRequestHandler)]

    def get_app(self):
        return Application(self.get_handlers())

    def raw_fetch(self, headers, body):
        """Send a hand-built request and return the response body.

        `headers` is a list of byte strings, request line first; a
        Content-Length header computed from `body` is appended to it.
        """
        with closing(IOStream(socket.socket())) as stream:
            stream.connect(('127.0.0.1', self.get_http_port()), self.stop)
            self.wait()
            content_length = utf8("Content-Length: %d\r\n" % len(body))
            stream.write(
                b"\r\n".join(headers + [content_length]) +
                b"\r\n" + body)
            read_stream_body(stream, self.stop)
            # Only the body is returned; the response headers are dropped.
            _, response_body = self.wait()
            return response_body

    def test_multipart_form(self):
        # Encodings here are tricky:  Headers are latin1, bodies can be
        # anything (we use utf8 by default).
        response = self.raw_fetch([
            b"POST /multipart HTTP/1.0",
            b"Content-Type: multipart/form-data; boundary=1234567890",
            b"X-Header-encoding-test: \xe9",
        ],
            b"\r\n".join([
                b"Content-Disposition: form-data; name=argument",
                b"",
                u("\u00e1").encode("utf-8"),
                b"--1234567890",
                u('Content-Disposition: form-data; name="files"; filename="\u00f3"').encode("utf8"),
                b"",
                u("\u00fa").encode("utf-8"),
                b"--1234567890--",
                b"",
            ]))
        data = json_decode(response)
        self.assertEqual(u("\u00e9"), data["header"])
        self.assertEqual(u("\u00e1"), data["argument"])
        self.assertEqual(u("\u00f3"), data["filename"])
        self.assertEqual(u("\u00fa"), data["filebody"])

    def test_100_continue(self):
        # Run through a 100-continue interaction by hand:
        # When given Expect: 100-continue, we get a 100 response after the
        # headers, and then the real response after the body.
        #
        # closing() guarantees the stream is closed even when one of the
        # assertions below fails part-way through the exchange.
        with closing(IOStream(socket.socket(), io_loop=self.io_loop)) as stream:
            stream.connect(("localhost", self.get_http_port()),
                           callback=self.stop)
            self.wait()
            stream.write(b"\r\n".join([b"POST /hello HTTP/1.1",
                                       b"Content-Length: 1024",
                                       b"Expect: 100-continue",
                                       b"Connection: close",
                                       b"\r\n"]), callback=self.stop)
            self.wait()
            stream.read_until(b"\r\n\r\n", self.stop)
            data = self.wait()
            self.assertTrue(data.startswith(b"HTTP/1.1 100 "), data)
            stream.write(b"a" * 1024)
            stream.read_until(b"\r\n", self.stop)
            first_line = self.wait()
            self.assertTrue(first_line.startswith(b"HTTP/1.1 200"), first_line)
            stream.read_until(b"\r\n\r\n", self.stop)
            header_data = self.wait()
            headers = HTTPHeaders.parse(native_str(header_data.decode('latin1')))
            stream.read_bytes(int(headers["Content-Length"]), self.stop)
            body = self.wait()
            self.assertEqual(body, b"Got 1024 bytes in POST")
00266 
00267 
class EchoHandler(RequestHandler):
    """Writes the request's arguments back, converted with
    ``recursive_unicode``, for both GET and POST."""
    def get(self):
        arguments = recursive_unicode(self.request.arguments)
        self.write(arguments)

    def post(self):
        arguments = recursive_unicode(self.request.arguments)
        self.write(arguments)
00274 
00275 
class TypeCheckHandler(RequestHandler):
    """Checks the exact types of request attributes and writes a dict of
    the mismatches found (empty when every type is as expected)."""
    def prepare(self):
        self.errors = {}
        request = self.request

        # Plain request attributes that must all be of type ``str``.
        str_attributes = ('method', 'uri', 'version', 'remote_ip',
                          'protocol', 'host', 'path', 'query')
        for attribute in str_attributes:
            self.check_type(attribute, getattr(request, attribute), str)

        # Only the first entry of each mapping is inspected.
        first_header_key = list(request.headers.keys())[0]
        self.check_type('header_key', first_header_key, str)
        first_header_value = list(request.headers.values())[0]
        self.check_type('header_value', first_header_value, str)

        first_cookie_key = list(request.cookies.keys())[0]
        self.check_type('cookie_key', first_cookie_key, str)
        first_cookie = list(request.cookies.values())[0]
        self.check_type('cookie_value', first_cookie.value, str)
        # secure cookies

        first_arg_key = list(request.arguments.keys())[0]
        self.check_type('arg_key', first_arg_key, str)
        first_arg_values = list(request.arguments.values())[0]
        self.check_type('arg_value', first_arg_values[0], bytes_type)

    def post(self):
        self.check_type('body', self.request.body, bytes_type)
        self.write(self.errors)

    def get(self):
        self.write(self.errors)

    def check_type(self, name, obj, expected_type):
        """Record an error under `name` unless ``type(obj)`` is exactly
        `expected_type` (subclasses do not count as a match)."""
        found_type = type(obj)
        if expected_type == found_type:
            return
        self.errors[name] = "expected %s, got %s" % (expected_type,
                                                     found_type)
00314 
00315 
class HTTPServerTest(AsyncHTTPTestCase):
    """Request-parsing tests driven through the regular test HTTP client."""
    def get_app(self):
        routes = [
            ("/echo", EchoHandler),
            ("/typecheck", TypeCheckHandler),
            ("//doubleslash", EchoHandler),
        ]
        return Application(routes)

    def test_query_string_encoding(self):
        reply = self.fetch("/echo?foo=%C3%A9")
        decoded = json_decode(reply.body)
        self.assertEqual(decoded, {u("foo"): [u("\u00e9")]})

    def test_empty_query_string(self):
        reply = self.fetch("/echo?foo=&foo=")
        decoded = json_decode(reply.body)
        self.assertEqual(decoded, {u("foo"): [u(""), u("")]})

    def test_empty_post_parameters(self):
        reply = self.fetch("/echo", method="POST", body="foo=&bar=")
        decoded = json_decode(reply.body)
        self.assertEqual(decoded, {u("foo"): [u("")], u("bar"): [u("")]})

    def test_types(self):
        cookie_headers = {"Cookie": "foo=bar"}

        # TypeCheckHandler replies with a dict of type mismatches, so an
        # empty dict means every checked attribute had the expected type.
        get_reply = self.fetch("/typecheck?foo=bar", headers=cookie_headers)
        self.assertEqual(json_decode(get_reply.body), {})

        post_reply = self.fetch("/typecheck", method="POST", body="foo=bar",
                                headers=cookie_headers)
        self.assertEqual(json_decode(post_reply.body), {})

    def test_double_slash(self):
        # urlparse.urlsplit (which tornado.httpserver used to use
        # incorrectly) would parse paths beginning with "//" as
        # protocol-relative urls.
        reply = self.fetch("//doubleslash")
        self.assertEqual(200, reply.code)
        self.assertEqual(json_decode(reply.body), {})

    def test_malformed_body(self):
        # parse_qs is pretty forgiving, but it will fail on python 3
        # if the data is not utf8.  On python 2 parse_qs will work,
        # but then the recursive_unicode call in EchoHandler will
        # fail.
        if str is bytes_type:
            return
        form_headers = {'Content-Type': 'application/x-www-form-urlencoded'}
        with ExpectLog(gen_log, 'Invalid x-www-form-urlencoded body'):
            reply = self.fetch('/echo', method="POST",
                               headers=form_headers,
                               body=b'\xe9')
        self.assertEqual(200, reply.code)
        self.assertEqual(b'{}', reply.body)
00370 
00371 
class HTTPServerRawTest(AsyncHTTPTestCase):
    """Tests that write raw bytes to the server over an IOStream opened in
    setUp and closed in tearDown."""
    def get_app(self):
        routes = [
            ('/echo', EchoHandler),
        ]
        return Application(routes)

    def setUp(self):
        super(HTTPServerRawTest, self).setUp()
        self.stream = IOStream(socket.socket())
        server_address = ('localhost', self.get_http_port())
        self.stream.connect(server_address, self.stop)
        self.wait()

    def tearDown(self):
        self.stream.close()
        super(HTTPServerRawTest, self).tearDown()

    def _run_loop_for(self, seconds):
        # Lets the IOLoop run for a fixed, hard-coded interval.
        self.io_loop.add_timeout(datetime.timedelta(seconds=seconds),
                                 self.stop)
        self.wait()

    def test_empty_request(self):
        # Close without sending anything, then give the loop a moment.
        self.stream.close()
        self._run_loop_for(0.001)

    def test_malformed_first_line(self):
        with ExpectLog(gen_log, '.*Malformed HTTP request line'):
            self.stream.write(b'asdf\r\n\r\n')
            # TODO: need an async version of ExpectLog so we don't need
            # hard-coded timeouts here.
            self._run_loop_for(0.01)

    def test_malformed_headers(self):
        with ExpectLog(gen_log, '.*Malformed HTTP headers'):
            self.stream.write(b'GET / HTTP/1.0\r\nasdf\r\n\r\n')
            self._run_loop_for(0.01)

    def test_chunked_request_body(self):
        # Chunked requests are not widely supported and we don't have a way
        # to generate them in AsyncHTTPClient, but HTTPServer will read them.
        raw_request = b"""\
POST /echo HTTP/1.1
Transfer-Encoding: chunked
Content-Type: application/x-www-form-urlencoded

4
foo=
3
bar
0

"""
        # The literal uses bare newlines; the wire format needs CRLF.
        self.stream.write(raw_request.replace(b"\n", b"\r\n"))
        read_stream_body(self.stream, self.stop)
        _, body = self.wait()
        self.assertEqual(json_decode(body), {u('foo'): [u('bar')]})
00427 
00428 
class XHeaderTest(HandlerBaseTestCase):
    """Checks how the X-Real-IP, X-Forwarded-For, X-Scheme and
    X-Forwarded-Proto headers affect the request's remote_ip and protocol
    when the server is started with ``xheaders=True``."""
    class Handler(RequestHandler):
        def get(self):
            request = self.request
            self.write(dict(remote_ip=request.remote_ip,
                            remote_protocol=request.protocol))

    def get_httpserver_options(self):
        return dict(xheaders=True)

    def test_ip_headers(self):
        # Without any forwarding header the real peer address is reported.
        self.assertEqual(self.fetch_json("/")["remote_ip"], "127.0.0.1")

        # (request headers, expected remote_ip) pairs, fetched in order.
        cases = [
            # valid addresses: taken from the header (last entry of a list)
            ({"X-Real-IP": "4.4.4.4"}, "4.4.4.4"),
            ({"X-Forwarded-For": "127.0.0.1, 4.4.4.4"}, "4.4.4.4"),
            ({"X-Real-IP": "2620:0:1cfe:face:b00c::3"},
             "2620:0:1cfe:face:b00c::3"),
            ({"X-Forwarded-For": "::1, 2620:0:1cfe:face:b00c::3"},
             "2620:0:1cfe:face:b00c::3"),
            # invalid values: the header is ignored
            ({"X-Real-IP": "4.4.4.4<script>"}, "127.0.0.1"),
            ({"X-Forwarded-For": "4.4.4.4, 5.5.5.5<script>"}, "127.0.0.1"),
            ({"X-Real-IP": "www.google.com"}, "127.0.0.1"),
        ]
        for request_headers, expected_ip in cases:
            self.assertEqual(
                self.fetch_json("/", headers=request_headers)["remote_ip"],
                expected_ip)

    def test_scheme_headers(self):
        self.assertEqual(self.fetch_json("/")["remote_protocol"], "http")

        # (request headers, expected protocol) pairs, fetched in order.
        cases = [
            ({"X-Scheme": "https"}, "https"),
            ({"X-Forwarded-Proto": "https"}, "https"),
            # an unrecognized scheme leaves the protocol unchanged
            ({"X-Forwarded-Proto": "unknown"}, "http"),
        ]
        for request_headers, expected_protocol in cases:
            self.assertEqual(
                self.fetch_json("/", headers=request_headers)["remote_protocol"],
                expected_protocol)
00493 
00494 
class SSLXHeaderTest(AsyncHTTPSTestCase, HandlerBaseTestCase):
    """Checks X-Scheme handling on an HTTPS server with ``xheaders``
    enabled, reusing XHeaderTest's handler."""
    def get_app(self):
        routes = [('/', XHeaderTest.Handler)]
        return Application(routes)

    def get_httpserver_options(self):
        # Start from the parent's options and switch xheaders on.
        options = super(SSLXHeaderTest, self).get_httpserver_options()
        options['xheaders'] = True
        return options

    def test_request_without_xprotocol(self):
        self.assertEqual(self.fetch_json("/")["remote_protocol"], "https")

        # (request headers, expected protocol) pairs, fetched in order.
        cases = [
            ({"X-Scheme": "http"}, "http"),
            # an unrecognized scheme leaves the real protocol in place
            ({"X-Scheme": "unknown"}, "https"),
        ]
        for request_headers, expected_protocol in cases:
            self.assertEqual(
                self.fetch_json("/", headers=request_headers)["remote_protocol"],
                expected_protocol)
00514 
00515 
class ManualProtocolTest(HandlerBaseTestCase):
    """Checks that the ``protocol`` server option is what handlers see as
    ``request.protocol``."""
    class Handler(RequestHandler):
        def get(self):
            seen_protocol = self.request.protocol
            self.write(dict(protocol=seen_protocol))

    def get_httpserver_options(self):
        return dict(protocol='https')

    def test_manual_protocol(self):
        reply = self.fetch_json('/')
        self.assertEqual(reply['protocol'], 'https')
00526 
00527 
@unittest.skipIf(not hasattr(socket, 'AF_UNIX') or sys.platform == 'cygwin',
                 "unix sockets not supported on this platform")
class UnixSocketTest(AsyncTestCase):
    """HTTPServers can listen on Unix sockets too.

    Why would you want to do this?  Nginx can proxy to backends listening
    on unix sockets, for one thing (and managing a namespace for unix
    sockets can be easier than managing a bunch of TCP port numbers).

    Unfortunately, there's no way to specify a unix socket in a url for
    an HTTP client, so we have to test this by hand.
    """
    def setUp(self):
        super(UnixSocketTest, self).setUp()
        # The socket file lives in a fresh temp dir that tearDown removes.
        self.tmpdir = tempfile.mkdtemp()
        self.sockfile = os.path.join(self.tmpdir, "test.sock")
        listening_socket = netutil.bind_unix_socket(self.sockfile)
        routes = [("/hello", HelloWorldRequestHandler)]
        self.server = HTTPServer(Application(routes), io_loop=self.io_loop)
        self.server.add_socket(listening_socket)
        client_socket = socket.socket(socket.AF_UNIX)
        self.stream = IOStream(client_socket, io_loop=self.io_loop)
        self.stream.connect(self.sockfile, self.stop)
        self.wait()

    def tearDown(self):
        self.stream.close()
        self.server.stop()
        shutil.rmtree(self.tmpdir)
        super(UnixSocketTest, self).tearDown()

    def test_unix_socket(self):
        self.stream.write(b"GET /hello HTTP/1.0\r\n\r\n")

        self.stream.read_until(b"\r\n", self.stop)
        status_line = self.wait()
        self.assertEqual(status_line, b"HTTP/1.0 200 OK\r\n")

        self.stream.read_until(b"\r\n\r\n", self.stop)
        header_block = self.wait()
        headers = HTTPHeaders.parse(header_block.decode('latin1'))

        body_length = int(headers["Content-Length"])
        self.stream.read_bytes(body_length, self.stop)
        body = self.wait()
        self.assertEqual(body, b"Hello world")

    def test_unix_socket_bad_request(self):
        # Unix sockets don't have remote addresses so they just return an
        # empty string.
        with ExpectLog(gen_log, "Malformed HTTP message from"):
            self.stream.write(b"garbage\r\n\r\n")
            self.stream.read_until_close(self.stop)
            received = self.wait()
        # The connection is closed without any response bytes.
        self.assertEqual(received, b"")
00577 
00578 
class KeepAliveTest(AsyncHTTPTestCase):
    """Tests various scenarios for HTTP 1.1 keep-alive support.

    These tests don't use AsyncHTTPClient because we want to control
    connection reuse and closing.
    """
    def get_app(self):
        class HelloHandler(RequestHandler):
            def get(self):
                self.finish('Hello world')

        class LargeHandler(RequestHandler):
            def get(self):
                # 512KB should be bigger than the socket buffers so it will
                # be written out in chunks.
                blocks = [chr(i % 256) * 1024 for i in range(512)]
                self.write(''.join(blocks))

        class FinishOnCloseHandler(RequestHandler):
            @asynchronous
            def get(self):
                self.flush()

            def on_connection_close(self):
                # This is not very realistic, but finishing the request
                # from the close callback has the right timing to mimic
                # some errors seen in the wild.
                self.finish('closed')

        routes = [
            ('/', HelloHandler),
            ('/large', LargeHandler),
            ('/finish_on_close', FinishOnCloseHandler),
        ]
        return Application(routes)

    def setUp(self):
        super(KeepAliveTest, self).setUp()
        # Version expected in response status lines; the HTTP/1.0 tests
        # override it.
        self.http_version = b'HTTP/1.1'

    def tearDown(self):
        # We just closed the client side of the socket; let the IOLoop run
        # once to make sure the server side got the message.
        brief_delay = datetime.timedelta(seconds=0.001)
        self.io_loop.add_timeout(brief_delay, self.stop)
        self.wait()

        # close() deletes self.stream, so it only still exists here when a
        # test ended without calling close().
        if hasattr(self, 'stream'):
            self.stream.close()
        super(KeepAliveTest, self).tearDown()

    # The next few methods are a crude manual http client
    def connect(self):
        self.stream = IOStream(socket.socket(), io_loop=self.io_loop)
        server_address = ('localhost', self.get_http_port())
        self.stream.connect(server_address, self.stop)
        self.wait()

    def read_headers(self):
        """Read the status line (asserting a 200 with the expected HTTP
        version) and return the parsed response headers."""
        self.stream.read_until(b'\r\n', self.stop)
        status_line = self.wait()
        expected_prefix = self.http_version + b' 200'
        self.assertTrue(status_line.startswith(expected_prefix), status_line)

        self.stream.read_until(b'\r\n\r\n', self.stop)
        raw_headers = self.wait()
        return HTTPHeaders.parse(raw_headers.decode('latin1'))

    def read_response(self):
        """Read one full response, store its headers on ``self.headers``
        and assert that the body is the greeting."""
        self.headers = self.read_headers()
        body_length = int(self.headers['Content-Length'])
        self.stream.read_bytes(body_length, self.stop)
        received = self.wait()
        self.assertEqual(b'Hello world', received)

    def close(self):
        self.stream.close()
        del self.stream

    def test_two_requests(self):
        self.connect()
        # Two sequential requests on the same connection.
        for _ in range(2):
            self.stream.write(b'GET / HTTP/1.1\r\n\r\n')
            self.read_response()
        self.close()

    def test_request_close(self):
        self.connect()
        self.stream.write(b'GET / HTTP/1.1\r\nConnection: close\r\n\r\n')
        self.read_response()
        # Nothing more may arrive before the connection is closed.
        self.stream.read_until_close(callback=self.stop)
        leftover = self.wait()
        self.assertTrue(not leftover)
        self.close()

    # keepalive is supported for http 1.0 too, but it's opt-in
    def test_http10(self):
        self.http_version = b'HTTP/1.0'
        self.connect()
        self.stream.write(b'GET / HTTP/1.0\r\n\r\n')
        self.read_response()
        self.stream.read_until_close(callback=self.stop)
        leftover = self.wait()
        self.assertTrue(not leftover)
        self.assertTrue('Connection' not in self.headers)
        self.close()

    def test_http10_keepalive(self):
        self.http_version = b'HTTP/1.0'
        self.connect()
        # Each opted-in request must be answered with Keep-Alive.
        for _ in range(2):
            self.stream.write(
                b'GET / HTTP/1.0\r\nConnection: keep-alive\r\n\r\n')
            self.read_response()
            self.assertEqual(self.headers['Connection'], 'Keep-Alive')
        self.close()

    def test_pipelined_requests(self):
        self.connect()
        # Both requests go out in one write before any response is read.
        self.stream.write(b'GET / HTTP/1.1\r\n\r\nGET / HTTP/1.1\r\n\r\n')
        for _ in range(2):
            self.read_response()
        self.close()

    def test_pipelined_cancel(self):
        self.connect()
        self.stream.write(b'GET / HTTP/1.1\r\n\r\nGET / HTTP/1.1\r\n\r\n')
        # only read once
        self.read_response()
        self.close()

    def test_cancel_during_download(self):
        self.connect()
        self.stream.write(b'GET /large HTTP/1.1\r\n\r\n')
        self.read_headers()
        # Read only the first 1024 bytes of the body, then hang up.
        self.stream.read_bytes(1024, self.stop)
        self.wait()
        self.close()

    def test_finish_while_closed(self):
        self.connect()
        self.stream.write(b'GET /finish_on_close HTTP/1.1\r\n\r\n')
        self.read_headers()
        self.close()
00717 
00718 
class GzipBaseTest(object):
    """Shared app, helper and test for the gzip request-body test cases.

    Meant to be mixed into a test case that provides ``fetch`` and the
    unittest assertion methods.
    """
    def get_app(self):
        return Application([('/', EchoHandler)])

    def post_gzip(self, body):
        """POST `body` gzip-compressed to '/' with a
        ``Content-Encoding: gzip`` header and return the response."""
        bytesio = BytesIO()
        # closing() finalizes the gzip stream even if the write fails; the
        # compressed bytes are only complete once the GzipFile is closed.
        with closing(gzip.GzipFile(mode='w', fileobj=bytesio)) as gzip_file:
            gzip_file.write(utf8(body))
        compressed_body = bytesio.getvalue()
        return self.fetch('/', method='POST', body=compressed_body,
                          headers={'Content-Encoding': 'gzip'})

    def test_uncompressed(self):
        response = self.fetch('/', method='POST', body='foo=bar')
        # assertEqual rather than the deprecated assertEquals alias, which
        # newer Python versions no longer provide.
        self.assertEqual(json_decode(response.body), {u('foo'): [u('bar')]})
00735 
00736 
class GzipTest(GzipBaseTest, AsyncHTTPTestCase):
    """Gzip request bodies are decoded when the server is started with
    ``decompress_request=True``."""
    def get_httpserver_options(self):
        return dict(decompress_request=True)

    def test_gzip(self):
        response = self.post_gzip('foo=bar')
        # assertEqual rather than the deprecated assertEquals alias, which
        # newer Python versions no longer provide.
        self.assertEqual(json_decode(response.body), {u('foo'): [u('bar')]})
00744 
00745 
class GzipUnsupportedTest(GzipBaseTest, AsyncHTTPTestCase):
    """Without the opt-in server option, a gzip request body is logged as
    unsupported and yields no arguments."""
    def test_gzip_unsupported(self):
        # Gzip support is opt-in; without it the server fails to parse
        # the body (but parsing form bodies is currently just a log message,
        # not a fatal error).
        with ExpectLog(gen_log, "Unsupported Content-Encoding"):
            response = self.post_gzip('foo=bar')
        # assertEqual rather than the deprecated assertEquals alias, which
        # newer Python versions no longer provide.
        self.assertEqual(json_decode(response.body), {})
00754 
00755 
class StreamingChunkSizeTest(AsyncHTTPTestCase):
    """Checks the sizes of the body chunks handed to a message delegate.

    The server is created with ``chunk_size=CHUNK_SIZE`` and
    ``decompress_request=True``.  The delegate answers every request with a
    JSON list holding the length of each chunk it received.
    """
    # 50 characters long, and repetitive so it can be compressed.
    BODY = b'01234567890123456789012345678901234567890123456789'
    CHUNK_SIZE = 16

    def get_http_client(self):
        # curl_httpclient has no working body_producer support, so bypass
        # whichever AsyncHTTPClient implementation is configured.
        return SimpleAsyncHTTPClient(io_loop=self.io_loop)

    def get_httpserver_options(self):
        return {'chunk_size': self.CHUNK_SIZE, 'decompress_request': True}

    class MessageDelegate(HTTPMessageDelegate):
        """Records the length of each body chunk and replies with the list."""

        def __init__(self, connection):
            self.connection = connection

        def headers_received(self, start_line, headers):
            # A new request starts with an empty record.
            self.chunk_lengths = []

        def data_received(self, chunk):
            self.chunk_lengths.append(len(chunk))

        def finish(self):
            # Reply with the recorded lengths, JSON-encoded.
            payload = utf8(json_encode(self.chunk_lengths))
            status = ResponseStartLine('HTTP/1.1', 200, 'OK')
            reply_headers = HTTPHeaders({'Content-Length': str(len(payload))})
            self.connection.write_headers(status, reply_headers)
            self.connection.write(payload)
            self.connection.finish()

    def get_app(self):
        # The "application" is a bare connection delegate that gives each
        # request its own MessageDelegate.
        class App(HTTPServerConnectionDelegate):
            def start_request(self, connection):
                return StreamingChunkSizeTest.MessageDelegate(connection)
        return App()

    def fetch_chunk_sizes(self, **kwargs):
        """POST to ``/`` and return the chunk lengths reported by the server.

        Also asserts that the lengths add up to ``len(BODY)`` and that every
        length is greater than zero and no larger than ``CHUNK_SIZE``.
        """
        response = self.fetch('/', method='POST', **kwargs)
        response.rethrow()
        sizes = json_decode(response.body)
        self.assertEqual(len(self.BODY), sum(sizes))
        for size in sizes:
            self.assertLessEqual(size, self.CHUNK_SIZE,
                                 'oversized chunk: ' + str(sizes))
            self.assertGreater(size, 0,
                               'empty chunk: ' + str(sizes))
        return sizes

    def compress(self, body):
        """Return ``body`` gzip-compressed.

        Raises ``Exception`` when the compressed form is not strictly
        shorter than ``body``.
        """
        buf = BytesIO()
        writer = gzip.GzipFile(mode='w', fileobj=buf)
        writer.write(body)
        writer.close()
        squeezed = buf.getvalue()
        if not len(squeezed) < len(body):
            raise Exception("body did not shrink when compressed")
        return squeezed

    def test_regular_body(self):
        sizes = self.fetch_chunk_sizes(body=self.BODY)
        # Without compression we know exactly what to expect.
        self.assertEqual([16, 16, 16, 2], sizes)

    def test_compressed_body(self):
        # Compression creates irregular boundaries so the assertions
        # in fetch_chunk_sizes are as specific as we can get.
        gzipped = self.compress(self.BODY)
        self.fetch_chunk_sizes(body=gzipped,
                               headers={'Content-Encoding': 'gzip'})

    def test_chunked_body(self):
        def produce(write):
            # Two HTTP chunks: the first 20 bytes, then the remaining 30.
            write(self.BODY[:20])
            write(self.BODY[20:])
        sizes = self.fetch_chunk_sizes(body_producer=produce)
        # HTTP chunk boundaries translate to application-visible breaks
        self.assertEqual([16, 4, 16, 14], sizes)

    def test_chunked_compressed(self):
        gzipped = self.compress(self.BODY)
        # The split at offset 20 below needs data on both sides.
        self.assertGreater(len(gzipped), 20)

        def produce(write):
            write(gzipped[:20])
            write(gzipped[20:])
        self.fetch_chunk_sizes(body_producer=produce,
                               headers={'Content-Encoding': 'gzip'})
00842 
00843 
class MaxHeaderSizeTest(AsyncHTTPTestCase):
    """Exercises a server created with ``max_header_size=1024``."""

    def get_app(self):
        routes = [('/', HelloWorldRequestHandler)]
        return Application(routes)

    def get_httpserver_options(self):
        return {'max_header_size': 1024}

    def test_small_headers(self):
        # A 100-character filler header is answered normally.
        filler = {'X-Filler': 'a' * 100}
        response = self.fetch("/", headers=filler)
        response.rethrow()
        self.assertEqual(response.body, b"Hello world")

    def test_large_headers(self):
        # With a 1000-character filler header an "Unsatisfiable read"
        # message is expected on gen_log and the fetch ends with code 599.
        filler = {'X-Filler': 'a' * 1000}
        with ExpectLog(gen_log, "Unsatisfiable read"):
            response = self.fetch("/", headers=filler)
        self.assertEqual(response.code, 599)
00860 
00861 
@skipOnTravis
class IdleTimeoutTest(AsyncHTTPTestCase):
    """Exercises a server created with ``idle_connection_timeout=0.1``."""

    def get_app(self):
        routes = [('/', HelloWorldRequestHandler)]
        return Application(routes)

    def get_httpserver_options(self):
        return {'idle_connection_timeout': 0.1}

    def setUp(self):
        super(IdleTimeoutTest, self).setUp()
        # Every stream opened through connect(); closed again in tearDown().
        self.streams = []

    def tearDown(self):
        super(IdleTimeoutTest, self).tearDown()
        for opened in self.streams:
            opened.close()

    def connect(self):
        """Open a raw IOStream to the test server and return it.

        The stream is also appended to ``self.streams`` so that tearDown()
        closes it.
        """
        stream = IOStream(socket.socket())
        address = ('localhost', self.get_http_port())
        stream.connect(address, self.stop)
        self.wait()
        self.streams.append(stream)
        return stream

    def test_unused_connection(self):
        # Send nothing at all; wait() returns once the stream's close
        # callback has fired.
        idle = self.connect()
        idle.set_close_callback(self.stop)
        self.wait()

    def test_idle_after_use(self):
        stream = self.connect()

        def on_close():
            # Marker value that tells the close apart from read results.
            self.stop("closed")
        stream.set_close_callback(on_close)

        # Use the connection twice to make sure keep-alives are working
        for _ in range(2):
            stream.write(b"GET / HTTP/1.1\r\n\r\n")
            stream.read_until(b"\r\n\r\n", self.stop)
            self.wait()
            stream.read_bytes(11, self.stop)
            body = self.wait()
            self.assertEqual(body, b"Hello world")

        # Now let the timeout trigger and close the connection.
        outcome = self.wait()
        self.assertEqual(outcome, "closed")
00907 
00908 
class BodyLimitsTest(AsyncHTTPTestCase):
    """Exercises the ``max_body_size`` and ``body_timeout`` server options.

    The server is created with ``body_timeout=3600`` and
    ``max_body_size=4096``.  The streaming handler overrides either value
    for its connection when the matching query argument is present.
    """

    def get_app(self):
        class BufferedHandler(RequestHandler):
            # Replies with the length of the buffered request body.
            def put(self):
                size = len(self.request.body)
                self.write(str(size))

        @stream_request_body
        class StreamingHandler(RequestHandler):
            # Counts body bytes as they arrive and replies with the total.
            def initialize(self):
                self.bytes_read = 0

            def prepare(self):
                # Apply the optional per-request overrides from the query.
                query = self.request.arguments
                if 'expected_size' in query:
                    limit = int(self.get_argument('expected_size'))
                    self.request.connection.set_max_body_size(limit)
                if 'body_timeout' in query:
                    timeout = float(self.get_argument('body_timeout'))
                    self.request.connection.set_body_timeout(timeout)

            def data_received(self, data):
                self.bytes_read += len(data)

            def put(self):
                self.write(str(self.bytes_read))

        routes = [('/buffered', BufferedHandler),
                  ('/streaming', StreamingHandler)]
        return Application(routes)

    def get_httpserver_options(self):
        return {'body_timeout': 3600, 'max_body_size': 4096}

    def get_http_client(self):
        # curl_httpclient has no working body_producer support, so bypass
        # whichever AsyncHTTPClient implementation is configured.
        return SimpleAsyncHTTPClient(io_loop=self.io_loop)

    def test_small_body(self):
        # 4096 bytes matches the configured max_body_size and is accepted
        # by both handlers.
        payload = b'a' * 4096
        for path in ('/buffered', '/streaming'):
            response = self.fetch(path, method='PUT', body=payload)
            self.assertEqual(response.body, b'4096')

    def test_large_body_buffered(self):
        payload = b'a' * 10240
        with ExpectLog(gen_log, '.*Content-Length too long'):
            response = self.fetch('/buffered', method='PUT', body=payload)
        self.assertEqual(response.code, 599)

    def test_large_body_buffered_chunked(self):
        def produce(write):
            return write(b'a' * 10240)
        with ExpectLog(gen_log, '.*chunked body too large'):
            response = self.fetch('/buffered', method='PUT',
                                  body_producer=produce)
        self.assertEqual(response.code, 599)

    def test_large_body_streaming(self):
        payload = b'a' * 10240
        with ExpectLog(gen_log, '.*Content-Length too long'):
            response = self.fetch('/streaming', method='PUT', body=payload)
        self.assertEqual(response.code, 599)

    def test_large_body_streaming_chunked(self):
        def produce(write):
            return write(b'a' * 10240)
        with ExpectLog(gen_log, '.*chunked body too large'):
            response = self.fetch('/streaming', method='PUT',
                                  body_producer=produce)
        self.assertEqual(response.code, 599)

    def test_large_body_streaming_override(self):
        # expected_size raises the limit for this request only.
        payload = b'a' * 10240
        response = self.fetch('/streaming?expected_size=10240', method='PUT',
                              body=payload)
        self.assertEqual(response.body, b'10240')

    def test_large_body_streaming_chunked_override(self):
        def produce(write):
            return write(b'a' * 10240)
        response = self.fetch('/streaming?expected_size=10240', method='PUT',
                              body_producer=produce)
        self.assertEqual(response.body, b'10240')

    @gen_test
    def test_timeout(self):
        stream = IOStream(socket.socket())
        try:
            yield stream.connect(('127.0.0.1', self.get_http_port()))
            # Use a raw stream because AsyncHTTPClient won't let us read a
            # response without finishing a body.
            # The request announces 42 body bytes but never sends them.
            request = (b'PUT /streaming?body_timeout=0.1 HTTP/1.0\r\n'
                       b'Content-Length: 42\r\n\r\n')
            stream.write(request)
            with ExpectLog(gen_log, 'Timeout reading body'):
                received = yield stream.read_until_close()
            self.assertEqual(received, b'')
        finally:
            stream.close()

    @gen_test
    def test_body_size_override_reset(self):
        # The max_body_size override is reset between requests.
        stream = IOStream(socket.socket())
        try:
            yield stream.connect(('127.0.0.1', self.get_http_port()))
            # Use a raw stream so we can make sure it's all on one connection.
            first_request = (b'PUT /streaming?expected_size=10240 HTTP/1.1\r\n'
                             b'Content-Length: 10240\r\n\r\n')
            stream.write(first_request)
            stream.write(b'a' * 10240)
            _headers, body = yield gen.Task(read_stream_body, stream)
            self.assertEqual(body, b'10240')
            # Without the ?expected_size parameter, we get the old default value
            second_request = (b'PUT /streaming HTTP/1.1\r\n'
                              b'Content-Length: 10240\r\n\r\n')
            stream.write(second_request)
            with ExpectLog(gen_log, '.*Content-Length too long'):
                leftover = yield stream.read_until_close()
            self.assertEqual(leftover, b'')
        finally:
            stream.close()
01018 
01019 
class LegacyInterfaceTest(AsyncHTTPTestCase):
    """Serves a request through a plain ``request_callback`` function."""

    def get_app(self):
        # The old request_callback interface does not implement the
        # delegate interface, and writes its response via request.write
        # instead of request.connection.write_headers.
        def handle_request(request):
            payload = b"Hello world"
            # Status line and headers are written by hand as raw bytes.
            head = ("HTTP/1.1 200 OK\r\n"
                    "Content-Length: %d\r\n\r\n") % len(payload)
            request.write(utf8(head))
            request.write(payload)
            request.finish()
        return handle_request

    def test_legacy_interface(self):
        reply = self.fetch('/')
        self.assertEqual(reply.body, b"Hello world")


rosbridge_server
Author(s): Jonathan Mace
autogenerated on Thu Jun 6 2019 21:51:50