web_test.py
Go to the documentation of this file.
00001 from __future__ import absolute_import, division, print_function, with_statement
00002 from tornado.concurrent import Future
00003 from tornado import gen
00004 from tornado.escape import json_decode, utf8, to_unicode, recursive_unicode, native_str, to_basestring
00005 from tornado.httputil import format_timestamp
00006 from tornado.iostream import IOStream
00007 from tornado.log import app_log, gen_log
00008 from tornado.simple_httpclient import SimpleAsyncHTTPClient
00009 from tornado.template import DictLoader
00010 from tornado.testing import AsyncHTTPTestCase, ExpectLog, gen_test
00011 from tornado.test.util import unittest
00012 from tornado.util import u, bytes_type, ObjectDict, unicode_type
00013 from tornado.web import RequestHandler, authenticated, Application, asynchronous, url, HTTPError, StaticFileHandler, _create_signature_v1, create_signed_value, decode_signed_value, ErrorHandler, UIModule, MissingArgumentError, stream_request_body, Finish
00014 
00015 import binascii
00016 import contextlib
00017 import datetime
00018 import email.utils
00019 import itertools
00020 import logging
00021 import os
00022 import re
00023 import socket
00024 import sys
00025 
00026 try:
00027     import urllib.parse as urllib_parse  # py3
00028 except ImportError:
00029     import urllib as urllib_parse  # py2
00030 
# Test case classes registered through the ``wsgi_safe`` decorator.
wsgi_safe_tests = []


def relpath(*a):
    """Join the path components ``a`` onto the directory containing this file."""
    return os.path.join(os.path.dirname(__file__), *a)
00034 
00035 
def wsgi_safe(cls):
    """Class decorator: register ``cls`` in ``wsgi_safe_tests``.

    The class itself is returned unmodified.
    """
    wsgi_safe_tests.append(cls)
    return cls
00039 
00040 
class WebTestCase(AsyncHTTPTestCase):
    """Common base for web tests, usable in WSGI mode as well.

    Subclasses provide ``get_handlers`` (and optionally ``get_app_kwargs``)
    rather than overriding ``get_app``.  Decorate a subclass with
    ``wsgi_safe`` to have it run in wsgi_test too.
    """
    def get_app(self):
        """Build the Application, remember it as ``self.app`` and return it."""
        handlers = self.get_handlers()
        settings = self.get_app_kwargs()
        self.app = Application(handlers, **settings)
        return self.app

    def get_handlers(self):
        """Return the handler list for the Application; must be overridden."""
        raise NotImplementedError()

    def get_app_kwargs(self):
        """Return keyword settings for the Application (none by default)."""
        return {}
00056 
00057 
class SimpleHandlerTestCase(WebTestCase):
    """Convenience base for tests built around one handler class.

    Subclasses define a nested class called ``Handler``; it is routed
    at ``/``.
    """
    def get_handlers(self):
        handler_cls = self.Handler
        return [('/', handler_cls)]
00065 
00066 
class HelloHandler(RequestHandler):
    """Minimal handler: GET writes the text ``hello``."""
    def get(self):
        greeting = 'hello'
        self.write(greeting)
00070 
00071 
class CookieTestRequestHandler(RequestHandler):
    """Stub handler with just enough state for the secure_cookie functions."""
    def __init__(self):
        # RequestHandler.__init__ is intentionally not called; only the
        # attributes set here are provided.
        settings = dict(cookie_secret='0123456789')
        self.application = ObjectDict(settings=settings)
        self._cookies = {}

    def get_cookie(self, name):
        """Return the stored value for ``name``, or None if it was never set."""
        return self._cookies.get(name)

    def set_cookie(self, name, value, expires_days=None):
        """Store ``value`` under ``name``; ``expires_days`` is ignored."""
        self._cookies[name] = value
00084 
00085 
00086 # See SignedValueTest below for more.
class SecureCookieV1Test(unittest.TestCase):
    """Version-1 signed cookie tests run against CookieTestRequestHandler."""
    def test_round_trip(self):
        stub = CookieTestRequestHandler()
        stub.set_secure_cookie('foo', b'bar', version=1)
        decoded = stub.get_secure_cookie('foo', min_version=1)
        self.assertEqual(decoded, b'bar')

    def test_cookie_tampering_future_timestamp(self):
        stub = CookieTestRequestHandler()
        secret = stub.application.settings["cookie_secret"]
        # these bytes base64-encode to '12345678'
        payload = binascii.a2b_hex(b'd76df8e7aefc')
        stub.set_secure_cookie('foo', payload, version=1)
        raw_cookie = stub._cookies['foo']
        parsed = re.match(br'12345678\|([0-9]+)\|([0-9a-f]+)', raw_cookie)
        self.assertTrue(parsed)
        timestamp, sig = parsed.group(1), parsed.group(2)
        self.assertEqual(
            _create_signature_v1(secret, 'foo', '12345678', timestamp),
            sig)
        # Moving digits out of the payload and into the timestamp leaves
        # the signature unchanged.  (Not desirable behavior; this only
        # confirms that it is how the v1 signature works.)
        self.assertEqual(
            _create_signature_v1(secret, 'foo', '1234', b'5678' + timestamp),
            sig)
        # Forge a cookie that exploits the shift.
        forged = '1234|5678%s|%s' % (to_basestring(timestamp),
                                     to_basestring(sig))
        stub._cookies['foo'] = utf8(forged)
        # The forged cookie must be rejected.
        with ExpectLog(gen_log, "Cookie timestamp in future"):
            result = stub.get_secure_cookie('foo', min_version=1)
            self.assertTrue(result is None)

    def test_arbitrary_bytes(self):
        # Secure cookies take arbitrary data because it is base64 encoded;
        # normal cookies accept only a subset of ascii.
        stub = CookieTestRequestHandler()
        stub.set_secure_cookie('foo', b'\xe9', version=1)
        decoded = stub.get_secure_cookie('foo', min_version=1)
        self.assertEqual(decoded, b'\xe9')
00129 
00130 
class CookieTest(WebTestCase):
    """Checks set_cookie/get_cookie behavior through fetched responses."""
    def get_handlers(self):
        class SetCookieHandler(RequestHandler):
            def get(self):
                # Use several argument types (native str, unicode, bytes)
                # to make sure each one is encoded correctly.
                for name, value in (("str", "asdf"),
                                    ("unicode", u("qwer")),
                                    ("bytes", b"zxcv")):
                    self.set_cookie(name, value)

        class GetCookieHandler(RequestHandler):
            def get(self):
                value = self.get_cookie("foo", "default")
                self.write(value)

        class SetCookieDomainHandler(RequestHandler):
            def get(self):
                # unicode domain and path arguments must not break
                # anything either (see bug #285)
                self.set_cookie("unicode_args", "blah",
                                path=u("/foo"), domain=u("foo.com"))

        class SetCookieSpecialCharHandler(RequestHandler):
            def get(self):
                for name, value in (("equals", "a=b"),
                                    ("semicolon", "a;b"),
                                    ("quote", 'a"b')):
                    self.set_cookie(name, value)

        class SetCookieOverwriteHandler(RequestHandler):
            def get(self):
                self.set_cookie("a", "b", domain="example.com")
                self.set_cookie("c", "d", domain="example.com")
                # Setting the same name again replaces the first cookie;
                # none of the first call's attributes are kept.
                self.set_cookie("a", "e")

        routes = [
            ("/set", SetCookieHandler),
            ("/get", GetCookieHandler),
            ("/set_domain", SetCookieDomainHandler),
            ("/special_char", SetCookieSpecialCharHandler),
            ("/set_overwrite", SetCookieOverwriteHandler),
        ]
        return routes

    def test_set_cookie(self):
        resp = self.fetch("/set")
        cookies = sorted(resp.headers.get_list("Set-Cookie"))
        expected = ["bytes=zxcv; Path=/",
                    "str=asdf; Path=/",
                    "unicode=qwer; Path=/"]
        self.assertEqual(cookies, expected)

    def test_get_cookie(self):
        cases = (("foo=bar", b"bar"),
                 ('foo="bar"', b"bar"),
                 ("/=exception;", b"default"))
        for cookie_header, expected in cases:
            resp = self.fetch("/get", headers={"Cookie": cookie_header})
            self.assertEqual(resp.body, expected)

    def test_set_cookie_domain(self):
        resp = self.fetch("/set_domain")
        cookies = resp.headers.get_list("Set-Cookie")
        self.assertEqual(cookies,
                         ["unicode_args=blah; Domain=foo.com; Path=/foo"])

    def test_cookie_special_char(self):
        resp = self.fetch("/special_char")
        cookies = sorted(resp.headers.get_list("Set-Cookie"))
        self.assertEqual(len(cookies), 3)
        equals_hdr, quote_hdr, semicolon_hdr = cookies
        self.assertEqual(equals_hdr, 'equals="a=b"; Path=/')
        self.assertEqual(quote_hdr, 'quote="a\\"b"; Path=/')
        # python 2.7 octal-escapes the semicolon; older versions leave it alone
        accepted = ('semicolon="a;b"; Path=/',
                    'semicolon="a\\073b"; Path=/')
        self.assertTrue(semicolon_hdr in accepted, semicolon_hdr)

        round_trips = [
            ('foo=a=b', 'a=b'),
            ('foo="a=b"', 'a=b'),
            ('foo="a;b"', 'a;b'),
            # ('foo=a\\073b', 'a;b'),  # even encoded, ";" is a delimiter
            ('foo="a\\073b"', 'a;b'),
            ('foo="a\\"b"', 'a"b'),
        ]
        for cookie_header, expected in round_trips:
            logging.debug("trying %r", cookie_header)
            resp = self.fetch("/get", headers={"Cookie": cookie_header})
            self.assertEqual(resp.body, utf8(expected))

    def test_set_cookie_overwrite(self):
        resp = self.fetch("/set_overwrite")
        cookies = sorted(resp.headers.get_list("Set-Cookie"))
        self.assertEqual(cookies,
                         ["a=e; Path=/", "c=d; Domain=example.com; Path=/"])
00224 
00225 
class AuthRedirectRequestHandler(RequestHandler):
    """Handler whose login URL is supplied through ``initialize``."""
    def initialize(self, login_url):
        """Remember the login URL passed in the URL spec."""
        self.login_url = login_url

    def get_login_url(self):
        """Return the login URL given to ``initialize``."""
        return self.login_url

    @authenticated
    def get(self):
        # The tests do not follow redirects, so this body is never
        # expected to run.
        self.send_error(500)
00237 
00238 
class AuthRedirectTest(WebTestCase):
    """Checks the Location header produced when ``@authenticated`` redirects."""
    def get_handlers(self):
        """Route one handler with a relative and one with an absolute login URL."""
        return [('/relative', AuthRedirectRequestHandler,
                 dict(login_url='/login')),
                ('/absolute', AuthRedirectRequestHandler,
                 dict(login_url='http://example.com/login'))]

    def test_relative_auth_redirect(self):
        self.http_client.fetch(self.get_url('/relative'), self.stop,
                               follow_redirects=False)
        response = self.wait()
        self.assertEqual(response.code, 302)
        self.assertEqual(response.headers['Location'], '/login?next=%2Frelative')

    def test_absolute_auth_redirect(self):
        self.http_client.fetch(self.get_url('/absolute'), self.stop,
                               follow_redirects=False)
        response = self.wait()
        self.assertEqual(response.code, 302)
        location = response.headers['Location']
        # Raw string: "\?" is not a valid escape in a normal string literal.
        # The dot in the host name is escaped so it only matches a literal
        # ".".  The port is matched loosely because it varies between runs.
        self.assertTrue(re.match(
            r'http://example\.com/login\?next=http%3A%2F%2Flocalhost%3A[0-9]+%2Fabsolute',
            location), location)
00261 
00262 
class ConnectionCloseHandler(RequestHandler):
    """Forwards handler events to the test case passed to ``initialize``."""
    def initialize(self, test):
        self.test = test

    @asynchronous
    def get(self):
        # Does not finish the request here; only notifies the test case.
        observer = self.test
        observer.on_handler_waiting()

    def on_connection_close(self):
        observer = self.test
        observer.on_connection_close()
00273 
00274 
class ConnectionCloseTest(WebTestCase):
    """Closes the client stream mid-request and waits for the handler's
    ``on_connection_close`` notification."""
    def get_handlers(self):
        # The handler calls back into this test case instance.
        return [('/', ConnectionCloseHandler, dict(test=self))]

    def test_connection_close(self):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        address = ("localhost", self.get_http_port())
        sock.connect(address)
        self.stream = IOStream(sock, io_loop=self.io_loop)
        self.stream.write(b"GET / HTTP/1.0\r\n\r\n")
        # Returns once on_connection_close() below calls stop().
        self.wait()

    def on_handler_waiting(self):
        """Called by the handler's get(); closes the client stream."""
        logging.debug('handler waiting')
        self.stream.close()

    def on_connection_close(self):
        """Called by the handler's on_connection_close(); ends the wait."""
        logging.debug('connection closed')
        self.stop()
00293 
00294 
class EchoHandler(RequestHandler):
    """Type-checks request arguments, then echoes path and arguments as JSON."""
    def get(self, *path_args):
        # Expected types: the web.py accessors hand back unicode strings
        # (by default, but see also decode_argument), while
        # self.request.arguments (from httpserver.py) keeps the values as
        # bytes.  Keys are always native strings.
        raw_arguments = self.request.arguments
        for name in raw_arguments:
            if type(name) is not str:
                raise Exception("incorrect type for key: %r" % type(name))
            for raw in raw_arguments[name]:
                if type(raw) is not bytes_type:
                    raise Exception("incorrect type for value: %r" %
                                    type(raw))
            for decoded in self.get_arguments(name):
                if type(decoded) is not unicode_type:
                    raise Exception("incorrect type for value: %r" %
                                    type(decoded))
        for component in path_args:
            if type(component) is not unicode_type:
                raise Exception("incorrect type for path arg: %r" %
                                type(component))
        payload = dict(path=self.request.path,
                       path_args=path_args,
                       args=recursive_unicode(self.request.arguments))
        self.write(payload)
00318 
00319 
class RequestEncodingTest(WebTestCase):
    """Checks decoding of percent-encoded path groups and query arguments."""
    def get_handlers(self):
        routes = [
            ("/group/(.*)", EchoHandler),
            ("/slashes/([^/]*)/([^/]*)", EchoHandler),
        ]
        return routes

    def fetch_json(self, path):
        """Fetch ``path`` and return the JSON-decoded response body."""
        response = self.fetch(path)
        return json_decode(response.body)

    def test_group_question_mark(self):
        # url-encoded question marks must be handled properly
        without_query = self.fetch_json('/group/%3F')
        self.assertEqual(without_query,
                         dict(path='/group/%3F', path_args=['?'], args={}))
        with_query = self.fetch_json('/group/%3F?%3F=%3F')
        self.assertEqual(with_query,
                         dict(path='/group/%3F', path_args=['?'],
                              args={'?': ['?']}))

    def test_group_encoding(self):
        # Path components and query arguments are decoded the same way
        echoed = self.fetch_json('/group/%C3%A9?arg=%C3%A9')
        expected = {
            u("path"): u("/group/%C3%A9"),
            u("path_args"): [u("\u00e9")],
            u("args"): {u("arg"): [u("\u00e9")]},
        }
        self.assertEqual(echoed, expected)

    def test_slashes(self):
        # An escaped slash stays inside a single "directory" of the path
        # for matching, and is unescaped before reaching get().
        plain = self.fetch_json('/slashes/foo/bar')
        self.assertEqual(plain,
                         dict(path="/slashes/foo/bar",
                              path_args=["foo", "bar"],
                              args={}))
        escaped = self.fetch_json('/slashes/a%2Fb/c%2Fd')
        self.assertEqual(escaped,
                         dict(path="/slashes/a%2Fb/c%2Fd",
                              path_args=["a/b", "c/d"],
                              args={}))
00354 
00355 
class TypeCheckHandler(RequestHandler):
    """Collects a message for every accessor whose value has an unexpected type.

    ``prepare`` runs the checks; ``get`` and ``post`` add one more check and
    write the ``errors`` dict, so an empty object means every type matched.
    """
    def prepare(self):
        self.errors = {}
        check = self.check_type

        check('status', self.get_status(), int)

        # Mainly for historical reasons, get_argument does not follow the
        # general rule of using type str for non-body data.
        check('argument', self.get_argument('foo'), unicode_type)
        cookie_names = list(self.cookies.keys())
        check('cookie_key', cookie_names[0], str)
        check('cookie_value', list(self.cookies.values())[0].value, str)

        # Regular cookies are native strings; secure cookies come back as
        # bytes because they may hold arbitrary data.
        if cookie_names != ['asdf']:
            raise Exception("unexpected values for cookie keys: %r" %
                            self.cookies.keys())
        check('get_secure_cookie', self.get_secure_cookie('asdf'), bytes_type)
        check('get_cookie', self.get_cookie('asdf'), str)

        check('xsrf_token', self.xsrf_token, bytes_type)
        check('xsrf_form_html', self.xsrf_form_html(), str)

        check('reverse_url', self.reverse_url('typecheck', 'foo'), str)

        check('request_summary', self._request_summary(), str)

    def get(self, path_component):
        # For consistency with get_argument(), path_component is unicode
        # rather than str.
        self.check_type('path_component', path_component, unicode_type)
        self.write(self.errors)

    def post(self, path_component):
        self.check_type('path_component', path_component, unicode_type)
        self.write(self.errors)

    def check_type(self, name, obj, expected_type):
        """Record an error under ``name`` unless type(obj) equals ``expected_type``."""
        actual_type = type(obj)
        if expected_type != actual_type:
            message = "expected %s, got %s" % (expected_type, actual_type)
            self.errors[name] = message
00398 
00399 
class DecodeArgHandler(RequestHandler):
    """Decodes arguments using the request's ``encoding`` argument, if any."""
    def decode_argument(self, value, name=None):
        """Decode ``value`` with the requested encoding, or return it as bytes."""
        if type(value) != bytes_type:
            raise Exception("unexpected type for value: %r" % type(value))
        # self.request.arguments is read directly to avoid recursion
        raw_arguments = self.request.arguments
        if 'encoding' not in raw_arguments:
            return value
        encoding = to_unicode(raw_arguments['encoding'][0])
        return value.decode(encoding)

    def get(self, arg):
        def describe(s):
            # Report the type name plus a JSON-friendly form of the value.
            kind = type(s)
            if kind == bytes_type:
                return ["bytes", native_str(binascii.b2a_hex(s))]
            if kind == unicode_type:
                return ["unicode", s]
            raise Exception("unknown type")
        description = {'path': describe(arg),
                       'query': describe(self.get_argument("foo")),
                       }
        self.write(description)
00420 
00421 
class LinkifyHandler(RequestHandler):
    """Renders ``linkify.html`` with a URL as the message."""
    def get(self):
        message = "http://example.com"
        self.render("linkify.html", message=message)
00425 
00426 
class UIModuleResourceHandler(RequestHandler):
    """Renders ``page.html`` with two entries."""
    def get(self):
        entries = [1, 2]
        self.render("page.html", entries=entries)
00430 
00431 
class OptionalPathHandler(RequestHandler):
    """Echoes the captured path argument back as JSON."""
    def get(self, path):
        payload = {"path": path}
        self.write(payload)
00435 
00436 
class FlowControlHandler(RequestHandler):
    """Writes "1", "2", "3" in separate steps chained by flush callbacks.

    The writes are too small to demonstrate real flow control, but they
    do show that the flush callbacks get run.
    """
    @asynchronous
    def get(self):
        self.write("1")
        next_step = self.step2
        self.flush(callback=next_step)

    def step2(self):
        self.write("2")
        next_step = self.step3
        self.flush(callback=next_step)

    def step3(self):
        # Last step: write the final chunk and complete the request.
        self.write("3")
        self.finish()
00452 
00453 
class MultiHeaderHandler(RequestHandler):
    """Sets one header twice and adds another twice, varying case and value type."""
    def get(self):
        # set_header: the second call uses a differently-cased name and an
        # int value.
        for name, value in (("x-overwrite", "1"), ("X-Overwrite", 2)):
            self.set_header(name, value)
        # add_header: same idea, with the int value first.
        for name, value in (("x-multi", 3), ("X-Multi", "4")):
            self.add_header(name, value)
00460 
00461 
class RedirectHandler(RequestHandler):
    """Redirects to ``/`` using the ``permanent`` or ``status`` argument."""
    def get(self):
        # ``permanent`` wins when both arguments are supplied.
        permanent = self.get_argument('permanent', None)
        if permanent is not None:
            self.redirect('/', permanent=int(permanent))
            return
        status = self.get_argument('status', None)
        if status is not None:
            self.redirect('/', status=int(status))
            return
        raise Exception("didn't get permanent or status arguments")
00470 
00471 
class EmptyFlushCallbackHandler(RequestHandler):
    """Flushes repeatedly, with and without pending output, then finishes."""
    @gen.engine
    @asynchronous
    def get(self):
        # The flush callback has to run whether or not any output was
        # pending.  Both the gen.Task form and the direct yield form are
        # used here; they are equivalent.
        flush = self.flush
        yield gen.Task(flush)  # no body yet ("empty"), but headers go out
        yield gen.Task(flush)  # nothing at all to send
        self.write("o")
        yield self.flush()  # sends the "o"
        yield self.flush()  # nothing left to send
        self.finish("k")
00485 
00486 
class HeaderInjectionHandler(RequestHandler):
    """Responds ``ok`` when a header value containing CRLF is refused."""
    def get(self):
        injected = "foo\r\nX-Bar: baz"
        try:
            self.set_header("X-Foo", injected)
            raise Exception("Didn't get expected exception")
        except ValueError as e:
            # Any other ValueError is unexpected and propagates.
            if "Unsafe header value" not in str(e):
                raise
            self.finish(b"ok")
00497 
00498 
class GetArgumentHandler(RequestHandler):
    """Finishes with argument ``foo`` read via the accessor chosen by ``source``."""
    def prepare(self):
        # source=query -> get_query_argument, source=body ->
        # get_body_argument, anything else (or absent) -> get_argument.
        source = self.get_argument('source', None)
        if source == 'query':
            accessor = self.get_query_argument
        elif source == 'body':
            accessor = self.get_body_argument
        else:
            accessor = self.get_argument
        value = accessor("foo", "default")
        self.finish(value)
00508 
00509 
class GetArgumentsHandler(RequestHandler):
    """Finishes with the ``foo`` values from each plural argument accessor."""
    def prepare(self):
        merged = self.get_arguments("foo")
        from_query = self.get_query_arguments("foo")
        from_body = self.get_body_arguments("foo")
        self.finish(dict(default=merged, query=from_query, body=from_body))
00515 
00516 
00517 # This test is shared with wsgi_test.py
@wsgi_safe
class WSGISafeWebTest(WebTestCase):
    """Handler tests registered with ``wsgi_safe``."""
    COOKIE_SECRET = "WebTest.COOKIE_SECRET"

    def get_app_kwargs(self):
        """Return settings with an in-memory template loader and the cookie secret."""
        page_template = """\
<html><head></head><body>
{% for e in entries %}
{% module Template("entry.html", entry=e) %}
{% end %}
</body></html>"""
        entry_template = """\
{{ set_resources(embedded_css=".entry { margin-bottom: 1em; }", embedded_javascript="js_embed()", css_files=["/base.css", "/foo.css"], javascript_files="/common.js", html_head="<meta>", html_body='<script src="/analytics.js"/>') }}
<div class="entry">...</div>"""
        templates = DictLoader({
            "linkify.html": "{% module linkify(message) %}",
            "page.html": page_template,
            "entry.html": entry_template,
        })
        return dict(template_loader=templates,
                    autoescape="xhtml_escape",
                    cookie_secret=self.COOKIE_SECRET)

    def tearDown(self):
        super(WSGISafeWebTest, self).tearDown()
        # Empty the class-level template loader mapping after each test.
        RequestHandler._template_loaders.clear()

    def get_handlers(self):
        return [
            url("/typecheck/(.*)", TypeCheckHandler, name='typecheck'),
            url("/decode_arg/(.*)", DecodeArgHandler, name='decode_arg'),
            url("/decode_arg_kw/(?P<arg>.*)", DecodeArgHandler),
            url("/linkify", LinkifyHandler),
            url("/uimodule_resources", UIModuleResourceHandler),
            url("/optional_path/(.+)?", OptionalPathHandler),
            url("/multi_header", MultiHeaderHandler),
            url("/redirect", RedirectHandler),
            url("/header_injection", HeaderInjectionHandler),
            url("/get_argument", GetArgumentHandler),
            url("/get_arguments", GetArgumentsHandler),
        ]

    def fetch_json(self, *args, **kwargs):
        """Fetch, re-raise any error on the response, and decode the JSON body."""
        resp = self.fetch(*args, **kwargs)
        resp.rethrow()
        return json_decode(resp.body)

    def test_types(self):
        signed = create_signed_value(self.COOKIE_SECRET, "asdf", "qwer")
        cookie_header = "asdf=" + to_unicode(signed)
        resp = self.fetch("/typecheck/asdf?foo=bar",
                          headers={"Cookie": cookie_header})
        errors = json_decode(resp.body)
        self.assertEqual(errors, {})

        # NOTE(review): the POST response below is never inspected, so this
        # only checks that the request completes.
        resp = self.fetch("/typecheck/asdf?foo=bar", method="POST",
                          headers={"Cookie": cookie_header},
                          body="foo=bar")

    def test_decode_argument(self):
        # Every one of these urls decodes to the same thing
        equivalent_urls = [
            "/decode_arg/%C3%A9?foo=%C3%A9&encoding=utf-8",
            "/decode_arg/%E9?foo=%E9&encoding=latin1",
            "/decode_arg_kw/%E9?foo=%E9&encoding=latin1",
        ]
        for req_url in equivalent_urls:
            resp = self.fetch(req_url)
            resp.rethrow()
            described = json_decode(resp.body)
            self.assertEqual(described,
                             {u('path'): [u('unicode'), u('\u00e9')],
                              u('query'): [u('unicode'), u('\u00e9')],
                              })

        # No encoding argument: the values come back as bytes.
        resp = self.fetch("/decode_arg/%C3%A9?foo=%C3%A9")
        resp.rethrow()
        described = json_decode(resp.body)
        self.assertEqual(described,
                         {u('path'): [u('bytes'), u('c3a9')],
                          u('query'): [u('bytes'), u('c3a9')],
                          })

    def test_decode_argument_invalid_unicode(self):
        # invalid unicode in a URL must produce a 400, not a 500
        with ExpectLog(gen_log, ".*Invalid unicode.*"):
            for bad_url in ("/typecheck/invalid%FF",
                            "/typecheck/invalid?foo=%FF"):
                resp = self.fetch(bad_url)
                self.assertEqual(resp.code, 400)

    def test_decode_argument_plus(self):
        # All of these urls are equivalent.
        equivalent_urls = [
            "/decode_arg/1%20%2B%201?foo=1%20%2B%201&encoding=utf-8",
            "/decode_arg/1%20+%201?foo=1+%2B+1&encoding=utf-8",
        ]
        for req_url in equivalent_urls:
            resp = self.fetch(req_url)
            resp.rethrow()
            described = json_decode(resp.body)
            self.assertEqual(described,
                             {u('path'): [u('unicode'), u('1 + 1')],
                              u('query'): [u('unicode'), u('1 + 1')],
                              })

    def test_reverse_url(self):
        cases = [
            ('foo', '/decode_arg/foo'),
            (42, '/decode_arg/42'),
            (b'\xe9', '/decode_arg/%E9'),
            (u('\u00e9'), '/decode_arg/%C3%A9'),
            ('1 + 1', '/decode_arg/1%20%2B%201'),
        ]
        for arg, expected in cases:
            self.assertEqual(self.app.reverse_url('decode_arg', arg),
                             expected)

    def test_uimodule_unescaped(self):
        resp = self.fetch("/linkify")
        expected = b"<a href=\"http://example.com\">http://example.com</a>"
        self.assertEqual(resp.body, expected)

    def test_uimodule_resources(self):
        resp = self.fetch("/uimodule_resources")
        expected = b"""\
<html><head><link href="/base.css" type="text/css" rel="stylesheet"/><link href="/foo.css" type="text/css" rel="stylesheet"/>
<style type="text/css">
.entry { margin-bottom: 1em; }
</style>
<meta>
</head><body>


<div class="entry">...</div>


<div class="entry">...</div>

<script src="/common.js" type="text/javascript"></script>
<script type="text/javascript">
//<![CDATA[
js_embed()
//]]>
</script>
<script src="/analytics.js"/>
</body></html>"""
        self.assertEqual(resp.body, expected)

    def test_optional_path(self):
        present = self.fetch_json("/optional_path/foo")
        self.assertEqual(present, {u("path"): u("foo")})
        absent = self.fetch_json("/optional_path/")
        self.assertEqual(absent, {u("path"): None})

    def test_multi_header(self):
        resp = self.fetch("/multi_header")
        self.assertEqual(resp.headers["x-overwrite"], "2")
        self.assertEqual(resp.headers.get_list("x-multi"), ["3", "4"])

    def test_redirect(self):
        cases = (("permanent=1", 301),
                 ("permanent=0", 302),
                 ("status=307", 307))
        for query, expected_code in cases:
            resp = self.fetch("/redirect?" + query, follow_redirects=False)
            self.assertEqual(resp.code, expected_code)

    def test_header_injection(self):
        resp = self.fetch("/header_injection")
        self.assertEqual(resp.body, b"ok")

    def test_get_argument(self):
        simple_cases = (("/get_argument?foo=bar", b"bar"),
                        ("/get_argument?foo=", b""),
                        ("/get_argument", b"default"))
        for req_url, expected in simple_cases:
            resp = self.fetch(req_url)
            self.assertEqual(resp.body, expected)

        # Query and body arguments are merged.  The singular accessor
        # gives precedence to the body argument over the query argument.
        form = urllib_parse.urlencode(dict(foo="hello"))
        resp = self.fetch("/get_argument?foo=bar", method="POST", body=form)
        self.assertEqual(resp.body, b"hello")
        # The plural accessors return both.
        resp = self.fetch("/get_arguments?foo=bar",
                          method="POST", body=form)
        self.assertEqual(json_decode(resp.body),
                         dict(default=['bar', 'hello'],
                              query=['bar'],
                              body=['hello']))

    def test_get_query_arguments(self):
        # Sent as a POST so that query string and body arguments can be
        # shown to stay separate.
        form = urllib_parse.urlencode(dict(foo="hello"))
        cases = (("/get_argument?source=query&foo=bar", b"bar"),
                 ("/get_argument?source=query&foo=", b""),
                 ("/get_argument?source=query", b"default"))
        for req_url, expected in cases:
            resp = self.fetch(req_url, method="POST", body=form)
            self.assertEqual(resp.body, expected)

    def test_get_body_arguments(self):
        cases = ((dict(foo="bar"), b"bar"),
                 (dict(foo=""), b""),
                 (dict(), b"default"))
        for fields, expected in cases:
            form = urllib_parse.urlencode(fields)
            resp = self.fetch("/get_argument?source=body&foo=hello",
                              method="POST", body=form)
            self.assertEqual(resp.body, expected)

    def test_no_gzip(self):
        resp = self.fetch('/get_argument')
        vary = resp.headers.get('Vary', '')
        content_encoding = resp.headers.get('Content-Encoding', '')
        self.assertNotIn('Accept-Encoding', vary)
        self.assertNotIn('gzip', content_encoding)
00737 
00738 
class NonWSGIWebTests(WebTestCase):
    """Handler tests that are not registered through ``wsgi_safe``."""
    def get_handlers(self):
        routes = [
            ("/flow_control", FlowControlHandler),
            ("/empty_flush", EmptyFlushCallbackHandler),
        ]
        return routes

    def test_flow_control(self):
        response = self.fetch("/flow_control")
        self.assertEqual(response.body, b"123")

    def test_empty_flush(self):
        self.assertEqual(self.fetch("/empty_flush").body, b"ok")
00751 
00752 
@wsgi_safe
class ErrorResponseTest(WebTestCase):
    """Checks the responses produced when a handler fails.

    Three handlers are exercised: one relying on the inherited
    ``write_error``, one overriding it, and one whose ``write_error``
    itself raises.
    """
    def get_handlers(self):
        class DefaultHandler(RequestHandler):
            # Raises HTTPError for an explicit ?status=, otherwise a
            # ZeroDivisionError.
            def get(self):
                if self.get_argument("status", None):
                    raise HTTPError(int(self.get_argument("status")))
                1 / 0

        class WriteErrorHandler(RequestHandler):
            # Same triggers as DefaultHandler, but uses send_error and a
            # custom plain-text write_error.
            def get(self):
                if self.get_argument("status", None):
                    self.send_error(int(self.get_argument("status")))
                else:
                    1 / 0

            def write_error(self, status_code, **kwargs):
                self.set_header("Content-Type", "text/plain")
                if "exc_info" in kwargs:
                    self.write("Exception: %s" % kwargs["exc_info"][0].__name__)
                else:
                    self.write("Status: %d" % status_code)

        class FailedWriteErrorHandler(RequestHandler):
            # Both the request method and the error writer raise.
            def get(self):
                1 / 0

            def write_error(self, status_code, **kwargs):
                raise Exception("exception in write_error")

        return [url("/default", DefaultHandler),
                url("/write_error", WriteErrorHandler),
                url("/failed_write_error", FailedWriteErrorHandler),
                ]

    def test_default(self):
        with ExpectLog(app_log, "Uncaught exception"):
            response = self.fetch("/default")
            self.assertEqual(response.code, 500)
            # assertIn reports both operands on failure, unlike
            # assertTrue(x in y).
            self.assertIn(b"500: Internal Server Error", response.body)

            response = self.fetch("/default?status=503")
            self.assertEqual(response.code, 503)
            self.assertIn(b"503: Service Unavailable", response.body)

    def test_write_error(self):
        with ExpectLog(app_log, "Uncaught exception"):
            response = self.fetch("/write_error")
            self.assertEqual(response.code, 500)
            self.assertEqual(b"Exception: ZeroDivisionError", response.body)

            response = self.fetch("/write_error?status=503")
            self.assertEqual(response.code, 503)
            self.assertEqual(b"Status: 503", response.body)

    def test_failed_write_error(self):
        with ExpectLog(app_log, "Uncaught exception"):
            response = self.fetch("/failed_write_error")
            self.assertEqual(response.code, 500)
            self.assertEqual(b"", response.body)
00813 
00814 
@wsgi_safe
class StaticFileTest(WebTestCase):
    """Tests for ``static_url`` generation and for serving static files.

    The application's ``static_path`` is the ``static`` directory next to
    this module; the tests request ``robots.txt`` from it.
    """
    # The expected MD5 hash of robots.txt, used in tests that call
    # StaticFileHandler.get_version
    robots_txt_hash = b"f71d20196d4caf35b6a670db8c70b03d"
    static_dir = os.path.join(os.path.dirname(__file__), 'static')

    def get_handlers(self):
        class StaticUrlHandler(RequestHandler):
            # Writes static_url(path); ?include_version=0 drops the version.
            def get(self, path):
                with_v = int(self.get_argument('include_version', 1))
                self.write(self.static_url(path, include_version=with_v))

        class AbsoluteStaticUrlHandler(StaticUrlHandler):
            include_host = True

        class OverrideStaticUrlHandler(RequestHandler):
            # Writes "True" when the include_host keyword of static_url
            # overrides the handler-level include_host attribute.
            def get(self, path):
                # The argument arrives as the string "0" or "1".  It must
                # pass through int() first: bool("0") is True, which would
                # leave the include_host=False case untested.
                do_include = bool(int(self.get_argument("include_host")))
                self.include_host = not do_include

                regular_url = self.static_url(path)
                override_url = self.static_url(path, include_host=do_include)
                if override_url == regular_url:
                    return self.write(str(False))

                protocol = self.request.protocol + "://"
                protocol_length = len(protocol)
                check_regular = regular_url.find(protocol, 0, protocol_length)
                check_override = override_url.find(protocol, 0, protocol_length)

                if do_include:
                    result = (check_override == 0 and check_regular == -1)
                else:
                    result = (check_override == -1 and check_regular == 0)
                self.write(str(result))

        return [('/static_url/(.*)', StaticUrlHandler),
                ('/abs_static_url/(.*)', AbsoluteStaticUrlHandler),
                ('/override_static_url/(.*)', OverrideStaticUrlHandler)]

    def get_app_kwargs(self):
        return dict(static_path=relpath('static'))

    def _robots_txt_bytes(self):
        """Returns the raw bytes of the on-disk ``robots.txt`` fixture.

        The file is opened in binary mode so the expected body is compared
        byte-for-byte, with no newline translation or text decoding.
        """
        with open(os.path.join(self.static_dir, "robots.txt"), "rb") as f:
            return f.read()

    def test_static_files(self):
        response = self.fetch('/robots.txt')
        self.assertIn(b"Disallow: /", response.body)

        response = self.fetch('/static/robots.txt')
        self.assertIn(b"Disallow: /", response.body)

    def test_static_url(self):
        response = self.fetch("/static_url/robots.txt")
        self.assertEqual(response.body,
                         b"/static/robots.txt?v=" + self.robots_txt_hash)

    def test_absolute_static_url(self):
        response = self.fetch("/abs_static_url/robots.txt")
        self.assertEqual(response.body, (
            utf8(self.get_url("/")) +
            b"static/robots.txt?v=" +
            self.robots_txt_hash
        ))

    def test_relative_version_exclusion(self):
        response = self.fetch("/static_url/robots.txt?include_version=0")
        self.assertEqual(response.body, b"/static/robots.txt")

    def test_absolute_version_exclusion(self):
        response = self.fetch("/abs_static_url/robots.txt?include_version=0")
        self.assertEqual(response.body,
                         utf8(self.get_url("/") + "static/robots.txt"))

    def test_include_host_override(self):
        self._trigger_include_host_check(False)
        self._trigger_include_host_check(True)

    def _trigger_include_host_check(self, include_host):
        # include_host is sent as 0 or 1 in the query string.
        path = "/override_static_url/robots.txt?include_host=%s"
        response = self.fetch(path % int(include_host))
        self.assertEqual(response.body, utf8(str(True)))

    def get_and_head(self, *args, **kwargs):
        """Performs a GET and HEAD request and returns the GET response.

        Fails if any ``Content-*`` headers returned by the two requests
        differ.
        """
        head_response = self.fetch(*args, method="HEAD", **kwargs)
        get_response = self.fetch(*args, method="GET", **kwargs)
        content_headers = set()
        for h in itertools.chain(head_response.headers, get_response.headers):
            if h.startswith('Content-'):
                content_headers.add(h)
        for h in content_headers:
            head_value = head_response.headers.get(h)
            get_value = get_response.headers.get(h)
            # Argument order follows the labels in the message: GET value
            # first, HEAD value second.
            self.assertEqual(head_value, get_value,
                             "%s differs between GET (%s) and HEAD (%s)" %
                             (h, get_value, head_value))
        return get_response

    def test_static_304_if_modified_since(self):
        response1 = self.get_and_head("/static/robots.txt")
        response2 = self.get_and_head("/static/robots.txt", headers={
            'If-Modified-Since': response1.headers['Last-Modified']})
        self.assertEqual(response2.code, 304)
        self.assertNotIn('Content-Length', response2.headers)
        self.assertNotIn('Last-Modified', response2.headers)

    def test_static_304_if_none_match(self):
        response1 = self.get_and_head("/static/robots.txt")
        response2 = self.get_and_head("/static/robots.txt", headers={
            'If-None-Match': response1.headers['Etag']})
        self.assertEqual(response2.code, 304)

    def test_static_if_modified_since_pre_epoch(self):
        # On windows, the functions that work with time_t do not accept
        # negative values, and at least one client (processing.js) seems
        # to use if-modified-since 1/1/1960 as a cache-busting technique.
        response = self.get_and_head("/static/robots.txt", headers={
            'If-Modified-Since': 'Fri, 01 Jan 1960 00:00:00 GMT'})
        self.assertEqual(response.code, 200)

    def test_static_if_modified_since_time_zone(self):
        # Instead of the value from Last-Modified, make requests with times
        # chosen just before and after the known modification time
        # of the file to ensure that the right time zone is being used
        # when parsing If-Modified-Since.
        stat = os.stat(relpath('static/robots.txt'))

        response = self.get_and_head('/static/robots.txt', headers={
            'If-Modified-Since': format_timestamp(stat.st_mtime - 1)})
        self.assertEqual(response.code, 200)
        response = self.get_and_head('/static/robots.txt', headers={
            'If-Modified-Since': format_timestamp(stat.st_mtime + 1)})
        self.assertEqual(response.code, 304)

    def test_static_etag(self):
        response = self.get_and_head('/static/robots.txt')
        self.assertEqual(utf8(response.headers.get("Etag")),
                         b'"' + self.robots_txt_hash + b'"')

    def test_static_with_range(self):
        response = self.get_and_head('/static/robots.txt', headers={
            'Range': 'bytes=0-9'})
        self.assertEqual(response.code, 206)
        self.assertEqual(response.body, b"User-agent")
        self.assertEqual(utf8(response.headers.get("Etag")),
                         b'"' + self.robots_txt_hash + b'"')
        self.assertEqual(response.headers.get("Content-Length"), "10")
        self.assertEqual(response.headers.get("Content-Range"),
                         "bytes 0-9/26")

    def test_static_with_range_full_file(self):
        response = self.get_and_head('/static/robots.txt', headers={
            'Range': 'bytes=0-'})
        # Note: Chrome refuses to play audio if it gets an HTTP 206 in response
        # to ``Range: bytes=0-`` :(
        self.assertEqual(response.code, 200)
        self.assertEqual(response.body, self._robots_txt_bytes())
        self.assertEqual(response.headers.get("Content-Length"), "26")
        self.assertEqual(response.headers.get("Content-Range"), None)

    def test_static_with_range_full_past_end(self):
        response = self.get_and_head('/static/robots.txt', headers={
            'Range': 'bytes=0-10000000'})
        self.assertEqual(response.code, 200)
        self.assertEqual(response.body, self._robots_txt_bytes())
        self.assertEqual(response.headers.get("Content-Length"), "26")
        self.assertEqual(response.headers.get("Content-Range"), None)

    def test_static_with_range_partial_past_end(self):
        response = self.get_and_head('/static/robots.txt', headers={
            'Range': 'bytes=1-10000000'})
        self.assertEqual(response.code, 206)
        self.assertEqual(response.body, self._robots_txt_bytes()[1:])
        self.assertEqual(response.headers.get("Content-Length"), "25")
        self.assertEqual(response.headers.get("Content-Range"), "bytes 1-25/26")

    def test_static_with_range_end_edge(self):
        response = self.get_and_head('/static/robots.txt', headers={
            'Range': 'bytes=22-'})
        self.assertEqual(response.body, b": /\n")
        self.assertEqual(response.headers.get("Content-Length"), "4")
        self.assertEqual(response.headers.get("Content-Range"),
                         "bytes 22-25/26")

    def test_static_with_range_neg_end(self):
        response = self.get_and_head('/static/robots.txt', headers={
            'Range': 'bytes=-4'})
        self.assertEqual(response.body, b": /\n")
        self.assertEqual(response.headers.get("Content-Length"), "4")
        self.assertEqual(response.headers.get("Content-Range"),
                         "bytes 22-25/26")

    def test_static_invalid_range(self):
        response = self.get_and_head('/static/robots.txt', headers={
            'Range': 'asdf'})
        self.assertEqual(response.code, 200)

    def test_static_unsatisfiable_range_zero_suffix(self):
        response = self.get_and_head('/static/robots.txt', headers={
            'Range': 'bytes=-0'})
        self.assertEqual(response.headers.get("Content-Range"),
                         "bytes */26")
        self.assertEqual(response.code, 416)

    def test_static_unsatisfiable_range_invalid_start(self):
        response = self.get_and_head('/static/robots.txt', headers={
            'Range': 'bytes=26'})
        self.assertEqual(response.code, 416)
        self.assertEqual(response.headers.get("Content-Range"),
                         "bytes */26")

    def test_static_head(self):
        response = self.fetch('/static/robots.txt', method='HEAD')
        self.assertEqual(response.code, 200)
        # No body was returned, but we did get the right content length.
        self.assertEqual(response.body, b'')
        self.assertEqual(response.headers['Content-Length'], '26')
        self.assertEqual(utf8(response.headers['Etag']),
                         b'"' + self.robots_txt_hash + b'"')

    def test_static_head_range(self):
        response = self.fetch('/static/robots.txt', method='HEAD',
                              headers={'Range': 'bytes=1-4'})
        self.assertEqual(response.code, 206)
        self.assertEqual(response.body, b'')
        self.assertEqual(response.headers['Content-Length'], '4')
        self.assertEqual(utf8(response.headers['Etag']),
                         b'"' + self.robots_txt_hash + b'"')

    def test_static_range_if_none_match(self):
        response = self.get_and_head('/static/robots.txt', headers={
            'Range': 'bytes=1-4',
            'If-None-Match': b'"' + self.robots_txt_hash + b'"'})
        self.assertEqual(response.code, 304)
        self.assertEqual(response.body, b'')
        self.assertNotIn('Content-Length', response.headers)
        self.assertEqual(utf8(response.headers['Etag']),
                         b'"' + self.robots_txt_hash + b'"')

    def test_static_404(self):
        response = self.get_and_head('/static/blarg')
        self.assertEqual(response.code, 404)
01067 
01068 
@wsgi_safe
class StaticDefaultFilenameTest(WebTestCase):
    """Exercises the static handler's ``default_filename`` argument."""
    def get_app_kwargs(self):
        handler_args = {'default_filename': 'index.html'}
        return {
            'static_path': relpath('static'),
            'static_handler_args': handler_args,
        }

    def get_handlers(self):
        return []

    def test_static_default_filename(self):
        # With the trailing slash the index content is returned directly.
        reply = self.fetch('/static/dir/', follow_redirects=False)
        self.assertEqual(reply.code, 200)
        self.assertEqual(b'this is the index\n', reply.body)

    def test_static_default_redirect(self):
        # Without the trailing slash a 301 points at the slash-terminated URL.
        reply = self.fetch('/static/dir', follow_redirects=False)
        self.assertEqual(reply.code, 301)
        location = reply.headers['Location']
        self.assertTrue(location.endswith('/static/dir/'))
01087 
01088 
@wsgi_safe
class StaticFileWithPathTest(WebTestCase):
    """Serves a file through an explicitly routed ``StaticFileHandler``."""
    def get_app_kwargs(self):
        return {
            'static_path': relpath('static'),
            'static_handler_args': {'default_filename': 'index.html'},
        }

    def get_handlers(self):
        # /foo/ is mapped onto the templates directory, not static_path.
        handler_args = {"path": relpath("templates/")}
        return [("/foo/(.*)", StaticFileHandler, handler_args)]

    def test_serve(self):
        body = self.fetch("/foo/utf8.html").body
        self.assertEqual(body, b"H\xc3\xa9llo\n")
01103 
01104 
@wsgi_safe
class CustomStaticFileTest(WebTestCase):
    """Runs the app with a ``StaticFileHandler`` subclass that overrides
    URL generation, URL parsing and content lookup.

    The subclass serves a single virtual file and embeds a fixed version
    string ("42") between the file's stem and extension.
    """
    def get_handlers(self):
        class MyStaticFileHandler(StaticFileHandler):
            @classmethod
            def make_static_url(cls, settings, path):
                # foo.txt -> /static/foo.<version>.txt
                version = cls.get_version(settings, path)
                dot = path.rindex('.')
                stem = path[:dot]
                extension = path[dot + 1:]
                return '/static/%s.%s.%s' % (stem, version, extension)

            def parse_url_path(self, url_path):
                # Inverse of make_static_url: drop the ".<version>" segment
                # that precedes the extension.
                extension_at = url_path.rindex('.')
                version_at = url_path.rindex('.', 0, extension_at)
                stem = url_path[:version_at]
                return '%s%s' % (stem, url_path[extension_at:])

            @classmethod
            def get_absolute_path(cls, settings, path):
                return 'CustomStaticFileTest:' + path

            def validate_absolute_path(self, root, absolute_path):
                return absolute_path

            @classmethod
            def get_content(cls, path, start=None, end=None):
                assert start is None and end is None
                if path != 'CustomStaticFileTest:foo.txt':
                    raise Exception("unexpected path %r" % path)
                return b'bar'

            def get_content_size(self):
                if self.absolute_path != 'CustomStaticFileTest:foo.txt':
                    raise Exception("unexpected path %r" % self.absolute_path)
                return 3

            def get_modified_time(self):
                return None

            @classmethod
            def get_version(cls, settings, path):
                return "42"

        class StaticUrlHandler(RequestHandler):
            def get(self, path):
                self.write(self.static_url(path))

        # Stored on the test case so get_app_kwargs can pass it along.
        self.static_handler_class = MyStaticFileHandler

        return [("/static_url/(.*)", StaticUrlHandler)]

    def get_app_kwargs(self):
        return {
            'static_path': "dummy",
            'static_handler_class': self.static_handler_class,
        }

    def test_serve(self):
        body = self.fetch("/static/foo.42.txt").body
        self.assertEqual(body, b"bar")

    def test_static_url(self):
        with ExpectLog(gen_log, "Could not open static file", required=False):
            reply = self.fetch("/static_url/foo.txt")
            self.assertEqual(reply.body, b"/static/foo.42.txt")
01170 
01171 
@wsgi_safe
class HostMatchingTest(WebTestCase):
    """Checks routing of handlers registered via ``add_handlers`` per host."""
    class Handler(RequestHandler):
        # Echoes the ``reply`` string the route was configured with.
        def initialize(self, reply):
            self.reply = reply

        def get(self):
            self.write(self.reply)

    def get_handlers(self):
        return [("/foo", HostMatchingTest.Handler, {"reply": "wildcard"})]

    def test_host_matching(self):
        handler = HostMatchingTest.Handler
        self.app.add_handlers("www.example.com",
                              [("/foo", handler, {"reply": "[0]"})])
        self.app.add_handlers(r"www\.example\.com",
                              [("/bar", handler, {"reply": "[1]"})])
        self.app.add_handlers("www.example.com",
                              [("/baz", handler, {"reply": "[2]"})])

        # Without an explicit Host header /foo gives the wildcard reply
        # and the host-specific paths are not found.
        self.assertEqual(self.fetch("/foo").body, b"wildcard")
        for target in ("/bar", "/baz"):
            self.assertEqual(self.fetch(target).code, 404)

        # With the matching Host header every host-specific route answers.
        host_cases = (("/foo", b"[0]"), ("/bar", b"[1]"), ("/baz", b"[2]"))
        for target, expected in host_cases:
            reply = self.fetch(target, headers={'Host': 'www.example.com'})
            self.assertEqual(reply.body, expected)
01205 
01206 
@wsgi_safe
class NamedURLSpecGroupsTest(WebTestCase):
    """Named regex groups are passed to handlers as keyword arguments,
    for a plain string pattern and for one wrapped in ``u()``.
    """
    def get_handlers(self):
        class EchoHandler(RequestHandler):
            # Writes back the value captured by the ``path`` group.
            def get(self, path):
                self.write(path)

        plain_pattern = "/str/(?P<path>.*)"
        wrapped_pattern = u("/unicode/(?P<path>.*)")
        return [(plain_pattern, EchoHandler),
                (wrapped_pattern, EchoHandler)]

    def test_named_urlspec_groups(self):
        cases = (("/str/foo", b"foo"), ("/unicode/bar", b"bar"))
        for target, expected in cases:
            self.assertEqual(self.fetch(target).body, expected)
01223 
01224 
@wsgi_safe
class ClearHeaderTest(SimpleHandlerTestCase):
    """Checks that ``clear_header`` removes a header that was set earlier."""
    class Handler(RequestHandler):
        def get(self):
            self.set_header("h1", "foo")
            self.set_header("h2", "bar")
            self.clear_header("h1")
            # Clearing a header that was never set is also exercised.
            self.clear_header("nonexistent")

    def test_clear_header(self):
        response = self.fetch("/")
        # assertNotIn matches the style used elsewhere in this file and
        # reports the container contents on failure.
        self.assertNotIn("h1", response.headers)
        self.assertEqual(response.headers["h2"], "bar")
01238 
01239 
@wsgi_safe
class Header304Test(SimpleHandlerTestCase):
    """Checks which headers are omitted from a 304 response."""
    class Handler(RequestHandler):
        def get(self):
            self.set_header("Content-Language", "en_US")
            self.write("hello")

    def test_304_headers(self):
        # The first request returns the full response and its Etag.
        response1 = self.fetch('/')
        self.assertEqual(response1.headers["Content-Length"], "5")
        self.assertEqual(response1.headers["Content-Language"], "en_US")

        # Replaying that Etag in If-None-Match must give a 304 without
        # the Content-* headers.
        response2 = self.fetch('/', headers={
            'If-None-Match': response1.headers["Etag"]})
        self.assertEqual(response2.code, 304)
        self.assertNotIn("Content-Length", response2.headers)
        self.assertNotIn("Content-Language", response2.headers)
        # Not an entity header, but should not be added to 304s by chunking
        self.assertNotIn("Transfer-Encoding", response2.headers)
01259 
01260 
@wsgi_safe
class StatusReasonTest(SimpleHandlerTestCase):
    """Checks ``set_status`` with and without an explicit reason string."""
    class Handler(RequestHandler):
        def get(self):
            # ``reason`` is optional in the query string, ``code`` is not.
            reasons = self.request.arguments.get('reason', [])
            status = int(self.get_argument('code'))
            custom_reason = reasons[0] if reasons else None
            self.set_status(status, reason=custom_reason)

    def get_http_client(self):
        # simple_httpclient only: curl doesn't expose the reason string
        return SimpleAsyncHTTPClient(io_loop=self.io_loop)

    def test_status(self):
        accepted = (
            ("/?code=304", 304, "Not Modified"),
            ("/?code=304&reason=Foo", 304, "Foo"),
            ("/?code=682&reason=Bar", 682, "Bar"),
        )
        for target, code, reason in accepted:
            reply = self.fetch(target)
            self.assertEqual(reply.code, code)
            self.assertEqual(reply.reason, reason)
        # Code 682 without a reason must end up as a logged 500.
        with ExpectLog(app_log, 'Uncaught exception'):
            reply = self.fetch("/?code=682")
        self.assertEqual(reply.code, 500)
01286 
01287 
@wsgi_safe
class DateHeaderTest(SimpleHandlerTestCase):
    """Checks that the ``Date`` response header is close to the current time."""
    class Handler(RequestHandler):
        def get(self):
            self.write("hello")

    def test_date_header(self):
        response = self.fetch('/')
        header_date = datetime.datetime(
            *email.utils.parsedate(response.headers['Date'])[:6])
        # Compare the absolute difference: a signed comparison would be
        # satisfied by any timestamp in the past, however stale.
        skew = abs(header_date - datetime.datetime.utcnow())
        self.assertLess(skew, datetime.timedelta(seconds=2))
01300 
01301 
@wsgi_safe
class RaiseWithReasonTest(SimpleHandlerTestCase):
    """Checks ``HTTPError`` raised with a non-standard code and a reason."""
    class Handler(RequestHandler):
        def get(self):
            raise HTTPError(682, reason="Foo")

    def get_http_client(self):
        # simple_httpclient only: curl doesn't expose the reason string
        return SimpleAsyncHTTPClient(io_loop=self.io_loop)

    def test_raise_with_reason(self):
        reply = self.fetch("/")
        self.assertEqual(reply.code, 682)
        self.assertEqual(reply.reason, "Foo")
        self.assertIn(b'682: Foo', reply.body)

    def test_httperror_str(self):
        error = HTTPError(682, reason="Foo")
        self.assertEqual(str(error), "HTTP 682: Foo")
01320 
01321 
@wsgi_safe
class ErrorHandlerXSRFTest(WebTestCase):
    """POSTs to error routes while ``xsrf_cookies`` is enabled."""
    def get_handlers(self):
        # note that if the handlers list is empty we get the default_host
        # redirect fallback instead of a 404, so test with both an
        # explicitly defined error handler and an implicit 404.
        error_route = ('/error', ErrorHandler, {'status_code': 417})
        return [error_route]

    def get_app_kwargs(self):
        return {'xsrf_cookies': True}

    def _post_empty(self, target):
        # POST with an empty body and hand back the response.
        return self.fetch(target, method='POST', body='')

    def test_error_xsrf(self):
        self.assertEqual(self._post_empty('/error').code, 417)

    def test_404_xsrf(self):
        self.assertEqual(self._post_empty('/404').code, 404)
01340 
01341 
@wsgi_safe
class GzipTestCase(SimpleHandlerTestCase):
    """Checks response headers when the application sets ``gzip=True``."""
    class Handler(RequestHandler):
        def get(self):
            # An optional ?vary= value is set as the Vary header before
            # the body is written.
            vary = self.get_argument('vary', None)
            if vary:
                self.set_header('Vary', vary)
            self.write('hello world')

    def get_app_kwargs(self):
        here = os.path.dirname(__file__)
        return {'gzip': True, 'static_path': os.path.join(here, 'static')}

    def _assert_gzipped(self, response):
        # simple_httpclient renames the content-encoding header;
        # curl_httpclient doesn't.
        headers = response.headers
        encoding = headers.get(
            'Content-Encoding',
            headers.get('X-Consumed-Content-Encoding'))
        self.assertEqual(encoding, 'gzip')
        self.assertEqual(headers['Vary'], 'Accept-Encoding')

    def test_gzip(self):
        self._assert_gzipped(self.fetch('/'))

    def test_gzip_static(self):
        # The streaming responses in StaticFileHandler have subtle
        # interactions with the gzip output so test this case separately.
        self._assert_gzipped(self.fetch('/robots.txt'))

    def test_gzip_not_requested(self):
        headers = self.fetch('/', use_gzip=False).headers
        self.assertNotIn('Content-Encoding', headers)
        self.assertEqual(headers['Vary'], 'Accept-Encoding')

    def test_vary_already_present(self):
        headers = self.fetch('/?vary=Accept-Language').headers
        self.assertEqual(headers['Vary'],
                         'Accept-Language, Accept-Encoding')
01386 
01387 
@wsgi_safe
class PathArgsInPrepareTest(WebTestCase):
    """Checks that ``path_args``/``path_kwargs`` are readable in ``prepare``."""
    class Handler(RequestHandler):
        def prepare(self):
            # Write the captured URL groups out as the response.
            captured = {'args': self.path_args, 'kwargs': self.path_kwargs}
            self.write(captured)

        def get(self, path):
            assert path == 'foo'
            self.finish()

    def get_handlers(self):
        positional = ('/pos/(.*)', self.Handler)
        keyword = ('/kw/(?P<path>.*)', self.Handler)
        return [positional, keyword]

    def _fetch_json(self, target):
        # Fetch, surface any HTTP error, and decode the JSON body.
        reply = self.fetch(target)
        reply.rethrow()
        return json_decode(reply.body)

    def test_pos(self):
        self.assertEqual(self._fetch_json('/pos/foo'),
                         {'args': ['foo'], 'kwargs': {}})

    def test_kw(self):
        self.assertEqual(self._fetch_json('/kw/foo'),
                         {'args': [], 'kwargs': {'path': 'foo'}})
01413 
01414 
@wsgi_safe
class ClearAllCookiesTest(SimpleHandlerTestCase):
    """Checks the Set-Cookie headers produced by clear_all_cookies()."""
    class Handler(RequestHandler):
        def get(self):
            self.clear_all_cookies()
            self.write('ok')

    def test_clear_all_cookies(self):
        request_headers = {'Cookie': 'foo=bar; baz=xyzzy'}
        response = self.fetch('/', headers=request_headers)
        # Sort so the checks do not depend on the order of the headers.
        cleared = sorted(response.headers.get_list('Set-Cookie'))
        for index, prefix in enumerate(('baz=;', 'foo=;')):
            self.assertTrue(cleared[index].startswith(prefix))
01427 
01428 
class PermissionError(Exception):
    """Module-local exception raised by ExceptionHandlerTest's handler."""
01431 
01432 
@wsgi_safe
class ExceptionHandlerTest(SimpleHandlerTestCase):
    """Checks how exceptions raised in a handler are logged and rendered,
    including the write_error and log_exception override points.
    """
    class Handler(RequestHandler):
        def get(self):
            # The 'exc' query argument selects which failure to trigger.
            kind = self.get_argument('exc')
            if kind == 'http':
                raise HTTPError(410, "no longer here")
            if kind == 'zero':
                1 / 0
            if kind == 'permission':
                raise PermissionError('not allowed')

        def write_error(self, status_code, **kwargs):
            # Only a PermissionError gets the custom 403 response; every
            # other failure goes through the default implementation.
            is_permission_error = False
            if 'exc_info' in kwargs:
                _, error, _ = kwargs['exc_info']
                is_permission_error = isinstance(error, PermissionError)
            if not is_permission_error:
                RequestHandler.write_error(self, status_code, **kwargs)
                return
            self.set_status(403)
            self.write('PermissionError')

        def log_exception(self, typ, value, tb):
            if not isinstance(value, PermissionError):
                RequestHandler.log_exception(self, typ, value, tb)
                return
            app_log.warning('custom logging for PermissionError: %s',
                            value.args[0])

    def test_http_error(self):
        # HTTPErrors are logged as warnings with no stack trace.
        # TODO: extend ExpectLog to test this more precisely
        with ExpectLog(gen_log, '.*no longer here'):
            status = self.fetch('/?exc=http').code
            self.assertEqual(status, 410)

    def test_unknown_error(self):
        # Unknown errors are logged as errors with a stack trace.
        with ExpectLog(app_log, 'Uncaught exception'):
            status = self.fetch('/?exc=zero').code
            self.assertEqual(status, 500)

    def test_known_error(self):
        # log_exception can override logging behavior, and write_error
        # can override the response.
        expected_log = 'custom logging for PermissionError: not allowed'
        with ExpectLog(app_log, expected_log):
            status = self.fetch('/?exc=permission').code
            self.assertEqual(status, 403)
01481 
01482 
@wsgi_safe
class UIMethodUIModuleTest(SimpleHandlerTestCase):
    """Test that UI methods and modules are created correctly and
    associated with the handler.
    """
    class Handler(RequestHandler):
        def get(self):
            self.render('foo.html')

        def value(self):
            # Called back by the UI method and the UI module below.
            return self.get_argument("value")

    def get_app_kwargs(self):
        def my_ui_method(handler, x):
            template = "In my_ui_method(%s) with handler value %s."
            return template % (x, handler.value())

        class MyModule(UIModule):
            def render(self, x):
                template = "In MyModule(%s) with handler value %s."
                return template % (x, self.handler.value())

        # The single template calls both the UI method and the UI module.
        templates = {
            'foo.html': '{{ my_ui_method(42) }} {% module MyModule(123) %}',
        }
        return {
            'template_loader': DictLoader(templates),
            'ui_methods': {'my_ui_method': my_ui_method},
            'ui_modules': {'MyModule': MyModule},
        }

    def tearDown(self):
        super(UIMethodUIModuleTest, self).tearDown()
        # TODO: fix template loader caching so this isn't necessary.
        RequestHandler._template_loaders.clear()

    def test_ui_method(self):
        expected = (b'In my_ui_method(42) with handler value asdf. '
                    b'In MyModule(123) with handler value asdf.')
        body = self.fetch('/?value=asdf').body
        self.assertEqual(body, expected)
01521 
01522 
@wsgi_safe
class GetArgumentErrorTest(SimpleHandlerTestCase):
    """Checks the attributes exposed by a caught MissingArgumentError."""
    class Handler(RequestHandler):
        def get(self):
            try:
                self.get_argument('foo')
                self.write({})
            except MissingArgumentError as err:
                # Report the exception's details back to the test as JSON.
                details = {'arg_name': err.arg_name,
                           'log_message': err.log_message}
                self.write(details)

    def test_catch_error(self):
        expected = {'arg_name': 'foo',
                    'log_message': 'Missing argument foo'}
        body = self.fetch('/').body
        self.assertEqual(json_decode(body), expected)
01539 
01540 
class MultipleExceptionTest(SimpleHandlerTestCase):
    """Raises two exceptions per request from IOLoop callbacks and counts
    how often log_exception is invoked.
    """
    class Handler(RequestHandler):
        # Counter shared by all handler instances; never reset.
        exc_count = 0

        @asynchronous
        def get(self):
            from tornado.ioloop import IOLoop
            loop = IOLoop.current()
            # Schedule two callbacks that each fail with ZeroDivisionError.
            for _ in range(2):
                loop.add_callback(lambda: 1 / 0)

        def log_exception(self, typ, value, tb):
            MultipleExceptionTest.Handler.exc_count += 1

    def test_multi_exception(self):
        # This test verifies that multiple exceptions raised into the same
        # ExceptionStackContext do not generate extraneous log entries
        # due to "Cannot send error response after headers written".
        # log_exception is called, but it does not proceed to send_error.
        for _ in range(2):
            response = self.fetch('/')
            self.assertEqual(response.code, 500)
        # Each of our two requests generated two exceptions, we should have
        # seen at least three of them by now (the fourth may still be
        # in the queue).
        seen = MultipleExceptionTest.Handler.exc_count
        self.assertGreater(seen, 2)
01567 
01568 
@wsgi_safe
class SetCurrentUserTest(SimpleHandlerTestCase):
    """Checks that current_user can be assigned directly in prepare()."""
    class Handler(RequestHandler):
        def prepare(self):
            self.current_user = 'Ben'

        def get(self):
            greeting = 'Hello %s' % self.current_user
            self.write(greeting)

    def test_set_current_user(self):
        # Ensure that current_user can be assigned to normally for apps
        # that want to forgo the lazy get_current_user property
        body = self.fetch('/').body
        self.assertEqual(body, b'Hello Ben')
01583 
01584 
@wsgi_safe
class GetCurrentUserTest(WebTestCase):
    """Checks when get_current_user is evaluated during template rendering.

    Each handler renders a template and then replies with 'True' or
    'False' depending on whether get_current_user was called.
    """
    def get_app_kwargs(self):
        class WithoutUserModule(UIModule):
            def render(self):
                return ''

        class WithUserModule(UIModule):
            def render(self):
                # Reading current_user here is the point of this module.
                return str(self.current_user)

        templates = {
            'without_user.html': '',
            'with_user.html': '{{ current_user }}',
            'without_user_module.html': '{% module WithoutUserModule() %}',
            'with_user_module.html': '{% module WithUserModule() %}',
        }
        modules = {'WithUserModule': WithUserModule,
                   'WithoutUserModule': WithoutUserModule}
        return dict(template_loader=DictLoader(templates),
                    ui_modules=modules)

    def tearDown(self):
        super(GetCurrentUserTest, self).tearDown()
        RequestHandler._template_loaders.clear()

    def get_handlers(self):
        class CurrentUserHandler(RequestHandler):
            def prepare(self):
                self.has_loaded_current_user = False

            def get_current_user(self):
                self.has_loaded_current_user = True
                return ''

            def _render_and_report(self, template_name):
                # Render the template (discarding its output), then reply
                # with whether get_current_user ran along the way.
                self.render_string(template_name)
                self.finish(str(self.has_loaded_current_user))

        class WithoutUserHandler(CurrentUserHandler):
            def get(self):
                self._render_and_report('without_user.html')

        class WithUserHandler(CurrentUserHandler):
            def get(self):
                self._render_and_report('with_user.html')

        class CurrentUserModuleHandler(CurrentUserHandler):
            def get_template_namespace(self):
                # If RequestHandler.get_template_namespace is called, then
                # get_current_user is evaluated. Until #820 is fixed, this
                # is a small hack to circumvent the issue.
                return self.ui

        class WithoutUserModuleHandler(CurrentUserModuleHandler):
            def get(self):
                self._render_and_report('without_user_module.html')

        class WithUserModuleHandler(CurrentUserModuleHandler):
            def get(self):
                self._render_and_report('with_user_module.html')

        routes = [
            ('/without_user', WithoutUserHandler),
            ('/with_user', WithUserHandler),
            ('/without_user_module', WithoutUserModuleHandler),
            ('/with_user_module', WithUserModuleHandler),
        ]
        return routes

    @unittest.skip('needs fix')
    def test_get_current_user_is_lazy(self):
        # TODO: Make this test pass. See #820.
        body = self.fetch('/without_user').body
        self.assertEqual(body, b'False')

    def test_get_current_user_works(self):
        body = self.fetch('/with_user').body
        self.assertEqual(body, b'True')

    def test_get_current_user_from_ui_module_is_lazy(self):
        body = self.fetch('/without_user_module').body
        self.assertEqual(body, b'False')

    def test_get_current_user_from_ui_module_works(self):
        body = self.fetch('/with_user_module').body
        self.assertEqual(body, b'True')
01668 
01669 
@wsgi_safe
class UnimplementedHTTPMethodsTest(SimpleHandlerTestCase):
    """A handler that defines no methods should answer 405 to everything."""
    class Handler(RequestHandler):
        pass

    def test_unimplemented_standard_methods(self):
        # Methods sent without a request body.
        for verb in ('HEAD', 'GET', 'DELETE', 'OPTIONS'):
            status = self.fetch('/', method=verb).code
            self.assertEqual(status, 405)
        # Methods sent with an (empty) request body.
        for verb in ('POST', 'PUT'):
            status = self.fetch('/', method=verb, body=b'').code
            self.assertEqual(status, 405)
01682 
01683 
class UnimplementedNonStandardMethodsTest(SimpleHandlerTestCase):
    # wsgiref.validate complains about unknown methods in a way that makes
    # this test not wsgi_safe.
    class Handler(RequestHandler):
        def other(self):
            # Even though this method exists, it won't get called automatically
            # because it is not in SUPPORTED_METHODS.
            self.write('other')

    def test_unimplemented_patch(self):
        # PATCH is recently standardized; Tornado supports it by default
        # but wsgiref.validate doesn't like it.
        status = self.fetch('/', method='PATCH', body=b'').code
        self.assertEqual(status, 405)

    def test_unimplemented_other(self):
        # OTHER is not a standard method, so the client has to be told
        # explicitly to allow it.
        status = self.fetch('/', method='OTHER',
                            allow_nonstandard_methods=True).code
        self.assertEqual(status, 405)
01703 
01704 
@wsgi_safe
class AllHTTPMethodsTest(SimpleHandlerTestCase):
    """Routes several HTTP methods to one handler method that echoes the
    request method back.
    """
    class Handler(RequestHandler):
        def method(self):
            self.write(self.request.method)

        # Serve every one of these HTTP methods with the same function.
        get = method
        delete = method
        options = method
        post = method
        put = method

    def test_standard_methods(self):
        # HEAD is expected to come back with an empty body.
        head_response = self.fetch('/', method='HEAD')
        self.assertEqual(head_response.body, b'')
        for verb in ('GET', 'DELETE', 'OPTIONS'):
            body = self.fetch('/', method=verb).body
            self.assertEqual(body, utf8(verb))
        for verb in ('POST', 'PUT'):
            body = self.fetch('/', method=verb, body=b'').body
            self.assertEqual(body, utf8(verb))
01722 
01723 
class PatchMethodTest(SimpleHandlerTestCase):
    """Checks dispatch of PATCH and of a method added to SUPPORTED_METHODS."""
    class Handler(RequestHandler):
        # Extend the inherited tuple so that OTHER requests get dispatched.
        SUPPORTED_METHODS = RequestHandler.SUPPORTED_METHODS + ('OTHER',)

        def patch(self):
            self.write('patch')

        def other(self):
            self.write('other')

    def test_patch(self):
        body = self.fetch('/', method='PATCH', body=b'').body
        self.assertEqual(body, b'patch')

    def test_other(self):
        body = self.fetch('/', method='OTHER',
                          allow_nonstandard_methods=True).body
        self.assertEqual(body, b'other')
01742 
01743 
@wsgi_safe
class FinishInPrepareTest(SimpleHandlerTestCase):
    """Checks the response when prepare() finishes the request itself."""
    class Handler(RequestHandler):
        def prepare(self):
            self.finish('done')

        def get(self):
            # It's difficult to assert for certain that a method did not
            # or will not be called in an asynchronous context, but this
            # will be logged noisily if it is reached.
            raise Exception('should not reach this method')

    def test_finish_in_prepare(self):
        body = self.fetch('/').body
        self.assertEqual(body, b'done')
01759 
01760 
@wsgi_safe
class Default404Test(WebTestCase):
    """Checks the built-in 404 page for a URL that matches no route."""
    def get_handlers(self):
        # If there are no handlers at all a default redirect handler gets
        # added, so register one route that the test never requests.
        return [('/foo', RequestHandler)]

    def test_404(self):
        expected_body = (b'<html><title>404: Not Found</title>'
                         b'<body>404: Not Found</body></html>')
        response = self.fetch('/')
        self.assertEqual(response.code, 404)
        self.assertEqual(response.body, expected_body)
01773 
01774 
@wsgi_safe
class Custom404Test(WebTestCase):
    """Checks that default_handler_class serves URLs matching no route."""
    def get_handlers(self):
        return [('/foo', RequestHandler)]

    def get_app_kwargs(self):
        class Custom404Handler(RequestHandler):
            def get(self):
                self.set_status(404)
                self.write('custom 404 response')

        return {'default_handler_class': Custom404Handler}

    def test_404(self):
        response = self.fetch('/')
        status, body = response.code, response.body
        self.assertEqual(status, 404)
        self.assertEqual(body, b'custom 404 response')
01792 
01793 
@wsgi_safe
class DefaultHandlerArgumentsTest(WebTestCase):
    """Checks that default_handler_args reach the default handler."""
    def get_handlers(self):
        return [('/foo', RequestHandler)]

    def get_app_kwargs(self):
        # ErrorHandler is configured to answer with a 403 status.
        return {
            'default_handler_class': ErrorHandler,
            'default_handler_args': {'status_code': 403},
        }

    def test_403(self):
        status = self.fetch('/').code
        self.assertEqual(status, 403)
01806 
01807 
@wsgi_safe
class HandlerByNameTest(WebTestCase):
    """Checks that a handler may be given as a class or as a dotted name."""
    def get_handlers(self):
        dotted_name = 'tornado.test.web_test.HelloHandler'
        # All three are equivalent.
        return [
            ('/hello1', HelloHandler),
            ('/hello2', dotted_name),
            url('/hello3', dotted_name),
        ]

    def test_handler_by_name(self):
        for path in ('/hello1', '/hello2', '/hello3'):
            resp = self.fetch(path)
            self.assertEqual(resp.body, b'hello')
01824 
01825 
class StreamingRequestBodyTest(WebTestCase):
    """Tests for handlers decorated with ``@stream_request_body``.

    The tests talk to the server over a raw IOStream (see ``connect``) so
    that they control exactly when each part of a chunked request body is
    sent.  Handlers report progress back to the test case by resolving
    Futures stored as attributes on the test instance.
    """
    def get_handlers(self):
        @stream_request_body
        class StreamingBodyHandler(RequestHandler):
            # Resolves the test's ``prepared``, ``data`` and ``finished``
            # Futures as the request moves through its stages.
            def initialize(self, test):
                self.test = test

            def prepare(self):
                self.test.prepared.set_result(None)

            def data_received(self, data):
                self.test.data.set_result(data)

            def get(self):
                self.test.finished.set_result(None)
                self.write({})

        @stream_request_body
        class EarlyReturnHandler(RequestHandler):
            def prepare(self):
                # If we finish the response in prepare, it won't continue to
                # the (non-existent) data_received.
                raise HTTPError(401)

        @stream_request_body
        class CloseDetectionHandler(RequestHandler):
            # Resolves the test's ``close_future`` when the connection is
            # closed.
            def initialize(self, test):
                self.test = test

            def on_connection_close(self):
                super(CloseDetectionHandler, self).on_connection_close()
                self.test.close_future.set_result(None)

        return [('/stream_body', StreamingBodyHandler, dict(test=self)),
                ('/early_return', EarlyReturnHandler),
                ('/close_detection', CloseDetectionHandler, dict(test=self))]

    def connect(self, url, connection_close):
        """Open a raw connection and send the headers of a chunked GET.

        ``url`` is the request path as bytes.  If ``connection_close`` is
        true a ``Connection: close`` header is sent as well.  Returns the
        IOStream, positioned so that the caller can write body chunks.
        """
        # Use a raw connection so we can control the sending of data.
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        s.connect(("localhost", self.get_http_port()))
        stream = IOStream(s, io_loop=self.io_loop)
        stream.write(b"GET " + url + b" HTTP/1.1\r\n")
        if connection_close:
            stream.write(b"Connection: close\r\n")
        stream.write(b"Transfer-Encoding: chunked\r\n\r\n")
        return stream

    @gen_test
    def test_streaming_body(self):
        self.prepared = Future()
        self.data = Future()
        self.finished = Future()

        stream = self.connect(b"/stream_body", connection_close=True)
        yield self.prepared
        stream.write(b"4\r\nasdf\r\n")
        # Ensure the first chunk is received before we send the second.
        data = yield self.data
        self.assertEqual(data, b"asdf")
        # A Future can only be resolved once, so use a fresh one for the
        # second chunk.
        self.data = Future()
        stream.write(b"4\r\nqwer\r\n")
        data = yield self.data
        # assertEqual rather than the deprecated assertEquals alias, which
        # newer Python versions no longer provide.
        self.assertEqual(data, b"qwer")
        stream.write(b"0\r\n")
        yield self.finished
        data = yield gen.Task(stream.read_until_close)
        # This would ideally use an HTTP1Connection to read the response.
        self.assertTrue(data.endswith(b"{}"))
        stream.close()

    @gen_test
    def test_early_return(self):
        # No body is sent at all; the 401 raised in prepare() must still
        # reach the client.
        stream = self.connect(b"/early_return", connection_close=False)
        data = yield gen.Task(stream.read_until_close)
        self.assertTrue(data.startswith(b"HTTP/1.1 401"))

    @gen_test
    def test_early_return_with_data(self):
        # Same as test_early_return, but one body chunk is sent first.
        stream = self.connect(b"/early_return", connection_close=False)
        stream.write(b"4\r\nasdf\r\n")
        data = yield gen.Task(stream.read_until_close)
        self.assertTrue(data.startswith(b"HTTP/1.1 401"))

    @gen_test
    def test_close_during_upload(self):
        # Closing the client side before the body is complete should
        # trigger on_connection_close in the handler.
        self.close_future = Future()
        stream = self.connect(b"/close_detection", connection_close=False)
        stream.close()
        yield self.close_future
01916 
01917 
class StreamingRequestFlowControlTest(WebTestCase):
    """Checks that the stages of a streamed request never overlap."""
    def get_handlers(self):
        from tornado.ioloop import IOLoop

        # Each method in this handler returns a Future and yields to the
        # IOLoop so the future is not immediately ready.  Ensure that the
        # Futures are respected and no method is called before the previous
        # one has completed.
        @stream_request_body
        class FlowControlHandler(RequestHandler):
            def initialize(self, test):
                self.test = test
                self.method = None
                self.methods = []

            def _next_iteration(self):
                # Yieldable that completes when a callback added to the
                # current IOLoop is run.
                return gen.Task(IOLoop.current().add_callback)

            @contextlib.contextmanager
            def in_method(self, method):
                # Fail the test if another method is still marked active,
                # then mark `method` as active for the duration of the block.
                active = self.method
                if active is not None:
                    message = "entered method %s while in %s" % (method,
                                                                 active)
                    self.test.fail(message)
                self.method = method
                self.methods.append(method)
                try:
                    yield
                finally:
                    self.method = None

            @gen.coroutine
            def prepare(self):
                with self.in_method('prepare'):
                    yield self._next_iteration()

            @gen.coroutine
            def data_received(self, data):
                with self.in_method('data_received'):
                    yield self._next_iteration()

            @gen.coroutine
            def post(self):
                with self.in_method('post'):
                    yield self._next_iteration()
                # Report the recorded call order back to the client.
                self.write(dict(methods=self.methods))

        return [('/', FlowControlHandler, dict(test=self))]

    def get_httpserver_options(self):
        # Use a small chunk size so flow control is relevant even though
        # all the data arrives at once.
        return {'chunk_size': 10}

    def test_flow_control(self):
        response = self.fetch('/', body='abcdefghijklmnopqrstuvwxyz',
                              method='POST')
        response.rethrow()
        # Three data_received calls are expected for the 26-byte body.
        expected_order = ['prepare',
                          'data_received', 'data_received', 'data_received',
                          'post']
        self.assertEqual(json_decode(response.body),
                         dict(methods=expected_order))
01976 
01977 
@wsgi_safe
class IncorrectContentLengthTest(SimpleHandlerTestCase):
    """Checks what happens when a handler sets a Content-Length that does
    not match the body it writes.
    """
    def get_handlers(self):
        testcase = self
        self.server_error = None

        # Manually set a content-length that doesn't match the actual content.
        class TooHigh(RequestHandler):
            def get(self):
                self.set_header("Content-Length", "42")
                try:
                    self.finish("ok")
                except Exception as err:
                    # Keep the exception so the test can check its message.
                    testcase.server_error = err
                    raise

        class TooLow(RequestHandler):
            def get(self):
                self.set_header("Content-Length", "2")
                try:
                    self.finish("hello")
                except Exception as err:
                    testcase.server_error = err
                    raise

        return [('/high', TooHigh),
                ('/low', TooLow)]

    def test_content_length_too_high(self):
        # When the content-length is too high, the connection is simply
        # closed without completing the response.  An error is logged on
        # the server.
        headers_sent = "Cannot send error response after headers written"
        with ExpectLog(app_log, "Uncaught exception"):
            with ExpectLog(gen_log, headers_sent):
                response = self.fetch("/high")
        self.assertEqual(response.code, 599)
        message = str(self.server_error)
        self.assertEqual(message,
                         "Tried to write 40 bytes less than Content-Length")

    def test_content_length_too_low(self):
        # When the content-length is too low, the connection is closed
        # without writing the last chunk, so the client never sees the request
        # complete (which would be a framing error).
        headers_sent = "Cannot send error response after headers written"
        with ExpectLog(app_log, "Uncaught exception"):
            with ExpectLog(gen_log, headers_sent):
                response = self.fetch("/low")
        self.assertEqual(response.code, 599)
        message = str(self.server_error)
        self.assertEqual(message,
                         "Tried to write more data than Content-Length")
02029 
02030 
class ClientCloseTest(SimpleHandlerTestCase):
    """Checks the client-visible result when the connection is closed
    while the handler is still running.
    """
    class Handler(RequestHandler):
        def get(self):
            # Simulate a connection closed by the client during
            # request processing.  The client will see an error, but the
            # server should respond gracefully (without logging errors
            # because we were unable to write out as many bytes as
            # Content-Length said we would)
            connection = self.request.connection
            connection.stream.close()
            self.write('hello')

    def test_client_close(self):
        status = self.fetch('/').code
        self.assertEqual(status, 599)
02045 
02046 
class SignedValueTest(unittest.TestCase):
    """Tests for create_signed_value and decode_signed_value.

    A fixed clock (see ``present`` and ``past``) is passed to both
    functions so that the generated signatures are reproducible and can be
    compared against literal expected values.
    """
    SECRET = "It's a secret to everybody"

    def past(self):
        # 32 days (of 86400 seconds each) before present().
        return self.present() - 86400 * 32

    def present(self):
        # Fixed timestamp used as "now" by these tests.
        return 1300000000

    def test_known_values(self):
        signed_v1 = create_signed_value(SignedValueTest.SECRET, "key", "value",
                                        version=1, clock=self.present)
        self.assertEqual(
            signed_v1,
            b"dmFsdWU=|1300000000|31c934969f53e48164c50768b40cbd7e2daaaa4f")

        signed_v2 = create_signed_value(SignedValueTest.SECRET, "key", "value",
                                        version=2, clock=self.present)
        self.assertEqual(
            signed_v2,
            b"2|1:0|10:1300000000|3:key|8:dmFsdWU=|"
            b"3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152")

        # Without an explicit version the v2 format is produced.
        signed_default = create_signed_value(SignedValueTest.SECRET,
                                             "key", "value", clock=self.present)
        self.assertEqual(signed_default, signed_v2)

        decoded_v1 = decode_signed_value(SignedValueTest.SECRET, "key",
                                         signed_v1, min_version=1,
                                         clock=self.present)
        self.assertEqual(decoded_v1, b"value")

        decoded_v2 = decode_signed_value(SignedValueTest.SECRET, "key",
                                         signed_v2, min_version=2,
                                         clock=self.present)
        self.assertEqual(decoded_v2, b"value")

    def test_name_swap(self):
        signed1 = create_signed_value(SignedValueTest.SECRET, "key1", "value",
                                      clock=self.present)
        signed2 = create_signed_value(SignedValueTest.SECRET, "key2", "value",
                                      clock=self.present)
        # Try decoding each string with the other's "name"
        decoded1 = decode_signed_value(SignedValueTest.SECRET, "key2", signed1,
                                       clock=self.present)
        self.assertIs(decoded1, None)
        decoded2 = decode_signed_value(SignedValueTest.SECRET, "key1", signed2,
                                       clock=self.present)
        self.assertIs(decoded2, None)

    def test_expired(self):
        # A value signed at past() decodes when the clock still reads
        # past(), but is rejected once the clock reads present().
        signed = create_signed_value(SignedValueTest.SECRET, "key1", "value",
                                     clock=self.past)
        decoded_past = decode_signed_value(SignedValueTest.SECRET, "key1",
                                           signed, clock=self.past)
        self.assertEqual(decoded_past, b"value")
        decoded_present = decode_signed_value(SignedValueTest.SECRET, "key1",
                                              signed, clock=self.present)
        self.assertIs(decoded_present, None)

    def test_payload_tampering(self):
        # These cookies are variants of the one in test_known_values.
        sig = "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152"

        def validate(prefix):
            # True only if prefix + sig decodes to the original value.
            return (b'value' ==
                    decode_signed_value(SignedValueTest.SECRET, "key",
                                        prefix + sig, clock=self.present))
        self.assertTrue(validate("2|1:0|10:1300000000|3:key|8:dmFsdWU=|"))
        # Change key version
        self.assertFalse(validate("2|1:1|10:1300000000|3:key|8:dmFsdWU=|"))
        # length mismatch (field too short)
        self.assertFalse(validate("2|1:0|10:130000000|3:key|8:dmFsdWU=|"))
        # length mismatch (field too long)
        self.assertFalse(validate("2|1:0|10:1300000000|3:keey|8:dmFsdWU=|"))

    def test_signature_tampering(self):
        prefix = "2|1:0|10:1300000000|3:key|8:dmFsdWU=|"
        good_sig = (
            "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152")

        def validate(sig):
            # True only if prefix + sig decodes to the original value.
            return (b'value' ==
                    decode_signed_value(SignedValueTest.SECRET, "key",
                                        prefix + sig, clock=self.present))
        self.assertTrue(validate(good_sig))
        # All zeros.  Use the same length as a real signature so that this
        # case exercises the content comparison rather than duplicating
        # the wrong-length cases below.
        self.assertFalse(validate("0" * len(good_sig)))
        # Change one character
        self.assertFalse(validate(
            "4d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152"))
        # Change another character
        self.assertFalse(validate(
            "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e153"))
        # Truncate
        self.assertFalse(validate(
            "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e15"))
        # Lengthen
        self.assertFalse(validate(
            "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e1538"))

    def test_non_ascii(self):
        # A non-ASCII byte value must survive a sign/decode round trip.
        value = b"\xe9"
        signed = create_signed_value(SignedValueTest.SECRET, "key", value,
                                     clock=self.present)
        decoded = decode_signed_value(SignedValueTest.SECRET, "key", signed,
                                      clock=self.present)
        self.assertEqual(value, decoded)
02152 
02153 
@wsgi_safe
class XSRFTest(SimpleHandlerTestCase):
    """Tests for XSRF token checks on POST with ``xsrf_cookies`` enabled."""

    class Handler(RequestHandler):
        def get(self):
            # Setting the cookie version from a query argument would be a
            # bad idea in a real app, but in this test it's fine.
            self.settings["xsrf_cookie_version"] = int(
                self.get_argument("version", "2"))
            self.write(self.xsrf_token)

        def post(self):
            self.write("ok")

    def get_app_kwargs(self):
        return {"xsrf_cookies": True}

    def setUp(self):
        super(XSRFTest, self).setUp()
        # Default token used by most of the tests below.
        self.xsrf_token = self.get_token()

    def get_token(self, old_token=None, version=None):
        """GET a token from the handler and return it as a native string.

        ``old_token``, when given, is sent as the ``_xsrf`` cookie;
        ``version``, when given, is passed as the ``version`` query argument.
        """
        request_headers = None
        if old_token is not None:
            request_headers = self.cookie_headers(old_token)
        path = "/"
        if version is not None:
            path = "/?version=%d" % version
        reply = self.fetch(path, headers=request_headers)
        reply.rethrow()
        return native_str(reply.body)

    def cookie_headers(self, token=None):
        """Build a Cookie header for ``token`` (default: ``self.xsrf_token``)."""
        cookie_value = self.xsrf_token if token is None else token
        return {"Cookie": "_xsrf=" + cookie_value}

    def test_xsrf_fail_no_token(self):
        # Neither cookie nor argument is supplied.
        with ExpectLog(gen_log, ".*'_xsrf' argument missing"):
            reply = self.fetch("/", method="POST", body=b"")
        self.assertEqual(reply.code, 403)

    def test_xsrf_fail_body_no_cookie(self):
        # Token in the form body only.
        form = urllib_parse.urlencode({"_xsrf": self.xsrf_token})
        with ExpectLog(gen_log, ".*XSRF cookie does not match POST"):
            reply = self.fetch("/", method="POST", body=form)
        self.assertEqual(reply.code, 403)

    def test_xsrf_fail_cookie_no_body(self):
        # Token in the cookie only.
        with ExpectLog(gen_log, ".*'_xsrf' argument missing"):
            reply = self.fetch("/", method="POST", body=b"",
                               headers=self.cookie_headers())
        self.assertEqual(reply.code, 403)

    def test_xsrf_success_short_token(self):
        short_token = 'deadbeef'
        reply = self.fetch(
            "/", method="POST",
            body=urllib_parse.urlencode({"_xsrf": short_token}),
            headers=self.cookie_headers(token=short_token))
        self.assertEqual(reply.code, 200)

    def test_xsrf_success_non_hex_token(self):
        non_hex_token = 'xoxo'
        reply = self.fetch(
            "/", method="POST",
            body=urllib_parse.urlencode({"_xsrf": non_hex_token}),
            headers=self.cookie_headers(token=non_hex_token))
        self.assertEqual(reply.code, 200)

    def test_xsrf_success_post_body(self):
        form = urllib_parse.urlencode({"_xsrf": self.xsrf_token})
        reply = self.fetch("/", method="POST", body=form,
                           headers=self.cookie_headers())
        self.assertEqual(reply.code, 200)

    def test_xsrf_success_query_string(self):
        query = urllib_parse.urlencode({"_xsrf": self.xsrf_token})
        reply = self.fetch("/?" + query, method="POST", body=b"",
                           headers=self.cookie_headers())
        self.assertEqual(reply.code, 200)

    def test_xsrf_success_header(self):
        # Token travels in the X-Xsrftoken header alongside the cookie.
        request_headers = {"X-Xsrftoken": self.xsrf_token}
        request_headers.update(self.cookie_headers())
        reply = self.fetch("/", method="POST", body=b"",
                           headers=request_headers)
        self.assertEqual(reply.code, 200)

    def test_distinct_tokens(self):
        # Every request gets a distinct token.
        NUM_TOKENS = 10
        tokens = set(self.get_token() for _ in range(NUM_TOKENS))
        self.assertEqual(len(tokens), NUM_TOKENS)

    def test_cross_user(self):
        token2 = self.get_token()
        # Each token can be used to authenticate its own request.
        for own_token in (self.xsrf_token, token2):
            reply = self.fetch(
                "/", method="POST",
                body=urllib_parse.urlencode({"_xsrf": own_token}),
                headers=self.cookie_headers(own_token))
            self.assertEqual(reply.code, 200)
        # Sending one in the cookie and the other in the body is not allowed.
        mismatched_pairs = ((self.xsrf_token, token2),
                            (token2, self.xsrf_token))
        for cookie_token, body_token in mismatched_pairs:
            with ExpectLog(gen_log, '.*XSRF cookie does not match POST'):
                reply = self.fetch(
                    "/", method="POST",
                    body=urllib_parse.urlencode({"_xsrf": body_token}),
                    headers=self.cookie_headers(cookie_token))
            self.assertEqual(reply.code, 403)

    def test_refresh_token(self):
        current = self.xsrf_token
        tokens_seen = set([current])
        # A user's token is stable over time.  Refreshing the page in one tab
        # might update the cookie while an older tab still has the old cookie
        # in its DOM.  Simulate this scenario by passing a constant token
        # in the body and re-querying for the token.
        for _ in range(5):
            current = self.get_token(current)
            # Tokens are encoded uniquely each time
            tokens_seen.add(current)
            reply = self.fetch(
                "/", method="POST",
                body=urllib_parse.urlencode({"_xsrf": self.xsrf_token}),
                headers=self.cookie_headers(current))
            self.assertEqual(reply.code, 200)
        self.assertEqual(len(tokens_seen), 6)

    def test_versioning(self):
        # Version 1 still produces distinct tokens per request.
        first_v1 = self.get_token(version=1)
        second_v1 = self.get_token(version=1)
        self.assertNotEqual(first_v1, second_v1)

        # Refreshed v1 tokens are all identical.
        v1_token = self.get_token(version=1)
        for _ in range(5):
            refreshed = self.get_token(v1_token, version=1)
            self.assertEqual(refreshed, v1_token)

        # Upgrade to a v2 version of the same token
        v2_token = self.get_token(v1_token)
        self.assertNotEqual(v1_token, v2_token)
        # Each v1 token can map to many v2 tokens.
        another_v2 = self.get_token(v1_token)
        self.assertNotEqual(v2_token, another_v2)

        # The tokens are cross-compatible.
        mixed_pairs = ((v1_token, v2_token), (v2_token, v1_token))
        for cookie_token, body_token in mixed_pairs:
            reply = self.fetch(
                "/", method="POST",
                body=urllib_parse.urlencode({"_xsrf": body_token}),
                headers=self.cookie_headers(cookie_token))
            self.assertEqual(reply.code, 200)
02312 
02313 
@wsgi_safe
class FinishExceptionTest(SimpleHandlerTestCase):
    """Checks the response produced when a handler raises ``Finish``."""

    class Handler(RequestHandler):
        def get(self):
            # Prepare the full 401 response first, then end the request by
            # raising Finish.
            self.set_status(401)
            self.set_header('WWW-Authenticate', 'Basic realm="something"')
            self.write('authentication required')
            raise Finish()

    def test_finish_exception(self):
        # Status, header and body set before the raise must all arrive.
        reply = self.fetch('/')
        self.assertEqual(reply.code, 401)
        challenge = reply.headers.get('WWW-Authenticate')
        self.assertEqual('Basic realm="something"', challenge)
        self.assertEqual(b'authentication required', reply.body)


rosbridge_server
Author(s): Jonathan Mace
autogenerated on Thu Jun 6 2019 21:51:51