00001 from __future__ import absolute_import, division, print_function, with_statement
00002 from tornado.concurrent import Future
00003 from tornado import gen
00004 from tornado.escape import json_decode, utf8, to_unicode, recursive_unicode, native_str, to_basestring
00005 from tornado.httputil import format_timestamp
00006 from tornado.iostream import IOStream
00007 from tornado.log import app_log, gen_log
00008 from tornado.simple_httpclient import SimpleAsyncHTTPClient
00009 from tornado.template import DictLoader
00010 from tornado.testing import AsyncHTTPTestCase, ExpectLog, gen_test
00011 from tornado.test.util import unittest
00012 from tornado.util import u, bytes_type, ObjectDict, unicode_type
00013 from tornado.web import RequestHandler, authenticated, Application, asynchronous, url, HTTPError, StaticFileHandler, _create_signature_v1, create_signed_value, decode_signed_value, ErrorHandler, UIModule, MissingArgumentError, stream_request_body, Finish
00014
00015 import binascii
00016 import contextlib
00017 import datetime
00018 import email.utils
00019 import itertools
00020 import logging
00021 import os
00022 import re
00023 import socket
00024 import sys
00025
00026 try:
00027 import urllib.parse as urllib_parse
00028 except ImportError:
00029 import urllib as urllib_parse
00030
00031 wsgi_safe_tests = []
00032
00033 relpath = lambda *a: os.path.join(os.path.dirname(__file__), *a)
00034
00035
00036 def wsgi_safe(cls):
00037 wsgi_safe_tests.append(cls)
00038 return cls
00039
00040
00041 class WebTestCase(AsyncHTTPTestCase):
00042 """Base class for web tests that also supports WSGI mode.
00043
00044 Override get_handlers and get_app_kwargs instead of get_app.
00045 Append to wsgi_safe to have it run in wsgi_test as well.
00046 """
00047 def get_app(self):
00048 self.app = Application(self.get_handlers(), **self.get_app_kwargs())
00049 return self.app
00050
00051 def get_handlers(self):
00052 raise NotImplementedError()
00053
00054 def get_app_kwargs(self):
00055 return {}
00056
00057
00058 class SimpleHandlerTestCase(WebTestCase):
00059 """Simplified base class for tests that work with a single handler class.
00060
00061 To use, define a nested class named ``Handler``.
00062 """
00063 def get_handlers(self):
00064 return [('/', self.Handler)]
00065
00066
00067 class HelloHandler(RequestHandler):
00068 def get(self):
00069 self.write('hello')
00070
00071
00072 class CookieTestRequestHandler(RequestHandler):
00073
00074 def __init__(self):
00075
00076 self._cookies = {}
00077 self.application = ObjectDict(settings=dict(cookie_secret='0123456789'))
00078
00079 def get_cookie(self, name):
00080 return self._cookies.get(name)
00081
00082 def set_cookie(self, name, value, expires_days=None):
00083 self._cookies[name] = value
00084
00085
00086
00087 class SecureCookieV1Test(unittest.TestCase):
00088 def test_round_trip(self):
00089 handler = CookieTestRequestHandler()
00090 handler.set_secure_cookie('foo', b'bar', version=1)
00091 self.assertEqual(handler.get_secure_cookie('foo', min_version=1),
00092 b'bar')
00093
00094 def test_cookie_tampering_future_timestamp(self):
00095 handler = CookieTestRequestHandler()
00096
00097 handler.set_secure_cookie('foo', binascii.a2b_hex(b'd76df8e7aefc'),
00098 version=1)
00099 cookie = handler._cookies['foo']
00100 match = re.match(br'12345678\|([0-9]+)\|([0-9a-f]+)', cookie)
00101 self.assertTrue(match)
00102 timestamp = match.group(1)
00103 sig = match.group(2)
00104 self.assertEqual(
00105 _create_signature_v1(handler.application.settings["cookie_secret"],
00106 'foo', '12345678', timestamp),
00107 sig)
00108
00109
00110
00111 self.assertEqual(
00112 _create_signature_v1(handler.application.settings["cookie_secret"],
00113 'foo', '1234', b'5678' + timestamp),
00114 sig)
00115
00116 handler._cookies['foo'] = utf8('1234|5678%s|%s' % (
00117 to_basestring(timestamp), to_basestring(sig)))
00118
00119 with ExpectLog(gen_log, "Cookie timestamp in future"):
00120 self.assertTrue(
00121 handler.get_secure_cookie('foo', min_version=1) is None)
00122
00123 def test_arbitrary_bytes(self):
00124
00125
00126 handler = CookieTestRequestHandler()
00127 handler.set_secure_cookie('foo', b'\xe9', version=1)
00128 self.assertEqual(handler.get_secure_cookie('foo', min_version=1), b'\xe9')
00129
00130
00131 class CookieTest(WebTestCase):
00132 def get_handlers(self):
00133 class SetCookieHandler(RequestHandler):
00134 def get(self):
00135
00136
00137 self.set_cookie("str", "asdf")
00138 self.set_cookie("unicode", u("qwer"))
00139 self.set_cookie("bytes", b"zxcv")
00140
00141 class GetCookieHandler(RequestHandler):
00142 def get(self):
00143 self.write(self.get_cookie("foo", "default"))
00144
00145 class SetCookieDomainHandler(RequestHandler):
00146 def get(self):
00147
00148
00149 self.set_cookie("unicode_args", "blah", domain=u("foo.com"),
00150 path=u("/foo"))
00151
00152 class SetCookieSpecialCharHandler(RequestHandler):
00153 def get(self):
00154 self.set_cookie("equals", "a=b")
00155 self.set_cookie("semicolon", "a;b")
00156 self.set_cookie("quote", 'a"b')
00157
00158 class SetCookieOverwriteHandler(RequestHandler):
00159 def get(self):
00160 self.set_cookie("a", "b", domain="example.com")
00161 self.set_cookie("c", "d", domain="example.com")
00162
00163
00164 self.set_cookie("a", "e")
00165
00166 return [("/set", SetCookieHandler),
00167 ("/get", GetCookieHandler),
00168 ("/set_domain", SetCookieDomainHandler),
00169 ("/special_char", SetCookieSpecialCharHandler),
00170 ("/set_overwrite", SetCookieOverwriteHandler),
00171 ]
00172
00173 def test_set_cookie(self):
00174 response = self.fetch("/set")
00175 self.assertEqual(sorted(response.headers.get_list("Set-Cookie")),
00176 ["bytes=zxcv; Path=/",
00177 "str=asdf; Path=/",
00178 "unicode=qwer; Path=/",
00179 ])
00180
00181 def test_get_cookie(self):
00182 response = self.fetch("/get", headers={"Cookie": "foo=bar"})
00183 self.assertEqual(response.body, b"bar")
00184
00185 response = self.fetch("/get", headers={"Cookie": 'foo="bar"'})
00186 self.assertEqual(response.body, b"bar")
00187
00188 response = self.fetch("/get", headers={"Cookie": "/=exception;"})
00189 self.assertEqual(response.body, b"default")
00190
00191 def test_set_cookie_domain(self):
00192 response = self.fetch("/set_domain")
00193 self.assertEqual(response.headers.get_list("Set-Cookie"),
00194 ["unicode_args=blah; Domain=foo.com; Path=/foo"])
00195
00196 def test_cookie_special_char(self):
00197 response = self.fetch("/special_char")
00198 headers = sorted(response.headers.get_list("Set-Cookie"))
00199 self.assertEqual(len(headers), 3)
00200 self.assertEqual(headers[0], 'equals="a=b"; Path=/')
00201 self.assertEqual(headers[1], 'quote="a\\"b"; Path=/')
00202
00203 self.assertTrue(headers[2] in ('semicolon="a;b"; Path=/',
00204 'semicolon="a\\073b"; Path=/'),
00205 headers[2])
00206
00207 data = [('foo=a=b', 'a=b'),
00208 ('foo="a=b"', 'a=b'),
00209 ('foo="a;b"', 'a;b'),
00210
00211 ('foo="a\\073b"', 'a;b'),
00212 ('foo="a\\"b"', 'a"b'),
00213 ]
00214 for header, expected in data:
00215 logging.debug("trying %r", header)
00216 response = self.fetch("/get", headers={"Cookie": header})
00217 self.assertEqual(response.body, utf8(expected))
00218
00219 def test_set_cookie_overwrite(self):
00220 response = self.fetch("/set_overwrite")
00221 headers = response.headers.get_list("Set-Cookie")
00222 self.assertEqual(sorted(headers),
00223 ["a=e; Path=/", "c=d; Domain=example.com; Path=/"])
00224
00225
00226 class AuthRedirectRequestHandler(RequestHandler):
00227 def initialize(self, login_url):
00228 self.login_url = login_url
00229
00230 def get_login_url(self):
00231 return self.login_url
00232
00233 @authenticated
00234 def get(self):
00235
00236 self.send_error(500)
00237
00238
00239 class AuthRedirectTest(WebTestCase):
00240 def get_handlers(self):
00241 return [('/relative', AuthRedirectRequestHandler,
00242 dict(login_url='/login')),
00243 ('/absolute', AuthRedirectRequestHandler,
00244 dict(login_url='http://example.com/login'))]
00245
00246 def test_relative_auth_redirect(self):
00247 self.http_client.fetch(self.get_url('/relative'), self.stop,
00248 follow_redirects=False)
00249 response = self.wait()
00250 self.assertEqual(response.code, 302)
00251 self.assertEqual(response.headers['Location'], '/login?next=%2Frelative')
00252
00253 def test_absolute_auth_redirect(self):
00254 self.http_client.fetch(self.get_url('/absolute'), self.stop,
00255 follow_redirects=False)
00256 response = self.wait()
00257 self.assertEqual(response.code, 302)
00258 self.assertTrue(re.match(
00259 'http://example.com/login\?next=http%3A%2F%2Flocalhost%3A[0-9]+%2Fabsolute',
00260 response.headers['Location']), response.headers['Location'])
00261
00262
00263 class ConnectionCloseHandler(RequestHandler):
00264 def initialize(self, test):
00265 self.test = test
00266
00267 @asynchronous
00268 def get(self):
00269 self.test.on_handler_waiting()
00270
00271 def on_connection_close(self):
00272 self.test.on_connection_close()
00273
00274
00275 class ConnectionCloseTest(WebTestCase):
00276 def get_handlers(self):
00277 return [('/', ConnectionCloseHandler, dict(test=self))]
00278
00279 def test_connection_close(self):
00280 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
00281 s.connect(("localhost", self.get_http_port()))
00282 self.stream = IOStream(s, io_loop=self.io_loop)
00283 self.stream.write(b"GET / HTTP/1.0\r\n\r\n")
00284 self.wait()
00285
00286 def on_handler_waiting(self):
00287 logging.debug('handler waiting')
00288 self.stream.close()
00289
00290 def on_connection_close(self):
00291 logging.debug('connection closed')
00292 self.stop()
00293
00294
00295 class EchoHandler(RequestHandler):
00296 def get(self, *path_args):
00297
00298
00299
00300
00301 for key in self.request.arguments:
00302 if type(key) != str:
00303 raise Exception("incorrect type for key: %r" % type(key))
00304 for value in self.request.arguments[key]:
00305 if type(value) != bytes_type:
00306 raise Exception("incorrect type for value: %r" %
00307 type(value))
00308 for value in self.get_arguments(key):
00309 if type(value) != unicode_type:
00310 raise Exception("incorrect type for value: %r" %
00311 type(value))
00312 for arg in path_args:
00313 if type(arg) != unicode_type:
00314 raise Exception("incorrect type for path arg: %r" % type(arg))
00315 self.write(dict(path=self.request.path,
00316 path_args=path_args,
00317 args=recursive_unicode(self.request.arguments)))
00318
00319
00320 class RequestEncodingTest(WebTestCase):
00321 def get_handlers(self):
00322 return [("/group/(.*)", EchoHandler),
00323 ("/slashes/([^/]*)/([^/]*)", EchoHandler),
00324 ]
00325
00326 def fetch_json(self, path):
00327 return json_decode(self.fetch(path).body)
00328
00329 def test_group_question_mark(self):
00330
00331 self.assertEqual(self.fetch_json('/group/%3F'),
00332 dict(path='/group/%3F', path_args=['?'], args={}))
00333 self.assertEqual(self.fetch_json('/group/%3F?%3F=%3F'),
00334 dict(path='/group/%3F', path_args=['?'], args={'?': ['?']}))
00335
00336 def test_group_encoding(self):
00337
00338 self.assertEqual(self.fetch_json('/group/%C3%A9?arg=%C3%A9'),
00339 {u("path"): u("/group/%C3%A9"),
00340 u("path_args"): [u("\u00e9")],
00341 u("args"): {u("arg"): [u("\u00e9")]}})
00342
00343 def test_slashes(self):
00344
00345
00346 self.assertEqual(self.fetch_json('/slashes/foo/bar'),
00347 dict(path="/slashes/foo/bar",
00348 path_args=["foo", "bar"],
00349 args={}))
00350 self.assertEqual(self.fetch_json('/slashes/a%2Fb/c%2Fd'),
00351 dict(path="/slashes/a%2Fb/c%2Fd",
00352 path_args=["a/b", "c/d"],
00353 args={}))
00354
00355
00356 class TypeCheckHandler(RequestHandler):
00357 def prepare(self):
00358 self.errors = {}
00359
00360 self.check_type('status', self.get_status(), int)
00361
00362
00363
00364 self.check_type('argument', self.get_argument('foo'), unicode_type)
00365 self.check_type('cookie_key', list(self.cookies.keys())[0], str)
00366 self.check_type('cookie_value', list(self.cookies.values())[0].value, str)
00367
00368
00369
00370 if list(self.cookies.keys()) != ['asdf']:
00371 raise Exception("unexpected values for cookie keys: %r" %
00372 self.cookies.keys())
00373 self.check_type('get_secure_cookie', self.get_secure_cookie('asdf'), bytes_type)
00374 self.check_type('get_cookie', self.get_cookie('asdf'), str)
00375
00376 self.check_type('xsrf_token', self.xsrf_token, bytes_type)
00377 self.check_type('xsrf_form_html', self.xsrf_form_html(), str)
00378
00379 self.check_type('reverse_url', self.reverse_url('typecheck', 'foo'), str)
00380
00381 self.check_type('request_summary', self._request_summary(), str)
00382
00383 def get(self, path_component):
00384
00385
00386 self.check_type('path_component', path_component, unicode_type)
00387 self.write(self.errors)
00388
00389 def post(self, path_component):
00390 self.check_type('path_component', path_component, unicode_type)
00391 self.write(self.errors)
00392
00393 def check_type(self, name, obj, expected_type):
00394 actual_type = type(obj)
00395 if expected_type != actual_type:
00396 self.errors[name] = "expected %s, got %s" % (expected_type,
00397 actual_type)
00398
00399
00400 class DecodeArgHandler(RequestHandler):
00401 def decode_argument(self, value, name=None):
00402 if type(value) != bytes_type:
00403 raise Exception("unexpected type for value: %r" % type(value))
00404
00405 if 'encoding' in self.request.arguments:
00406 return value.decode(to_unicode(self.request.arguments['encoding'][0]))
00407 else:
00408 return value
00409
00410 def get(self, arg):
00411 def describe(s):
00412 if type(s) == bytes_type:
00413 return ["bytes", native_str(binascii.b2a_hex(s))]
00414 elif type(s) == unicode_type:
00415 return ["unicode", s]
00416 raise Exception("unknown type")
00417 self.write({'path': describe(arg),
00418 'query': describe(self.get_argument("foo")),
00419 })
00420
00421
00422 class LinkifyHandler(RequestHandler):
00423 def get(self):
00424 self.render("linkify.html", message="http://example.com")
00425
00426
00427 class UIModuleResourceHandler(RequestHandler):
00428 def get(self):
00429 self.render("page.html", entries=[1, 2])
00430
00431
00432 class OptionalPathHandler(RequestHandler):
00433 def get(self, path):
00434 self.write({"path": path})
00435
00436
00437 class FlowControlHandler(RequestHandler):
00438
00439
00440 @asynchronous
00441 def get(self):
00442 self.write("1")
00443 self.flush(callback=self.step2)
00444
00445 def step2(self):
00446 self.write("2")
00447 self.flush(callback=self.step3)
00448
00449 def step3(self):
00450 self.write("3")
00451 self.finish()
00452
00453
00454 class MultiHeaderHandler(RequestHandler):
00455 def get(self):
00456 self.set_header("x-overwrite", "1")
00457 self.set_header("X-Overwrite", 2)
00458 self.add_header("x-multi", 3)
00459 self.add_header("X-Multi", "4")
00460
00461
00462 class RedirectHandler(RequestHandler):
00463 def get(self):
00464 if self.get_argument('permanent', None) is not None:
00465 self.redirect('/', permanent=int(self.get_argument('permanent')))
00466 elif self.get_argument('status', None) is not None:
00467 self.redirect('/', status=int(self.get_argument('status')))
00468 else:
00469 raise Exception("didn't get permanent or status arguments")
00470
00471
00472 class EmptyFlushCallbackHandler(RequestHandler):
00473 @gen.engine
00474 @asynchronous
00475 def get(self):
00476
00477
00478
00479 yield gen.Task(self.flush)
00480 yield gen.Task(self.flush)
00481 self.write("o")
00482 yield self.flush()
00483 yield self.flush()
00484 self.finish("k")
00485
00486
00487 class HeaderInjectionHandler(RequestHandler):
00488 def get(self):
00489 try:
00490 self.set_header("X-Foo", "foo\r\nX-Bar: baz")
00491 raise Exception("Didn't get expected exception")
00492 except ValueError as e:
00493 if "Unsafe header value" in str(e):
00494 self.finish(b"ok")
00495 else:
00496 raise
00497
00498
00499 class GetArgumentHandler(RequestHandler):
00500 def prepare(self):
00501 if self.get_argument('source', None) == 'query':
00502 method = self.get_query_argument
00503 elif self.get_argument('source', None) == 'body':
00504 method = self.get_body_argument
00505 else:
00506 method = self.get_argument
00507 self.finish(method("foo", "default"))
00508
00509
00510 class GetArgumentsHandler(RequestHandler):
00511 def prepare(self):
00512 self.finish(dict(default=self.get_arguments("foo"),
00513 query=self.get_query_arguments("foo"),
00514 body=self.get_body_arguments("foo")))
00515
00516
00517
00518 @wsgi_safe
00519 class WSGISafeWebTest(WebTestCase):
00520 COOKIE_SECRET = "WebTest.COOKIE_SECRET"
00521
00522 def get_app_kwargs(self):
00523 loader = DictLoader({
00524 "linkify.html": "{% module linkify(message) %}",
00525 "page.html": """\
00526 <html><head></head><body>
00527 {% for e in entries %}
00528 {% module Template("entry.html", entry=e) %}
00529 {% end %}
00530 </body></html>""",
00531 "entry.html": """\
00532 {{ set_resources(embedded_css=".entry { margin-bottom: 1em; }", embedded_javascript="js_embed()", css_files=["/base.css", "/foo.css"], javascript_files="/common.js", html_head="<meta>", html_body='<script src="/analytics.js"/>') }}
00533 <div class="entry">...</div>""",
00534 })
00535 return dict(template_loader=loader,
00536 autoescape="xhtml_escape",
00537 cookie_secret=self.COOKIE_SECRET)
00538
00539 def tearDown(self):
00540 super(WSGISafeWebTest, self).tearDown()
00541 RequestHandler._template_loaders.clear()
00542
00543 def get_handlers(self):
00544 urls = [
00545 url("/typecheck/(.*)", TypeCheckHandler, name='typecheck'),
00546 url("/decode_arg/(.*)", DecodeArgHandler, name='decode_arg'),
00547 url("/decode_arg_kw/(?P<arg>.*)", DecodeArgHandler),
00548 url("/linkify", LinkifyHandler),
00549 url("/uimodule_resources", UIModuleResourceHandler),
00550 url("/optional_path/(.+)?", OptionalPathHandler),
00551 url("/multi_header", MultiHeaderHandler),
00552 url("/redirect", RedirectHandler),
00553 url("/header_injection", HeaderInjectionHandler),
00554 url("/get_argument", GetArgumentHandler),
00555 url("/get_arguments", GetArgumentsHandler),
00556 ]
00557 return urls
00558
00559 def fetch_json(self, *args, **kwargs):
00560 response = self.fetch(*args, **kwargs)
00561 response.rethrow()
00562 return json_decode(response.body)
00563
00564 def test_types(self):
00565 cookie_value = to_unicode(create_signed_value(self.COOKIE_SECRET,
00566 "asdf", "qwer"))
00567 response = self.fetch("/typecheck/asdf?foo=bar",
00568 headers={"Cookie": "asdf=" + cookie_value})
00569 data = json_decode(response.body)
00570 self.assertEqual(data, {})
00571
00572 response = self.fetch("/typecheck/asdf?foo=bar", method="POST",
00573 headers={"Cookie": "asdf=" + cookie_value},
00574 body="foo=bar")
00575
00576 def test_decode_argument(self):
00577
00578 urls = ["/decode_arg/%C3%A9?foo=%C3%A9&encoding=utf-8",
00579 "/decode_arg/%E9?foo=%E9&encoding=latin1",
00580 "/decode_arg_kw/%E9?foo=%E9&encoding=latin1",
00581 ]
00582 for req_url in urls:
00583 response = self.fetch(req_url)
00584 response.rethrow()
00585 data = json_decode(response.body)
00586 self.assertEqual(data, {u('path'): [u('unicode'), u('\u00e9')],
00587 u('query'): [u('unicode'), u('\u00e9')],
00588 })
00589
00590 response = self.fetch("/decode_arg/%C3%A9?foo=%C3%A9")
00591 response.rethrow()
00592 data = json_decode(response.body)
00593 self.assertEqual(data, {u('path'): [u('bytes'), u('c3a9')],
00594 u('query'): [u('bytes'), u('c3a9')],
00595 })
00596
00597 def test_decode_argument_invalid_unicode(self):
00598
00599 with ExpectLog(gen_log, ".*Invalid unicode.*"):
00600 response = self.fetch("/typecheck/invalid%FF")
00601 self.assertEqual(response.code, 400)
00602 response = self.fetch("/typecheck/invalid?foo=%FF")
00603 self.assertEqual(response.code, 400)
00604
00605 def test_decode_argument_plus(self):
00606
00607 urls = ["/decode_arg/1%20%2B%201?foo=1%20%2B%201&encoding=utf-8",
00608 "/decode_arg/1%20+%201?foo=1+%2B+1&encoding=utf-8"]
00609 for req_url in urls:
00610 response = self.fetch(req_url)
00611 response.rethrow()
00612 data = json_decode(response.body)
00613 self.assertEqual(data, {u('path'): [u('unicode'), u('1 + 1')],
00614 u('query'): [u('unicode'), u('1 + 1')],
00615 })
00616
00617 def test_reverse_url(self):
00618 self.assertEqual(self.app.reverse_url('decode_arg', 'foo'),
00619 '/decode_arg/foo')
00620 self.assertEqual(self.app.reverse_url('decode_arg', 42),
00621 '/decode_arg/42')
00622 self.assertEqual(self.app.reverse_url('decode_arg', b'\xe9'),
00623 '/decode_arg/%E9')
00624 self.assertEqual(self.app.reverse_url('decode_arg', u('\u00e9')),
00625 '/decode_arg/%C3%A9')
00626 self.assertEqual(self.app.reverse_url('decode_arg', '1 + 1'),
00627 '/decode_arg/1%20%2B%201')
00628
00629 def test_uimodule_unescaped(self):
00630 response = self.fetch("/linkify")
00631 self.assertEqual(response.body,
00632 b"<a href=\"http://example.com\">http://example.com</a>")
00633
00634 def test_uimodule_resources(self):
00635 response = self.fetch("/uimodule_resources")
00636 self.assertEqual(response.body, b"""\
00637 <html><head><link href="/base.css" type="text/css" rel="stylesheet"/><link href="/foo.css" type="text/css" rel="stylesheet"/>
00638 <style type="text/css">
00639 .entry { margin-bottom: 1em; }
00640 </style>
00641 <meta>
00642 </head><body>
00643
00644
00645 <div class="entry">...</div>
00646
00647
00648 <div class="entry">...</div>
00649
00650 <script src="/common.js" type="text/javascript"></script>
00651 <script type="text/javascript">
00652 //<![CDATA[
00653 js_embed()
00654 //]]>
00655 </script>
00656 <script src="/analytics.js"/>
00657 </body></html>""")
00658
00659 def test_optional_path(self):
00660 self.assertEqual(self.fetch_json("/optional_path/foo"),
00661 {u("path"): u("foo")})
00662 self.assertEqual(self.fetch_json("/optional_path/"),
00663 {u("path"): None})
00664
00665 def test_multi_header(self):
00666 response = self.fetch("/multi_header")
00667 self.assertEqual(response.headers["x-overwrite"], "2")
00668 self.assertEqual(response.headers.get_list("x-multi"), ["3", "4"])
00669
00670 def test_redirect(self):
00671 response = self.fetch("/redirect?permanent=1", follow_redirects=False)
00672 self.assertEqual(response.code, 301)
00673 response = self.fetch("/redirect?permanent=0", follow_redirects=False)
00674 self.assertEqual(response.code, 302)
00675 response = self.fetch("/redirect?status=307", follow_redirects=False)
00676 self.assertEqual(response.code, 307)
00677
00678 def test_header_injection(self):
00679 response = self.fetch("/header_injection")
00680 self.assertEqual(response.body, b"ok")
00681
00682 def test_get_argument(self):
00683 response = self.fetch("/get_argument?foo=bar")
00684 self.assertEqual(response.body, b"bar")
00685 response = self.fetch("/get_argument?foo=")
00686 self.assertEqual(response.body, b"")
00687 response = self.fetch("/get_argument")
00688 self.assertEqual(response.body, b"default")
00689
00690
00691
00692 body = urllib_parse.urlencode(dict(foo="hello"))
00693 response = self.fetch("/get_argument?foo=bar", method="POST", body=body)
00694 self.assertEqual(response.body, b"hello")
00695
00696 response = self.fetch("/get_arguments?foo=bar",
00697 method="POST", body=body)
00698 self.assertEqual(json_decode(response.body),
00699 dict(default=['bar', 'hello'],
00700 query=['bar'],
00701 body=['hello']))
00702
00703 def test_get_query_arguments(self):
00704
00705
00706 body = urllib_parse.urlencode(dict(foo="hello"))
00707 response = self.fetch("/get_argument?source=query&foo=bar",
00708 method="POST", body=body)
00709 self.assertEqual(response.body, b"bar")
00710 response = self.fetch("/get_argument?source=query&foo=",
00711 method="POST", body=body)
00712 self.assertEqual(response.body, b"")
00713 response = self.fetch("/get_argument?source=query",
00714 method="POST", body=body)
00715 self.assertEqual(response.body, b"default")
00716
00717 def test_get_body_arguments(self):
00718 body = urllib_parse.urlencode(dict(foo="bar"))
00719 response = self.fetch("/get_argument?source=body&foo=hello",
00720 method="POST", body=body)
00721 self.assertEqual(response.body, b"bar")
00722
00723 body = urllib_parse.urlencode(dict(foo=""))
00724 response = self.fetch("/get_argument?source=body&foo=hello",
00725 method="POST", body=body)
00726 self.assertEqual(response.body, b"")
00727
00728 body = urllib_parse.urlencode(dict())
00729 response = self.fetch("/get_argument?source=body&foo=hello",
00730 method="POST", body=body)
00731 self.assertEqual(response.body, b"default")
00732
00733 def test_no_gzip(self):
00734 response = self.fetch('/get_argument')
00735 self.assertNotIn('Accept-Encoding', response.headers.get('Vary', ''))
00736 self.assertNotIn('gzip', response.headers.get('Content-Encoding', ''))
00737
00738
00739 class NonWSGIWebTests(WebTestCase):
00740 def get_handlers(self):
00741 return [("/flow_control", FlowControlHandler),
00742 ("/empty_flush", EmptyFlushCallbackHandler),
00743 ]
00744
00745 def test_flow_control(self):
00746 self.assertEqual(self.fetch("/flow_control").body, b"123")
00747
00748 def test_empty_flush(self):
00749 response = self.fetch("/empty_flush")
00750 self.assertEqual(response.body, b"ok")
00751
00752
00753 @wsgi_safe
00754 class ErrorResponseTest(WebTestCase):
00755 def get_handlers(self):
00756 class DefaultHandler(RequestHandler):
00757 def get(self):
00758 if self.get_argument("status", None):
00759 raise HTTPError(int(self.get_argument("status")))
00760 1 / 0
00761
00762 class WriteErrorHandler(RequestHandler):
00763 def get(self):
00764 if self.get_argument("status", None):
00765 self.send_error(int(self.get_argument("status")))
00766 else:
00767 1 / 0
00768
00769 def write_error(self, status_code, **kwargs):
00770 self.set_header("Content-Type", "text/plain")
00771 if "exc_info" in kwargs:
00772 self.write("Exception: %s" % kwargs["exc_info"][0].__name__)
00773 else:
00774 self.write("Status: %d" % status_code)
00775
00776 class FailedWriteErrorHandler(RequestHandler):
00777 def get(self):
00778 1 / 0
00779
00780 def write_error(self, status_code, **kwargs):
00781 raise Exception("exception in write_error")
00782
00783 return [url("/default", DefaultHandler),
00784 url("/write_error", WriteErrorHandler),
00785 url("/failed_write_error", FailedWriteErrorHandler),
00786 ]
00787
00788 def test_default(self):
00789 with ExpectLog(app_log, "Uncaught exception"):
00790 response = self.fetch("/default")
00791 self.assertEqual(response.code, 500)
00792 self.assertTrue(b"500: Internal Server Error" in response.body)
00793
00794 response = self.fetch("/default?status=503")
00795 self.assertEqual(response.code, 503)
00796 self.assertTrue(b"503: Service Unavailable" in response.body)
00797
00798 def test_write_error(self):
00799 with ExpectLog(app_log, "Uncaught exception"):
00800 response = self.fetch("/write_error")
00801 self.assertEqual(response.code, 500)
00802 self.assertEqual(b"Exception: ZeroDivisionError", response.body)
00803
00804 response = self.fetch("/write_error?status=503")
00805 self.assertEqual(response.code, 503)
00806 self.assertEqual(b"Status: 503", response.body)
00807
00808 def test_failed_write_error(self):
00809 with ExpectLog(app_log, "Uncaught exception"):
00810 response = self.fetch("/failed_write_error")
00811 self.assertEqual(response.code, 500)
00812 self.assertEqual(b"", response.body)
00813
00814
00815 @wsgi_safe
00816 class StaticFileTest(WebTestCase):
00817
00818
00819 robots_txt_hash = b"f71d20196d4caf35b6a670db8c70b03d"
00820 static_dir = os.path.join(os.path.dirname(__file__), 'static')
00821
00822 def get_handlers(self):
00823 class StaticUrlHandler(RequestHandler):
00824 def get(self, path):
00825 with_v = int(self.get_argument('include_version', 1))
00826 self.write(self.static_url(path, include_version=with_v))
00827
00828 class AbsoluteStaticUrlHandler(StaticUrlHandler):
00829 include_host = True
00830
00831 class OverrideStaticUrlHandler(RequestHandler):
00832 def get(self, path):
00833 do_include = bool(self.get_argument("include_host"))
00834 self.include_host = not do_include
00835
00836 regular_url = self.static_url(path)
00837 override_url = self.static_url(path, include_host=do_include)
00838 if override_url == regular_url:
00839 return self.write(str(False))
00840
00841 protocol = self.request.protocol + "://"
00842 protocol_length = len(protocol)
00843 check_regular = regular_url.find(protocol, 0, protocol_length)
00844 check_override = override_url.find(protocol, 0, protocol_length)
00845
00846 if do_include:
00847 result = (check_override == 0 and check_regular == -1)
00848 else:
00849 result = (check_override == -1 and check_regular == 0)
00850 self.write(str(result))
00851
00852 return [('/static_url/(.*)', StaticUrlHandler),
00853 ('/abs_static_url/(.*)', AbsoluteStaticUrlHandler),
00854 ('/override_static_url/(.*)', OverrideStaticUrlHandler)]
00855
00856 def get_app_kwargs(self):
00857 return dict(static_path=relpath('static'))
00858
00859 def test_static_files(self):
00860 response = self.fetch('/robots.txt')
00861 self.assertTrue(b"Disallow: /" in response.body)
00862
00863 response = self.fetch('/static/robots.txt')
00864 self.assertTrue(b"Disallow: /" in response.body)
00865
00866 def test_static_url(self):
00867 response = self.fetch("/static_url/robots.txt")
00868 self.assertEqual(response.body,
00869 b"/static/robots.txt?v=" + self.robots_txt_hash)
00870
00871 def test_absolute_static_url(self):
00872 response = self.fetch("/abs_static_url/robots.txt")
00873 self.assertEqual(response.body, (
00874 utf8(self.get_url("/")) +
00875 b"static/robots.txt?v=" +
00876 self.robots_txt_hash
00877 ))
00878
00879 def test_relative_version_exclusion(self):
00880 response = self.fetch("/static_url/robots.txt?include_version=0")
00881 self.assertEqual(response.body, b"/static/robots.txt")
00882
00883 def test_absolute_version_exclusion(self):
00884 response = self.fetch("/abs_static_url/robots.txt?include_version=0")
00885 self.assertEqual(response.body,
00886 utf8(self.get_url("/") + "static/robots.txt"))
00887
00888 def test_include_host_override(self):
00889 self._trigger_include_host_check(False)
00890 self._trigger_include_host_check(True)
00891
00892 def _trigger_include_host_check(self, include_host):
00893 path = "/override_static_url/robots.txt?include_host=%s"
00894 response = self.fetch(path % int(include_host))
00895 self.assertEqual(response.body, utf8(str(True)))
00896
00897 def get_and_head(self, *args, **kwargs):
00898 """Performs a GET and HEAD request and returns the GET response.
00899
00900 Fails if any ``Content-*`` headers returned by the two requests
00901 differ.
00902 """
00903 head_response = self.fetch(*args, method="HEAD", **kwargs)
00904 get_response = self.fetch(*args, method="GET", **kwargs)
00905 content_headers = set()
00906 for h in itertools.chain(head_response.headers, get_response.headers):
00907 if h.startswith('Content-'):
00908 content_headers.add(h)
00909 for h in content_headers:
00910 self.assertEqual(head_response.headers.get(h),
00911 get_response.headers.get(h),
00912 "%s differs between GET (%s) and HEAD (%s)" %
00913 (h, head_response.headers.get(h),
00914 get_response.headers.get(h)))
00915 return get_response
00916
00917 def test_static_304_if_modified_since(self):
00918 response1 = self.get_and_head("/static/robots.txt")
00919 response2 = self.get_and_head("/static/robots.txt", headers={
00920 'If-Modified-Since': response1.headers['Last-Modified']})
00921 self.assertEqual(response2.code, 304)
00922 self.assertTrue('Content-Length' not in response2.headers)
00923 self.assertTrue('Last-Modified' not in response2.headers)
00924
00925 def test_static_304_if_none_match(self):
00926 response1 = self.get_and_head("/static/robots.txt")
00927 response2 = self.get_and_head("/static/robots.txt", headers={
00928 'If-None-Match': response1.headers['Etag']})
00929 self.assertEqual(response2.code, 304)
00930
00931 def test_static_if_modified_since_pre_epoch(self):
00932
00933
00934
00935 response = self.get_and_head("/static/robots.txt", headers={
00936 'If-Modified-Since': 'Fri, 01 Jan 1960 00:00:00 GMT'})
00937 self.assertEqual(response.code, 200)
00938
00939 def test_static_if_modified_since_time_zone(self):
00940
00941
00942
00943
00944 stat = os.stat(relpath('static/robots.txt'))
00945
00946 response = self.get_and_head('/static/robots.txt', headers={
00947 'If-Modified-Since': format_timestamp(stat.st_mtime - 1)})
00948 self.assertEqual(response.code, 200)
00949 response = self.get_and_head('/static/robots.txt', headers={
00950 'If-Modified-Since': format_timestamp(stat.st_mtime + 1)})
00951 self.assertEqual(response.code, 304)
00952
00953 def test_static_etag(self):
00954 response = self.get_and_head('/static/robots.txt')
00955 self.assertEqual(utf8(response.headers.get("Etag")),
00956 b'"' + self.robots_txt_hash + b'"')
00957
00958 def test_static_with_range(self):
00959 response = self.get_and_head('/static/robots.txt', headers={
00960 'Range': 'bytes=0-9'})
00961 self.assertEqual(response.code, 206)
00962 self.assertEqual(response.body, b"User-agent")
00963 self.assertEqual(utf8(response.headers.get("Etag")),
00964 b'"' + self.robots_txt_hash + b'"')
00965 self.assertEqual(response.headers.get("Content-Length"), "10")
00966 self.assertEqual(response.headers.get("Content-Range"),
00967 "bytes 0-9/26")
00968
00969 def test_static_with_range_full_file(self):
00970 response = self.get_and_head('/static/robots.txt', headers={
00971 'Range': 'bytes=0-'})
00972
00973
00974 self.assertEqual(response.code, 200)
00975 robots_file_path = os.path.join(self.static_dir, "robots.txt")
00976 with open(robots_file_path) as f:
00977 self.assertEqual(response.body, utf8(f.read()))
00978 self.assertEqual(response.headers.get("Content-Length"), "26")
00979 self.assertEqual(response.headers.get("Content-Range"), None)
00980
00981 def test_static_with_range_full_past_end(self):
00982 response = self.get_and_head('/static/robots.txt', headers={
00983 'Range': 'bytes=0-10000000'})
00984 self.assertEqual(response.code, 200)
00985 robots_file_path = os.path.join(self.static_dir, "robots.txt")
00986 with open(robots_file_path) as f:
00987 self.assertEqual(response.body, utf8(f.read()))
00988 self.assertEqual(response.headers.get("Content-Length"), "26")
00989 self.assertEqual(response.headers.get("Content-Range"), None)
00990
00991 def test_static_with_range_partial_past_end(self):
00992 response = self.get_and_head('/static/robots.txt', headers={
00993 'Range': 'bytes=1-10000000'})
00994 self.assertEqual(response.code, 206)
00995 robots_file_path = os.path.join(self.static_dir, "robots.txt")
00996 with open(robots_file_path) as f:
00997 self.assertEqual(response.body, utf8(f.read()[1:]))
00998 self.assertEqual(response.headers.get("Content-Length"), "25")
00999 self.assertEqual(response.headers.get("Content-Range"), "bytes 1-25/26")
01000
01001 def test_static_with_range_end_edge(self):
01002 response = self.get_and_head('/static/robots.txt', headers={
01003 'Range': 'bytes=22-'})
01004 self.assertEqual(response.body, b": /\n")
01005 self.assertEqual(response.headers.get("Content-Length"), "4")
01006 self.assertEqual(response.headers.get("Content-Range"),
01007 "bytes 22-25/26")
01008
01009 def test_static_with_range_neg_end(self):
01010 response = self.get_and_head('/static/robots.txt', headers={
01011 'Range': 'bytes=-4'})
01012 self.assertEqual(response.body, b": /\n")
01013 self.assertEqual(response.headers.get("Content-Length"), "4")
01014 self.assertEqual(response.headers.get("Content-Range"),
01015 "bytes 22-25/26")
01016
01017 def test_static_invalid_range(self):
01018 response = self.get_and_head('/static/robots.txt', headers={
01019 'Range': 'asdf'})
01020 self.assertEqual(response.code, 200)
01021
01022 def test_static_unsatisfiable_range_zero_suffix(self):
01023 response = self.get_and_head('/static/robots.txt', headers={
01024 'Range': 'bytes=-0'})
01025 self.assertEqual(response.headers.get("Content-Range"),
01026 "bytes */26")
01027 self.assertEqual(response.code, 416)
01028
01029 def test_static_unsatisfiable_range_invalid_start(self):
01030 response = self.get_and_head('/static/robots.txt', headers={
01031 'Range': 'bytes=26'})
01032 self.assertEqual(response.code, 416)
01033 self.assertEqual(response.headers.get("Content-Range"),
01034 "bytes */26")
01035
01036 def test_static_head(self):
01037 response = self.fetch('/static/robots.txt', method='HEAD')
01038 self.assertEqual(response.code, 200)
01039
01040 self.assertEqual(response.body, b'')
01041 self.assertEqual(response.headers['Content-Length'], '26')
01042 self.assertEqual(utf8(response.headers['Etag']),
01043 b'"' + self.robots_txt_hash + b'"')
01044
01045 def test_static_head_range(self):
01046 response = self.fetch('/static/robots.txt', method='HEAD',
01047 headers={'Range': 'bytes=1-4'})
01048 self.assertEqual(response.code, 206)
01049 self.assertEqual(response.body, b'')
01050 self.assertEqual(response.headers['Content-Length'], '4')
01051 self.assertEqual(utf8(response.headers['Etag']),
01052 b'"' + self.robots_txt_hash + b'"')
01053
01054 def test_static_range_if_none_match(self):
01055 response = self.get_and_head('/static/robots.txt', headers={
01056 'Range': 'bytes=1-4',
01057 'If-None-Match': b'"' + self.robots_txt_hash + b'"'})
01058 self.assertEqual(response.code, 304)
01059 self.assertEqual(response.body, b'')
01060 self.assertTrue('Content-Length' not in response.headers)
01061 self.assertEqual(utf8(response.headers['Etag']),
01062 b'"' + self.robots_txt_hash + b'"')
01063
01064 def test_static_404(self):
01065 response = self.get_and_head('/static/blarg')
01066 self.assertEqual(response.code, 404)
01067
01068
01069 @wsgi_safe
01070 class StaticDefaultFilenameTest(WebTestCase):
01071 def get_app_kwargs(self):
01072 return dict(static_path=relpath('static'),
01073 static_handler_args=dict(default_filename='index.html'))
01074
01075 def get_handlers(self):
01076 return []
01077
01078 def test_static_default_filename(self):
01079 response = self.fetch('/static/dir/', follow_redirects=False)
01080 self.assertEqual(response.code, 200)
01081 self.assertEqual(b'this is the index\n', response.body)
01082
01083 def test_static_default_redirect(self):
01084 response = self.fetch('/static/dir', follow_redirects=False)
01085 self.assertEqual(response.code, 301)
01086 self.assertTrue(response.headers['Location'].endswith('/static/dir/'))
01087
01088
01089 @wsgi_safe
01090 class StaticFileWithPathTest(WebTestCase):
01091 def get_app_kwargs(self):
01092 return dict(static_path=relpath('static'),
01093 static_handler_args=dict(default_filename='index.html'))
01094
01095 def get_handlers(self):
01096 return [("/foo/(.*)", StaticFileHandler, {
01097 "path": relpath("templates/"),
01098 })]
01099
01100 def test_serve(self):
01101 response = self.fetch("/foo/utf8.html")
01102 self.assertEqual(response.body, b"H\xc3\xa9llo\n")
01103
01104
01105 @wsgi_safe
01106 class CustomStaticFileTest(WebTestCase):
01107 def get_handlers(self):
01108 class MyStaticFileHandler(StaticFileHandler):
01109 @classmethod
01110 def make_static_url(cls, settings, path):
01111 version_hash = cls.get_version(settings, path)
01112 extension_index = path.rindex('.')
01113 before_version = path[:extension_index]
01114 after_version = path[(extension_index + 1):]
01115 return '/static/%s.%s.%s' % (before_version, version_hash,
01116 after_version)
01117
01118 def parse_url_path(self, url_path):
01119 extension_index = url_path.rindex('.')
01120 version_index = url_path.rindex('.', 0, extension_index)
01121 return '%s%s' % (url_path[:version_index],
01122 url_path[extension_index:])
01123
01124 @classmethod
01125 def get_absolute_path(cls, settings, path):
01126 return 'CustomStaticFileTest:' + path
01127
01128 def validate_absolute_path(self, root, absolute_path):
01129 return absolute_path
01130
01131 @classmethod
01132 def get_content(self, path, start=None, end=None):
01133 assert start is None and end is None
01134 if path == 'CustomStaticFileTest:foo.txt':
01135 return b'bar'
01136 raise Exception("unexpected path %r" % path)
01137
01138 def get_content_size(self):
01139 if self.absolute_path == 'CustomStaticFileTest:foo.txt':
01140 return 3
01141 raise Exception("unexpected path %r" % self.absolute_path)
01142
01143 def get_modified_time(self):
01144 return None
01145
01146 @classmethod
01147 def get_version(cls, settings, path):
01148 return "42"
01149
01150 class StaticUrlHandler(RequestHandler):
01151 def get(self, path):
01152 self.write(self.static_url(path))
01153
01154 self.static_handler_class = MyStaticFileHandler
01155
01156 return [("/static_url/(.*)", StaticUrlHandler)]
01157
01158 def get_app_kwargs(self):
01159 return dict(static_path="dummy",
01160 static_handler_class=self.static_handler_class)
01161
01162 def test_serve(self):
01163 response = self.fetch("/static/foo.42.txt")
01164 self.assertEqual(response.body, b"bar")
01165
01166 def test_static_url(self):
01167 with ExpectLog(gen_log, "Could not open static file", required=False):
01168 response = self.fetch("/static_url/foo.txt")
01169 self.assertEqual(response.body, b"/static/foo.42.txt")
01170
01171
01172 @wsgi_safe
01173 class HostMatchingTest(WebTestCase):
01174 class Handler(RequestHandler):
01175 def initialize(self, reply):
01176 self.reply = reply
01177
01178 def get(self):
01179 self.write(self.reply)
01180
01181 def get_handlers(self):
01182 return [("/foo", HostMatchingTest.Handler, {"reply": "wildcard"})]
01183
01184 def test_host_matching(self):
01185 self.app.add_handlers("www.example.com",
01186 [("/foo", HostMatchingTest.Handler, {"reply": "[0]"})])
01187 self.app.add_handlers(r"www\.example\.com",
01188 [("/bar", HostMatchingTest.Handler, {"reply": "[1]"})])
01189 self.app.add_handlers("www.example.com",
01190 [("/baz", HostMatchingTest.Handler, {"reply": "[2]"})])
01191
01192 response = self.fetch("/foo")
01193 self.assertEqual(response.body, b"wildcard")
01194 response = self.fetch("/bar")
01195 self.assertEqual(response.code, 404)
01196 response = self.fetch("/baz")
01197 self.assertEqual(response.code, 404)
01198
01199 response = self.fetch("/foo", headers={'Host': 'www.example.com'})
01200 self.assertEqual(response.body, b"[0]")
01201 response = self.fetch("/bar", headers={'Host': 'www.example.com'})
01202 self.assertEqual(response.body, b"[1]")
01203 response = self.fetch("/baz", headers={'Host': 'www.example.com'})
01204 self.assertEqual(response.body, b"[2]")
01205
01206
01207 @wsgi_safe
01208 class NamedURLSpecGroupsTest(WebTestCase):
01209 def get_handlers(self):
01210 class EchoHandler(RequestHandler):
01211 def get(self, path):
01212 self.write(path)
01213
01214 return [("/str/(?P<path>.*)", EchoHandler),
01215 (u("/unicode/(?P<path>.*)"), EchoHandler)]
01216
01217 def test_named_urlspec_groups(self):
01218 response = self.fetch("/str/foo")
01219 self.assertEqual(response.body, b"foo")
01220
01221 response = self.fetch("/unicode/bar")
01222 self.assertEqual(response.body, b"bar")
01223
01224
01225 @wsgi_safe
01226 class ClearHeaderTest(SimpleHandlerTestCase):
01227 class Handler(RequestHandler):
01228 def get(self):
01229 self.set_header("h1", "foo")
01230 self.set_header("h2", "bar")
01231 self.clear_header("h1")
01232 self.clear_header("nonexistent")
01233
01234 def test_clear_header(self):
01235 response = self.fetch("/")
01236 self.assertTrue("h1" not in response.headers)
01237 self.assertEqual(response.headers["h2"], "bar")
01238
01239
01240 @wsgi_safe
01241 class Header304Test(SimpleHandlerTestCase):
01242 class Handler(RequestHandler):
01243 def get(self):
01244 self.set_header("Content-Language", "en_US")
01245 self.write("hello")
01246
01247 def test_304_headers(self):
01248 response1 = self.fetch('/')
01249 self.assertEqual(response1.headers["Content-Length"], "5")
01250 self.assertEqual(response1.headers["Content-Language"], "en_US")
01251
01252 response2 = self.fetch('/', headers={
01253 'If-None-Match': response1.headers["Etag"]})
01254 self.assertEqual(response2.code, 304)
01255 self.assertTrue("Content-Length" not in response2.headers)
01256 self.assertTrue("Content-Language" not in response2.headers)
01257
01258 self.assertTrue("Transfer-Encoding" not in response2.headers)
01259
01260
01261 @wsgi_safe
01262 class StatusReasonTest(SimpleHandlerTestCase):
01263 class Handler(RequestHandler):
01264 def get(self):
01265 reason = self.request.arguments.get('reason', [])
01266 self.set_status(int(self.get_argument('code')),
01267 reason=reason[0] if reason else None)
01268
01269 def get_http_client(self):
01270
01271 return SimpleAsyncHTTPClient(io_loop=self.io_loop)
01272
01273 def test_status(self):
01274 response = self.fetch("/?code=304")
01275 self.assertEqual(response.code, 304)
01276 self.assertEqual(response.reason, "Not Modified")
01277 response = self.fetch("/?code=304&reason=Foo")
01278 self.assertEqual(response.code, 304)
01279 self.assertEqual(response.reason, "Foo")
01280 response = self.fetch("/?code=682&reason=Bar")
01281 self.assertEqual(response.code, 682)
01282 self.assertEqual(response.reason, "Bar")
01283 with ExpectLog(app_log, 'Uncaught exception'):
01284 response = self.fetch("/?code=682")
01285 self.assertEqual(response.code, 500)
01286
01287
01288 @wsgi_safe
01289 class DateHeaderTest(SimpleHandlerTestCase):
01290 class Handler(RequestHandler):
01291 def get(self):
01292 self.write("hello")
01293
01294 def test_date_header(self):
01295 response = self.fetch('/')
01296 header_date = datetime.datetime(
01297 *email.utils.parsedate(response.headers['Date'])[:6])
01298 self.assertTrue(header_date - datetime.datetime.utcnow() <
01299 datetime.timedelta(seconds=2))
01300
01301
01302 @wsgi_safe
01303 class RaiseWithReasonTest(SimpleHandlerTestCase):
01304 class Handler(RequestHandler):
01305 def get(self):
01306 raise HTTPError(682, reason="Foo")
01307
01308 def get_http_client(self):
01309
01310 return SimpleAsyncHTTPClient(io_loop=self.io_loop)
01311
01312 def test_raise_with_reason(self):
01313 response = self.fetch("/")
01314 self.assertEqual(response.code, 682)
01315 self.assertEqual(response.reason, "Foo")
01316 self.assertIn(b'682: Foo', response.body)
01317
01318 def test_httperror_str(self):
01319 self.assertEqual(str(HTTPError(682, reason="Foo")), "HTTP 682: Foo")
01320
01321
01322 @wsgi_safe
01323 class ErrorHandlerXSRFTest(WebTestCase):
01324 def get_handlers(self):
01325
01326
01327
01328 return [('/error', ErrorHandler, dict(status_code=417))]
01329
01330 def get_app_kwargs(self):
01331 return dict(xsrf_cookies=True)
01332
01333 def test_error_xsrf(self):
01334 response = self.fetch('/error', method='POST', body='')
01335 self.assertEqual(response.code, 417)
01336
01337 def test_404_xsrf(self):
01338 response = self.fetch('/404', method='POST', body='')
01339 self.assertEqual(response.code, 404)
01340
01341
01342 @wsgi_safe
01343 class GzipTestCase(SimpleHandlerTestCase):
01344 class Handler(RequestHandler):
01345 def get(self):
01346 if self.get_argument('vary', None):
01347 self.set_header('Vary', self.get_argument('vary'))
01348 self.write('hello world')
01349
01350 def get_app_kwargs(self):
01351 return dict(
01352 gzip=True,
01353 static_path=os.path.join(os.path.dirname(__file__), 'static'))
01354
01355 def test_gzip(self):
01356 response = self.fetch('/')
01357
01358
01359 self.assertEqual(
01360 response.headers.get(
01361 'Content-Encoding',
01362 response.headers.get('X-Consumed-Content-Encoding')),
01363 'gzip')
01364 self.assertEqual(response.headers['Vary'], 'Accept-Encoding')
01365
01366 def test_gzip_static(self):
01367
01368
01369 response = self.fetch('/robots.txt')
01370 self.assertEqual(
01371 response.headers.get(
01372 'Content-Encoding',
01373 response.headers.get('X-Consumed-Content-Encoding')),
01374 'gzip')
01375 self.assertEqual(response.headers['Vary'], 'Accept-Encoding')
01376
01377 def test_gzip_not_requested(self):
01378 response = self.fetch('/', use_gzip=False)
01379 self.assertNotIn('Content-Encoding', response.headers)
01380 self.assertEqual(response.headers['Vary'], 'Accept-Encoding')
01381
01382 def test_vary_already_present(self):
01383 response = self.fetch('/?vary=Accept-Language')
01384 self.assertEqual(response.headers['Vary'],
01385 'Accept-Language, Accept-Encoding')
01386
01387
01388 @wsgi_safe
01389 class PathArgsInPrepareTest(WebTestCase):
01390 class Handler(RequestHandler):
01391 def prepare(self):
01392 self.write(dict(args=self.path_args, kwargs=self.path_kwargs))
01393
01394 def get(self, path):
01395 assert path == 'foo'
01396 self.finish()
01397
01398 def get_handlers(self):
01399 return [('/pos/(.*)', self.Handler),
01400 ('/kw/(?P<path>.*)', self.Handler)]
01401
01402 def test_pos(self):
01403 response = self.fetch('/pos/foo')
01404 response.rethrow()
01405 data = json_decode(response.body)
01406 self.assertEqual(data, {'args': ['foo'], 'kwargs': {}})
01407
01408 def test_kw(self):
01409 response = self.fetch('/kw/foo')
01410 response.rethrow()
01411 data = json_decode(response.body)
01412 self.assertEqual(data, {'args': [], 'kwargs': {'path': 'foo'}})
01413
01414
01415 @wsgi_safe
01416 class ClearAllCookiesTest(SimpleHandlerTestCase):
01417 class Handler(RequestHandler):
01418 def get(self):
01419 self.clear_all_cookies()
01420 self.write('ok')
01421
01422 def test_clear_all_cookies(self):
01423 response = self.fetch('/', headers={'Cookie': 'foo=bar; baz=xyzzy'})
01424 set_cookies = sorted(response.headers.get_list('Set-Cookie'))
01425 self.assertTrue(set_cookies[0].startswith('baz=;'))
01426 self.assertTrue(set_cookies[1].startswith('foo=;'))
01427
01428
01429 class PermissionError(Exception):
01430 pass
01431
01432
01433 @wsgi_safe
01434 class ExceptionHandlerTest(SimpleHandlerTestCase):
01435 class Handler(RequestHandler):
01436 def get(self):
01437 exc = self.get_argument('exc')
01438 if exc == 'http':
01439 raise HTTPError(410, "no longer here")
01440 elif exc == 'zero':
01441 1 / 0
01442 elif exc == 'permission':
01443 raise PermissionError('not allowed')
01444
01445 def write_error(self, status_code, **kwargs):
01446 if 'exc_info' in kwargs:
01447 typ, value, tb = kwargs['exc_info']
01448 if isinstance(value, PermissionError):
01449 self.set_status(403)
01450 self.write('PermissionError')
01451 return
01452 RequestHandler.write_error(self, status_code, **kwargs)
01453
01454 def log_exception(self, typ, value, tb):
01455 if isinstance(value, PermissionError):
01456 app_log.warning('custom logging for PermissionError: %s',
01457 value.args[0])
01458 else:
01459 RequestHandler.log_exception(self, typ, value, tb)
01460
01461 def test_http_error(self):
01462
01463
01464 with ExpectLog(gen_log, '.*no longer here'):
01465 response = self.fetch('/?exc=http')
01466 self.assertEqual(response.code, 410)
01467
01468 def test_unknown_error(self):
01469
01470 with ExpectLog(app_log, 'Uncaught exception'):
01471 response = self.fetch('/?exc=zero')
01472 self.assertEqual(response.code, 500)
01473
01474 def test_known_error(self):
01475
01476
01477 with ExpectLog(app_log,
01478 'custom logging for PermissionError: not allowed'):
01479 response = self.fetch('/?exc=permission')
01480 self.assertEqual(response.code, 403)
01481
01482
01483 @wsgi_safe
01484 class UIMethodUIModuleTest(SimpleHandlerTestCase):
01485 """Test that UI methods and modules are created correctly and
01486 associated with the handler.
01487 """
01488 class Handler(RequestHandler):
01489 def get(self):
01490 self.render('foo.html')
01491
01492 def value(self):
01493 return self.get_argument("value")
01494
01495 def get_app_kwargs(self):
01496 def my_ui_method(handler, x):
01497 return "In my_ui_method(%s) with handler value %s." % (
01498 x, handler.value())
01499 class MyModule(UIModule):
01500 def render(self, x):
01501 return "In MyModule(%s) with handler value %s." % (
01502 x, self.handler.value())
01503
01504 loader = DictLoader({
01505 'foo.html': '{{ my_ui_method(42) }} {% module MyModule(123) %}',
01506 })
01507 return dict(template_loader=loader,
01508 ui_methods={'my_ui_method': my_ui_method},
01509 ui_modules={'MyModule': MyModule})
01510
01511 def tearDown(self):
01512 super(UIMethodUIModuleTest, self).tearDown()
01513
01514 RequestHandler._template_loaders.clear()
01515
01516 def test_ui_method(self):
01517 response = self.fetch('/?value=asdf')
01518 self.assertEqual(response.body,
01519 b'In my_ui_method(42) with handler value asdf. '
01520 b'In MyModule(123) with handler value asdf.')
01521
01522
01523 @wsgi_safe
01524 class GetArgumentErrorTest(SimpleHandlerTestCase):
01525 class Handler(RequestHandler):
01526 def get(self):
01527 try:
01528 self.get_argument('foo')
01529 self.write({})
01530 except MissingArgumentError as e:
01531 self.write({'arg_name': e.arg_name,
01532 'log_message': e.log_message})
01533
01534 def test_catch_error(self):
01535 response = self.fetch('/')
01536 self.assertEqual(json_decode(response.body),
01537 {'arg_name': 'foo',
01538 'log_message': 'Missing argument foo'})
01539
01540
01541 class MultipleExceptionTest(SimpleHandlerTestCase):
01542 class Handler(RequestHandler):
01543 exc_count = 0
01544
01545 @asynchronous
01546 def get(self):
01547 from tornado.ioloop import IOLoop
01548 IOLoop.current().add_callback(lambda: 1 / 0)
01549 IOLoop.current().add_callback(lambda: 1 / 0)
01550
01551 def log_exception(self, typ, value, tb):
01552 MultipleExceptionTest.Handler.exc_count += 1
01553
01554 def test_multi_exception(self):
01555
01556
01557
01558
01559 response = self.fetch('/')
01560 self.assertEqual(response.code, 500)
01561 response = self.fetch('/')
01562 self.assertEqual(response.code, 500)
01563
01564
01565
01566 self.assertGreater(MultipleExceptionTest.Handler.exc_count, 2)
01567
01568
01569 @wsgi_safe
01570 class SetCurrentUserTest(SimpleHandlerTestCase):
01571 class Handler(RequestHandler):
01572 def prepare(self):
01573 self.current_user = 'Ben'
01574
01575 def get(self):
01576 self.write('Hello %s' % self.current_user)
01577
01578 def test_set_current_user(self):
01579
01580
01581 response = self.fetch('/')
01582 self.assertEqual(response.body, b'Hello Ben')
01583
01584
01585 @wsgi_safe
01586 class GetCurrentUserTest(WebTestCase):
01587 def get_app_kwargs(self):
01588 class WithoutUserModule(UIModule):
01589 def render(self):
01590 return ''
01591
01592 class WithUserModule(UIModule):
01593 def render(self):
01594 return str(self.current_user)
01595
01596 loader = DictLoader({
01597 'without_user.html': '',
01598 'with_user.html': '{{ current_user }}',
01599 'without_user_module.html': '{% module WithoutUserModule() %}',
01600 'with_user_module.html': '{% module WithUserModule() %}',
01601 })
01602 return dict(template_loader=loader,
01603 ui_modules={'WithUserModule': WithUserModule,
01604 'WithoutUserModule': WithoutUserModule})
01605
01606 def tearDown(self):
01607 super(GetCurrentUserTest, self).tearDown()
01608 RequestHandler._template_loaders.clear()
01609
01610 def get_handlers(self):
01611 class CurrentUserHandler(RequestHandler):
01612 def prepare(self):
01613 self.has_loaded_current_user = False
01614
01615 def get_current_user(self):
01616 self.has_loaded_current_user = True
01617 return ''
01618
01619 class WithoutUserHandler(CurrentUserHandler):
01620 def get(self):
01621 self.render_string('without_user.html')
01622 self.finish(str(self.has_loaded_current_user))
01623
01624 class WithUserHandler(CurrentUserHandler):
01625 def get(self):
01626 self.render_string('with_user.html')
01627 self.finish(str(self.has_loaded_current_user))
01628
01629 class CurrentUserModuleHandler(CurrentUserHandler):
01630 def get_template_namespace(self):
01631
01632
01633
01634 return self.ui
01635
01636 class WithoutUserModuleHandler(CurrentUserModuleHandler):
01637 def get(self):
01638 self.render_string('without_user_module.html')
01639 self.finish(str(self.has_loaded_current_user))
01640
01641 class WithUserModuleHandler(CurrentUserModuleHandler):
01642 def get(self):
01643 self.render_string('with_user_module.html')
01644 self.finish(str(self.has_loaded_current_user))
01645
01646 return [('/without_user', WithoutUserHandler),
01647 ('/with_user', WithUserHandler),
01648 ('/without_user_module', WithoutUserModuleHandler),
01649 ('/with_user_module', WithUserModuleHandler)]
01650
01651 @unittest.skip('needs fix')
01652 def test_get_current_user_is_lazy(self):
01653
01654 response = self.fetch('/without_user')
01655 self.assertEqual(response.body, b'False')
01656
01657 def test_get_current_user_works(self):
01658 response = self.fetch('/with_user')
01659 self.assertEqual(response.body, b'True')
01660
01661 def test_get_current_user_from_ui_module_is_lazy(self):
01662 response = self.fetch('/without_user_module')
01663 self.assertEqual(response.body, b'False')
01664
01665 def test_get_current_user_from_ui_module_works(self):
01666 response = self.fetch('/with_user_module')
01667 self.assertEqual(response.body, b'True')
01668
01669
01670 @wsgi_safe
01671 class UnimplementedHTTPMethodsTest(SimpleHandlerTestCase):
01672 class Handler(RequestHandler):
01673 pass
01674
01675 def test_unimplemented_standard_methods(self):
01676 for method in ['HEAD', 'GET', 'DELETE', 'OPTIONS']:
01677 response = self.fetch('/', method=method)
01678 self.assertEqual(response.code, 405)
01679 for method in ['POST', 'PUT']:
01680 response = self.fetch('/', method=method, body=b'')
01681 self.assertEqual(response.code, 405)
01682
01683
01684 class UnimplementedNonStandardMethodsTest(SimpleHandlerTestCase):
01685
01686
01687 class Handler(RequestHandler):
01688 def other(self):
01689
01690
01691 self.write('other')
01692
01693 def test_unimplemented_patch(self):
01694
01695
01696 response = self.fetch('/', method='PATCH', body=b'')
01697 self.assertEqual(response.code, 405)
01698
01699 def test_unimplemented_other(self):
01700 response = self.fetch('/', method='OTHER',
01701 allow_nonstandard_methods=True)
01702 self.assertEqual(response.code, 405)
01703
01704
01705 @wsgi_safe
01706 class AllHTTPMethodsTest(SimpleHandlerTestCase):
01707 class Handler(RequestHandler):
01708 def method(self):
01709 self.write(self.request.method)
01710
01711 get = delete = options = post = put = method
01712
01713 def test_standard_methods(self):
01714 response = self.fetch('/', method='HEAD')
01715 self.assertEqual(response.body, b'')
01716 for method in ['GET', 'DELETE', 'OPTIONS']:
01717 response = self.fetch('/', method=method)
01718 self.assertEqual(response.body, utf8(method))
01719 for method in ['POST', 'PUT']:
01720 response = self.fetch('/', method=method, body=b'')
01721 self.assertEqual(response.body, utf8(method))
01722
01723
01724 class PatchMethodTest(SimpleHandlerTestCase):
01725 class Handler(RequestHandler):
01726 SUPPORTED_METHODS = RequestHandler.SUPPORTED_METHODS + ('OTHER',)
01727
01728 def patch(self):
01729 self.write('patch')
01730
01731 def other(self):
01732 self.write('other')
01733
01734 def test_patch(self):
01735 response = self.fetch('/', method='PATCH', body=b'')
01736 self.assertEqual(response.body, b'patch')
01737
01738 def test_other(self):
01739 response = self.fetch('/', method='OTHER',
01740 allow_nonstandard_methods=True)
01741 self.assertEqual(response.body, b'other')
01742
01743
01744 @wsgi_safe
01745 class FinishInPrepareTest(SimpleHandlerTestCase):
01746 class Handler(RequestHandler):
01747 def prepare(self):
01748 self.finish('done')
01749
01750 def get(self):
01751
01752
01753
01754 raise Exception('should not reach this method')
01755
01756 def test_finish_in_prepare(self):
01757 response = self.fetch('/')
01758 self.assertEqual(response.body, b'done')
01759
01760
01761 @wsgi_safe
01762 class Default404Test(WebTestCase):
01763 def get_handlers(self):
01764
01765 return [('/foo', RequestHandler)]
01766
01767 def test_404(self):
01768 response = self.fetch('/')
01769 self.assertEqual(response.code, 404)
01770 self.assertEqual(response.body,
01771 b'<html><title>404: Not Found</title>'
01772 b'<body>404: Not Found</body></html>')
01773
01774
01775 @wsgi_safe
01776 class Custom404Test(WebTestCase):
01777 def get_handlers(self):
01778 return [('/foo', RequestHandler)]
01779
01780 def get_app_kwargs(self):
01781 class Custom404Handler(RequestHandler):
01782 def get(self):
01783 self.set_status(404)
01784 self.write('custom 404 response')
01785
01786 return dict(default_handler_class=Custom404Handler)
01787
01788 def test_404(self):
01789 response = self.fetch('/')
01790 self.assertEqual(response.code, 404)
01791 self.assertEqual(response.body, b'custom 404 response')
01792
01793
01794 @wsgi_safe
01795 class DefaultHandlerArgumentsTest(WebTestCase):
01796 def get_handlers(self):
01797 return [('/foo', RequestHandler)]
01798
01799 def get_app_kwargs(self):
01800 return dict(default_handler_class=ErrorHandler,
01801 default_handler_args=dict(status_code=403))
01802
01803 def test_403(self):
01804 response = self.fetch('/')
01805 self.assertEqual(response.code, 403)
01806
01807
01808 @wsgi_safe
01809 class HandlerByNameTest(WebTestCase):
01810 def get_handlers(self):
01811
01812 return [('/hello1', HelloHandler),
01813 ('/hello2', 'tornado.test.web_test.HelloHandler'),
01814 url('/hello3', 'tornado.test.web_test.HelloHandler'),
01815 ]
01816
01817 def test_handler_by_name(self):
01818 resp = self.fetch('/hello1')
01819 self.assertEqual(resp.body, b'hello')
01820 resp = self.fetch('/hello2')
01821 self.assertEqual(resp.body, b'hello')
01822 resp = self.fetch('/hello3')
01823 self.assertEqual(resp.body, b'hello')
01824
01825
01826 class StreamingRequestBodyTest(WebTestCase):
01827 def get_handlers(self):
01828 @stream_request_body
01829 class StreamingBodyHandler(RequestHandler):
01830 def initialize(self, test):
01831 self.test = test
01832
01833 def prepare(self):
01834 self.test.prepared.set_result(None)
01835
01836 def data_received(self, data):
01837 self.test.data.set_result(data)
01838
01839 def get(self):
01840 self.test.finished.set_result(None)
01841 self.write({})
01842
01843 @stream_request_body
01844 class EarlyReturnHandler(RequestHandler):
01845 def prepare(self):
01846
01847
01848 raise HTTPError(401)
01849
01850 @stream_request_body
01851 class CloseDetectionHandler(RequestHandler):
01852 def initialize(self, test):
01853 self.test = test
01854
01855 def on_connection_close(self):
01856 super(CloseDetectionHandler, self).on_connection_close()
01857 self.test.close_future.set_result(None)
01858
01859 return [('/stream_body', StreamingBodyHandler, dict(test=self)),
01860 ('/early_return', EarlyReturnHandler),
01861 ('/close_detection', CloseDetectionHandler, dict(test=self))]
01862
01863 def connect(self, url, connection_close):
01864
01865 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
01866 s.connect(("localhost", self.get_http_port()))
01867 stream = IOStream(s, io_loop=self.io_loop)
01868 stream.write(b"GET " + url + b" HTTP/1.1\r\n")
01869 if connection_close:
01870 stream.write(b"Connection: close\r\n")
01871 stream.write(b"Transfer-Encoding: chunked\r\n\r\n")
01872 return stream
01873
01874 @gen_test
01875 def test_streaming_body(self):
01876 self.prepared = Future()
01877 self.data = Future()
01878 self.finished = Future()
01879
01880 stream = self.connect(b"/stream_body", connection_close=True)
01881 yield self.prepared
01882 stream.write(b"4\r\nasdf\r\n")
01883
01884 data = yield self.data
01885 self.assertEqual(data, b"asdf")
01886 self.data = Future()
01887 stream.write(b"4\r\nqwer\r\n")
01888 data = yield self.data
01889 self.assertEquals(data, b"qwer")
01890 stream.write(b"0\r\n")
01891 yield self.finished
01892 data = yield gen.Task(stream.read_until_close)
01893
01894 self.assertTrue(data.endswith(b"{}"))
01895 stream.close()
01896
01897 @gen_test
01898 def test_early_return(self):
01899 stream = self.connect(b"/early_return", connection_close=False)
01900 data = yield gen.Task(stream.read_until_close)
01901 self.assertTrue(data.startswith(b"HTTP/1.1 401"))
01902
01903 @gen_test
01904 def test_early_return_with_data(self):
01905 stream = self.connect(b"/early_return", connection_close=False)
01906 stream.write(b"4\r\nasdf\r\n")
01907 data = yield gen.Task(stream.read_until_close)
01908 self.assertTrue(data.startswith(b"HTTP/1.1 401"))
01909
01910 @gen_test
01911 def test_close_during_upload(self):
01912 self.close_future = Future()
01913 stream = self.connect(b"/close_detection", connection_close=False)
01914 stream.close()
01915 yield self.close_future
01916
01917
01918 class StreamingRequestFlowControlTest(WebTestCase):
01919 def get_handlers(self):
01920 from tornado.ioloop import IOLoop
01921
01922
01923
01924
01925
01926 @stream_request_body
01927 class FlowControlHandler(RequestHandler):
01928 def initialize(self, test):
01929 self.test = test
01930 self.method = None
01931 self.methods = []
01932
01933 @contextlib.contextmanager
01934 def in_method(self, method):
01935 if self.method is not None:
01936 self.test.fail("entered method %s while in %s" %
01937 (method, self.method))
01938 self.method = method
01939 self.methods.append(method)
01940 try:
01941 yield
01942 finally:
01943 self.method = None
01944
01945 @gen.coroutine
01946 def prepare(self):
01947 with self.in_method('prepare'):
01948 yield gen.Task(IOLoop.current().add_callback)
01949
01950 @gen.coroutine
01951 def data_received(self, data):
01952 with self.in_method('data_received'):
01953 yield gen.Task(IOLoop.current().add_callback)
01954
01955 @gen.coroutine
01956 def post(self):
01957 with self.in_method('post'):
01958 yield gen.Task(IOLoop.current().add_callback)
01959 self.write(dict(methods=self.methods))
01960
01961 return [('/', FlowControlHandler, dict(test=self))]
01962
01963 def get_httpserver_options(self):
01964
01965
01966 return dict(chunk_size=10)
01967
01968 def test_flow_control(self):
01969 response = self.fetch('/', body='abcdefghijklmnopqrstuvwxyz',
01970 method='POST')
01971 response.rethrow()
01972 self.assertEqual(json_decode(response.body),
01973 dict(methods=['prepare', 'data_received',
01974 'data_received', 'data_received',
01975 'post']))
01976
01977
01978 @wsgi_safe
01979 class IncorrectContentLengthTest(SimpleHandlerTestCase):
01980 def get_handlers(self):
01981 test = self
01982 self.server_error = None
01983
01984
01985 class TooHigh(RequestHandler):
01986 def get(self):
01987 self.set_header("Content-Length", "42")
01988 try:
01989 self.finish("ok")
01990 except Exception as e:
01991 test.server_error = e
01992 raise
01993
01994 class TooLow(RequestHandler):
01995 def get(self):
01996 self.set_header("Content-Length", "2")
01997 try:
01998 self.finish("hello")
01999 except Exception as e:
02000 test.server_error = e
02001 raise
02002
02003 return [('/high', TooHigh),
02004 ('/low', TooLow)]
02005
02006 def test_content_length_too_high(self):
02007
02008
02009
02010 with ExpectLog(app_log, "Uncaught exception"):
02011 with ExpectLog(gen_log,
02012 "Cannot send error response after headers written"):
02013 response = self.fetch("/high")
02014 self.assertEqual(response.code, 599)
02015 self.assertEqual(str(self.server_error),
02016 "Tried to write 40 bytes less than Content-Length")
02017
02018 def test_content_length_too_low(self):
02019
02020
02021
02022 with ExpectLog(app_log, "Uncaught exception"):
02023 with ExpectLog(gen_log,
02024 "Cannot send error response after headers written"):
02025 response = self.fetch("/low")
02026 self.assertEqual(response.code, 599)
02027 self.assertEqual(str(self.server_error),
02028 "Tried to write more data than Content-Length")
02029
02030
02031 class ClientCloseTest(SimpleHandlerTestCase):
02032 class Handler(RequestHandler):
02033 def get(self):
02034
02035
02036
02037
02038
02039 self.request.connection.stream.close()
02040 self.write('hello')
02041
02042 def test_client_close(self):
02043 response = self.fetch('/')
02044 self.assertEqual(response.code, 599)
02045
02046
02047 class SignedValueTest(unittest.TestCase):
02048 SECRET = "It's a secret to everybody"
02049
02050 def past(self):
02051 return self.present() - 86400 * 32
02052
02053 def present(self):
02054 return 1300000000
02055
02056 def test_known_values(self):
02057 signed_v1 = create_signed_value(SignedValueTest.SECRET, "key", "value",
02058 version=1, clock=self.present)
02059 self.assertEqual(
02060 signed_v1,
02061 b"dmFsdWU=|1300000000|31c934969f53e48164c50768b40cbd7e2daaaa4f")
02062
02063 signed_v2 = create_signed_value(SignedValueTest.SECRET, "key", "value",
02064 version=2, clock=self.present)
02065 self.assertEqual(
02066 signed_v2,
02067 b"2|1:0|10:1300000000|3:key|8:dmFsdWU=|"
02068 b"3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152")
02069
02070 signed_default = create_signed_value(SignedValueTest.SECRET,
02071 "key", "value", clock=self.present)
02072 self.assertEqual(signed_default, signed_v2)
02073
02074 decoded_v1 = decode_signed_value(SignedValueTest.SECRET, "key",
02075 signed_v1, min_version=1,
02076 clock=self.present)
02077 self.assertEqual(decoded_v1, b"value")
02078
02079 decoded_v2 = decode_signed_value(SignedValueTest.SECRET, "key",
02080 signed_v2, min_version=2,
02081 clock=self.present)
02082 self.assertEqual(decoded_v2, b"value")
02083
02084 def test_name_swap(self):
02085 signed1 = create_signed_value(SignedValueTest.SECRET, "key1", "value",
02086 clock=self.present)
02087 signed2 = create_signed_value(SignedValueTest.SECRET, "key2", "value",
02088 clock=self.present)
02089
02090 decoded1 = decode_signed_value(SignedValueTest.SECRET, "key2", signed1,
02091 clock=self.present)
02092 self.assertIs(decoded1, None)
02093 decoded2 = decode_signed_value(SignedValueTest.SECRET, "key1", signed2,
02094 clock=self.present)
02095 self.assertIs(decoded2, None)
02096
02097 def test_expired(self):
02098 signed = create_signed_value(SignedValueTest.SECRET, "key1", "value",
02099 clock=self.past)
02100 decoded_past = decode_signed_value(SignedValueTest.SECRET, "key1",
02101 signed, clock=self.past)
02102 self.assertEqual(decoded_past, b"value")
02103 decoded_present = decode_signed_value(SignedValueTest.SECRET, "key1",
02104 signed, clock=self.present)
02105 self.assertIs(decoded_present, None)
02106
02107 def test_payload_tampering(self):
02108
02109 sig = "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152"
02110 def validate(prefix):
02111 return (b'value' ==
02112 decode_signed_value(SignedValueTest.SECRET, "key",
02113 prefix + sig, clock=self.present))
02114 self.assertTrue(validate("2|1:0|10:1300000000|3:key|8:dmFsdWU=|"))
02115
02116 self.assertFalse(validate("2|1:1|10:1300000000|3:key|8:dmFsdWU=|"))
02117
02118 self.assertFalse(validate("2|1:0|10:130000000|3:key|8:dmFsdWU=|"))
02119
02120 self.assertFalse(validate("2|1:0|10:1300000000|3:keey|8:dmFsdWU=|"))
02121
02122 def test_signature_tampering(self):
02123 prefix = "2|1:0|10:1300000000|3:key|8:dmFsdWU=|"
02124 def validate(sig):
02125 return (b'value' ==
02126 decode_signed_value(SignedValueTest.SECRET, "key",
02127 prefix + sig, clock=self.present))
02128 self.assertTrue(validate(
02129 "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152"))
02130
02131 self.assertFalse(validate("0" * 32))
02132
02133 self.assertFalse(validate(
02134 "4d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152"))
02135
02136 self.assertFalse(validate(
02137 "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e153"))
02138
02139 self.assertFalse(validate(
02140 "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e15"))
02141
02142 self.assertFalse(validate(
02143 "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e1538"))
02144
02145 def test_non_ascii(self):
02146 value = b"\xe9"
02147 signed = create_signed_value(SignedValueTest.SECRET, "key", value,
02148 clock=self.present)
02149 decoded = decode_signed_value(SignedValueTest.SECRET, "key", signed,
02150 clock=self.present)
02151 self.assertEqual(value, decoded)
02152
02153
02154 @wsgi_safe
02155 class XSRFTest(SimpleHandlerTestCase):
02156 class Handler(RequestHandler):
02157 def get(self):
02158 version = int(self.get_argument("version", "2"))
02159
02160
02161 self.settings["xsrf_cookie_version"] = version
02162 self.write(self.xsrf_token)
02163
02164 def post(self):
02165 self.write("ok")
02166
02167 def get_app_kwargs(self):
02168 return dict(xsrf_cookies=True)
02169
02170 def setUp(self):
02171 super(XSRFTest, self).setUp()
02172 self.xsrf_token = self.get_token()
02173
02174 def get_token(self, old_token=None, version=None):
02175 if old_token is not None:
02176 headers = self.cookie_headers(old_token)
02177 else:
02178 headers = None
02179 response = self.fetch(
02180 "/" if version is None else ("/?version=%d" % version),
02181 headers=headers)
02182 response.rethrow()
02183 return native_str(response.body)
02184
02185 def cookie_headers(self, token=None):
02186 if token is None:
02187 token = self.xsrf_token
02188 return {"Cookie": "_xsrf=" + token}
02189
02190 def test_xsrf_fail_no_token(self):
02191 with ExpectLog(gen_log, ".*'_xsrf' argument missing"):
02192 response = self.fetch("/", method="POST", body=b"")
02193 self.assertEqual(response.code, 403)
02194
02195 def test_xsrf_fail_body_no_cookie(self):
02196 with ExpectLog(gen_log, ".*XSRF cookie does not match POST"):
02197 response = self.fetch(
02198 "/", method="POST",
02199 body=urllib_parse.urlencode(dict(_xsrf=self.xsrf_token)))
02200 self.assertEqual(response.code, 403)
02201
02202 def test_xsrf_fail_cookie_no_body(self):
02203 with ExpectLog(gen_log, ".*'_xsrf' argument missing"):
02204 response = self.fetch(
02205 "/", method="POST", body=b"",
02206 headers=self.cookie_headers())
02207 self.assertEqual(response.code, 403)
02208
02209 def test_xsrf_success_short_token(self):
02210 response = self.fetch(
02211 "/", method="POST",
02212 body=urllib_parse.urlencode(dict(_xsrf='deadbeef')),
02213 headers=self.cookie_headers(token='deadbeef'))
02214 self.assertEqual(response.code, 200)
02215
02216 def test_xsrf_success_non_hex_token(self):
02217 response = self.fetch(
02218 "/", method="POST",
02219 body=urllib_parse.urlencode(dict(_xsrf='xoxo')),
02220 headers=self.cookie_headers(token='xoxo'))
02221 self.assertEqual(response.code, 200)
02222
02223 def test_xsrf_success_post_body(self):
02224 response = self.fetch(
02225 "/", method="POST",
02226 body=urllib_parse.urlencode(dict(_xsrf=self.xsrf_token)),
02227 headers=self.cookie_headers())
02228 self.assertEqual(response.code, 200)
02229
02230 def test_xsrf_success_query_string(self):
02231 response = self.fetch(
02232 "/?" + urllib_parse.urlencode(dict(_xsrf=self.xsrf_token)),
02233 method="POST", body=b"",
02234 headers=self.cookie_headers())
02235 self.assertEqual(response.code, 200)
02236
02237 def test_xsrf_success_header(self):
02238 response = self.fetch("/", method="POST", body=b"",
02239 headers=dict({"X-Xsrftoken": self.xsrf_token},
02240 **self.cookie_headers()))
02241 self.assertEqual(response.code, 200)
02242
02243 def test_distinct_tokens(self):
02244
02245 NUM_TOKENS = 10
02246 tokens = set()
02247 for i in range(NUM_TOKENS):
02248 tokens.add(self.get_token())
02249 self.assertEqual(len(tokens), NUM_TOKENS)
02250
02251 def test_cross_user(self):
02252 token2 = self.get_token()
02253
02254 for token in (self.xsrf_token, token2):
02255 response = self.fetch(
02256 "/", method="POST",
02257 body=urllib_parse.urlencode(dict(_xsrf=token)),
02258 headers=self.cookie_headers(token))
02259 self.assertEqual(response.code, 200)
02260
02261 for cookie_token, body_token in ((self.xsrf_token, token2),
02262 (token2, self.xsrf_token)):
02263 with ExpectLog(gen_log, '.*XSRF cookie does not match POST'):
02264 response = self.fetch(
02265 "/", method="POST",
02266 body=urllib_parse.urlencode(dict(_xsrf=body_token)),
02267 headers=self.cookie_headers(cookie_token))
02268 self.assertEqual(response.code, 403)
02269
02270 def test_refresh_token(self):
02271 token = self.xsrf_token
02272 tokens_seen = set([token])
02273
02274
02275
02276
02277 for i in range(5):
02278 token = self.get_token(token)
02279
02280 tokens_seen.add(token)
02281 response = self.fetch(
02282 "/", method="POST",
02283 body=urllib_parse.urlencode(dict(_xsrf=self.xsrf_token)),
02284 headers=self.cookie_headers(token))
02285 self.assertEqual(response.code, 200)
02286 self.assertEqual(len(tokens_seen), 6)
02287
02288 def test_versioning(self):
02289
02290 self.assertNotEqual(self.get_token(version=1),
02291 self.get_token(version=1))
02292
02293
02294 v1_token = self.get_token(version=1)
02295 for i in range(5):
02296 self.assertEqual(self.get_token(v1_token, version=1), v1_token)
02297
02298
02299 v2_token = self.get_token(v1_token)
02300 self.assertNotEqual(v1_token, v2_token)
02301
02302 self.assertNotEqual(v2_token, self.get_token(v1_token))
02303
02304
02305 for cookie_token, body_token in ((v1_token, v2_token),
02306 (v2_token, v1_token)):
02307 response = self.fetch(
02308 "/", method="POST",
02309 body=urllib_parse.urlencode(dict(_xsrf=body_token)),
02310 headers=self.cookie_headers(cookie_token))
02311 self.assertEqual(response.code, 200)
02312
02313
02314 @wsgi_safe
02315 class FinishExceptionTest(SimpleHandlerTestCase):
02316 class Handler(RequestHandler):
02317 def get(self):
02318 self.set_status(401)
02319 self.set_header('WWW-Authenticate', 'Basic realm="something"')
02320 self.write('authentication required')
02321 raise Finish()
02322
02323 def test_finish_exception(self):
02324 response = self.fetch('/')
02325 self.assertEqual(response.code, 401)
02326 self.assertEqual('Basic realm="something"',
02327 response.headers.get('WWW-Authenticate'))
02328 self.assertEqual(b'authentication required', response.body)