00001 from __future__ import absolute_import, division, with_statement
00002 from tornado import gen
00003 from tornado.escape import json_decode, utf8, to_unicode, recursive_unicode, native_str
00004 from tornado.iostream import IOStream
00005 from tornado.template import DictLoader
00006 from tornado.testing import LogTrapTestCase, AsyncHTTPTestCase
00007 from tornado.util import b, bytes_type, ObjectDict
00008 from tornado.web import RequestHandler, authenticated, Application, asynchronous, url, HTTPError, StaticFileHandler, _create_signature, create_signed_value
00009
00010 import binascii
00011 import logging
00012 import os
00013 import re
00014 import socket
00015 import sys
00016
00017
class SimpleHandlerTestCase(AsyncHTTPTestCase):
    """Base class for test cases built around one handler class.

    Subclasses define a nested class named ``Handler``; `get_app`
    mounts it at ``/``.
    """
    def get_app(self):
        """Return an `Application` that routes ``/`` to ``self.Handler``.

        The application is given a ``log_function`` that does nothing.
        """
        routes = [('/', self.Handler)]
        return Application(routes, log_function=lambda x: None)
00026
00027
class CookieTestRequestHandler(RequestHandler):
    """`RequestHandler` variant that keeps its cookies in a plain dict."""

    def __init__(self):
        # RequestHandler.__init__ is not invoked; only the two attributes
        # below are set up on the instance.
        self.application = ObjectDict(
            settings={'cookie_secret': '0123456789'})
        self._cookies = {}

    def get_cookie(self, name):
        """Return the value stored for ``name``, or None if absent."""
        return self._cookies.get(name)

    def set_cookie(self, name, value, expires_days=None):
        """Store ``value`` under ``name``; ``expires_days`` is ignored."""
        self._cookies[name] = value
00040
00041
class SecureCookieTest(LogTrapTestCase):
    """Tests of signed cookies, using `CookieTestRequestHandler`."""

    def test_round_trip(self):
        """A value written with set_secure_cookie reads back unchanged."""
        handler = CookieTestRequestHandler()
        handler.set_secure_cookie('foo', b('bar'))
        self.assertEqual(handler.get_secure_cookie('foo'), b('bar'))

    def test_cookie_tampering_future_timestamp(self):
        """Moving digits from the value into the timestamp is rejected."""
        handler = CookieTestRequestHandler()
        secret = handler.application.settings["cookie_secret"]

        # The payload is chosen so that the stored cookie begins with the
        # digits '12345678' (verified by the regular expression below).
        handler.set_secure_cookie('foo', binascii.a2b_hex(b('d76df8e7aefc')))
        cookie = handler._cookies['foo']
        match = re.match(b(r'12345678\|([0-9]+)\|([0-9a-f]+)'), cookie)
        self.assertTrue(match)
        timestamp = match.group(1)
        sig = match.group(2)
        self.assertEqual(
            _create_signature(secret, 'foo', '12345678', timestamp),
            sig)

        # Shifting '5678' from the value onto the front of the timestamp
        # produces the very same signature.
        self.assertEqual(
            _create_signature(secret, 'foo', '1234',
                              b('5678') + timestamp),
            sig)

        # Store the tampered cookie.  It is assembled from byte strings
        # only: the regex groups are bytes, and %-formatting bytes into a
        # text string on Python 3 would embed their repr ("b'...'").
        handler._cookies['foo'] = (
            b('1234|5678') + timestamp + b('|') + sig)

        # Despite the matching signature the cookie must not be accepted.
        self.assertTrue(handler.get_secure_cookie('foo') is None)

    def test_arbitrary_bytes(self):
        """A lone 0xE9 byte survives the secure-cookie round trip."""
        handler = CookieTestRequestHandler()
        handler.set_secure_cookie('foo', b('\xe9'))
        self.assertEqual(handler.get_secure_cookie('foo'), b('\xe9'))
00079
00080
class CookieTest(AsyncHTTPTestCase, LogTrapTestCase):
    """End-to-end checks of ``set_cookie`` / ``get_cookie`` over HTTP."""

    def get_app(self):
        """Build an application with one small handler per scenario."""

        class SetCookieHandler(RequestHandler):
            def get(self):
                # One cookie per value type: native str, unicode, bytes.
                self.set_cookie("str", "asdf")
                self.set_cookie("unicode", u"qwer")
                self.set_cookie("bytes", b("zxcv"))

        class GetCookieHandler(RequestHandler):
            def get(self):
                # Echo cookie "foo", falling back to the text "default".
                self.write(self.get_cookie("foo", "default"))

        class SetCookieDomainHandler(RequestHandler):
            def get(self):
                # Here the domain and path keyword arguments are unicode.
                self.set_cookie("unicode_args", "blah", domain=u"foo.com",
                                path=u"/foo")

        class SetCookieSpecialCharHandler(RequestHandler):
            def get(self):
                self.set_cookie("equals", "a=b")
                self.set_cookie("semicolon", "a;b")
                self.set_cookie("quote", 'a"b')

        class SetCookieOverwriteHandler(RequestHandler):
            def get(self):
                self.set_cookie("a", "b", domain="example.com")
                self.set_cookie("c", "d", domain="example.com")
                # Cookie "a" is set a second time, now without a domain;
                # test_set_cookie_overwrite expects only this one to be
                # sent for "a".
                self.set_cookie("a", "e")

        routes = [
            ("/set", SetCookieHandler),
            ("/get", GetCookieHandler),
            ("/set_domain", SetCookieDomainHandler),
            ("/special_char", SetCookieSpecialCharHandler),
            ("/set_overwrite", SetCookieOverwriteHandler),
        ]
        return Application(routes)

    def test_set_cookie(self):
        """str, unicode and bytes values all yield plain Set-Cookie lines."""
        response = self.fetch("/set")
        sent = sorted(response.headers.get_list("Set-Cookie"))
        self.assertEqual(sent,
                         ["bytes=zxcv; Path=/",
                          "str=asdf; Path=/",
                          "unicode=qwer; Path=/",
                          ])

    def test_get_cookie(self):
        """Plain and quoted values are read; a bad header gives the default."""
        cases = [
            ("foo=bar", b("bar")),
            ('foo="bar"', b("bar")),
            ("/=exception;", b("default")),
        ]
        for cookie_header, expected_body in cases:
            response = self.fetch("/get", headers={"Cookie": cookie_header})
            self.assertEqual(response.body, expected_body)

    def test_set_cookie_domain(self):
        """Unicode domain/path arguments appear in the Set-Cookie header."""
        response = self.fetch("/set_domain")
        self.assertEqual(response.headers.get_list("Set-Cookie"),
                         ["unicode_args=blah; Domain=foo.com; Path=/foo"])

    def test_cookie_special_char(self):
        """Values containing ``=``, ``;`` and ``"`` are quoted and parsed."""
        response = self.fetch("/special_char")
        sent = sorted(response.headers.get_list("Set-Cookie"))
        self.assertEqual(len(sent), 3)
        equals_header, quote_header, semicolon_header = sent
        self.assertEqual(equals_header, 'equals="a=b"; Path=/')
        self.assertEqual(quote_header, 'quote="a\\"b"; Path=/')
        # The semicolon may be sent literally or as an octal escape;
        # both spellings are accepted.
        self.assertTrue(semicolon_header in ('semicolon="a;b"; Path=/',
                                             'semicolon="a\\073b"; Path=/'),
                        semicolon_header)

        # Each Cookie request header is paired with the value the handler
        # is expected to read from it.
        data = [('foo=a=b', 'a=b'),
                ('foo="a=b"', 'a=b'),
                ('foo="a;b"', 'a;b'),
                ('foo="a\\073b"', 'a;b'),
                ('foo="a\\"b"', 'a"b'),
                ]
        for header, expected in data:
            logging.info("trying %r", header)
            response = self.fetch("/get", headers={"Cookie": header})
            self.assertEqual(response.body, utf8(expected))

    def test_set_cookie_overwrite(self):
        """Setting a cookie name twice sends only the later value."""
        response = self.fetch("/set_overwrite")
        sent = sorted(response.headers.get_list("Set-Cookie"))
        self.assertEqual(sent,
                         ["a=e; Path=/", "c=d; Domain=example.com; Path=/"])
00175
00176
class AuthRedirectRequestHandler(RequestHandler):
    """Handler whose login URL is supplied per route through `initialize`."""

    def initialize(self, login_url):
        """Remember the ``login_url`` given in the route's keyword dict."""
        self.login_url = login_url

    def get_login_url(self):
        """Return the login URL recorded by `initialize`."""
        return self.login_url

    @authenticated
    def get(self):
        # Responds with a 500 if the decorated body is ever executed.
        self.send_error(500)
00188
00189
class AuthRedirectTest(AsyncHTTPTestCase, LogTrapTestCase):
    """Checks the redirect issued for relative and absolute login URLs."""

    def get_app(self):
        """Mount `AuthRedirectRequestHandler` twice, once per login URL."""
        return Application([('/relative', AuthRedirectRequestHandler,
                             dict(login_url='/login')),
                            ('/absolute', AuthRedirectRequestHandler,
                             dict(login_url='http://example.com/login'))])

    def test_relative_auth_redirect(self):
        """A relative login URL receives a relative ``next`` parameter."""
        response = self.fetch('/relative', follow_redirects=False)
        self.assertEqual(response.code, 302)
        self.assertEqual(response.headers['Location'],
                         '/login?next=%2Frelative')

    def test_absolute_auth_redirect(self):
        """An absolute login URL receives a full-URL ``next`` parameter."""
        response = self.fetch('/absolute', follow_redirects=False)
        self.assertEqual(response.code, 302)
        location = response.headers['Location']
        # Raw string so that ``\?`` reaches the regex engine untouched;
        # the port number varies between runs, hence ``[0-9]+``.
        pattern = (r'http://example.com/login\?next='
                   r'http%3A%2F%2Flocalhost%3A[0-9]+%2Fabsolute')
        self.assertTrue(re.match(pattern, location), location)
00212
00213
class ConnectionCloseHandler(RequestHandler):
    """Asynchronous handler that reports its progress back to a test."""

    def initialize(self, test):
        """Keep a reference to the test object passed in the route."""
        self.test = test

    @asynchronous
    def get(self):
        # The request is never finished here; the test is only told that
        # the handler has been entered.
        self.test.on_handler_waiting()

    def on_connection_close(self):
        """Forward the close notification to the test object."""
        self.test.on_connection_close()
00224
00225
class ConnectionCloseTest(AsyncHTTPTestCase, LogTrapTestCase):
    """Closes a client connection mid-request and waits for the callback."""

    def get_app(self):
        """Route ``/`` to `ConnectionCloseHandler`, handing it this test."""
        routes = [('/', ConnectionCloseHandler, dict(test=self))]
        return Application(routes)

    def test_connection_close(self):
        """Send a raw request and wait until `on_connection_close` stops."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        sock.connect(("localhost", self.get_http_port()))
        self.stream = IOStream(sock, io_loop=self.io_loop)
        self.stream.write(b("GET / HTTP/1.0\r\n\r\n"))
        self.wait()

    def on_handler_waiting(self):
        """Called by the handler's ``get``; closes the client stream."""
        logging.info('handler waiting')
        self.stream.close()

    def on_connection_close(self):
        """Called by the handler's close hook; ends the ``wait()``."""
        logging.info('connection closed')
        self.stop()
00244
00245
class EchoHandler(RequestHandler):
    """Echoes the request path, path arguments and query arguments as JSON."""

    def get(self, *path_args):
        # Type expectations asserted below:
        #   * keys of request.arguments are native ``str``
        #   * raw values in request.arguments are ``bytes_type``
        #   * values from get_arguments() are ``unicode``
        #   * captured path arguments are ``unicode``
        raw_arguments = self.request.arguments
        for key in raw_arguments:
            assert type(key) == str, repr(key)
            for value in raw_arguments[key]:
                assert type(value) == bytes_type, repr(value)
            for value in self.get_arguments(key):
                assert type(value) == unicode, repr(value)
        for arg in path_args:
            assert type(arg) == unicode, repr(arg)

        payload = dict(path=self.request.path,
                       path_args=path_args,
                       args=recursive_unicode(self.request.arguments))
        self.write(payload)
00263
00264
class RequestEncodingTest(AsyncHTTPTestCase, LogTrapTestCase):
    """Checks how percent-encoded paths and arguments reach `EchoHandler`."""

    def get_app(self):
        """Mount `EchoHandler` under a one-group and a two-group pattern."""
        routes = [
            ("/group/(.*)", EchoHandler),
            ("/slashes/([^/]*)/([^/]*)", EchoHandler),
        ]
        return Application(routes)

    def fetch_json(self, path):
        """Fetch ``path`` and return the JSON-decoded response body."""
        response = self.fetch(path)
        return json_decode(response.body)

    def test_group_question_mark(self):
        """An encoded ``?`` is decoded in path groups and in arguments."""
        self.assertEqual(
            self.fetch_json('/group/%3F'),
            {'path': '/group/%3F', 'path_args': ['?'], 'args': {}})
        self.assertEqual(
            self.fetch_json('/group/%3F?%3F=%3F'),
            {'path': '/group/%3F', 'path_args': ['?'],
             'args': {'?': ['?']}})

    def test_group_encoding(self):
        """Percent-encoded UTF-8 arrives as the character U+00E9."""
        expected = {u"path": u"/group/%C3%A9",
                    u"path_args": [u"\u00e9"],
                    u"args": {u"arg": [u"\u00e9"]}}
        self.assertEqual(self.fetch_json('/group/%C3%A9?arg=%C3%A9'),
                         expected)

    def test_slashes(self):
        """Encoded slashes stay inside one group and are then unescaped."""
        self.assertEqual(
            self.fetch_json('/slashes/foo/bar'),
            {'path': "/slashes/foo/bar",
             'path_args': ["foo", "bar"],
             'args': {}})
        # ``%2F`` does not split the path, yet the handler receives "/".
        self.assertEqual(
            self.fetch_json('/slashes/a%2Fb/c%2Fd'),
            {'path': "/slashes/a%2Fb/c%2Fd",
             'path_args': ["a/b", "c/d"],
             'args': {}})
00300
00301
class TypeCheckHandler(RequestHandler):
    """Records type mismatches of handler APIs and writes them as JSON.

    ``self.errors`` maps a check name to a message for every value whose
    exact type differs from the expected one; an empty dict means that
    all checks passed.
    """

    def prepare(self):
        self.errors = {}

        self.check_type('status', self.get_status(), int)

        # get_argument is expected to return unicode, while cookie keys
        # and values are expected to be native str.
        self.check_type('argument', self.get_argument('foo'), unicode)
        self.check_type('cookie_key', self.cookies.keys()[0], str)
        self.check_type('cookie_value', self.cookies.values()[0].value, str)

        # The remaining cookie checks rely on 'asdf' being the only
        # cookie in the request.  Secure cookies are expected as bytes,
        # plain cookies as native str.
        assert self.cookies.keys() == ['asdf']
        self.check_type('get_secure_cookie', self.get_secure_cookie('asdf'),
                        bytes_type)
        self.check_type('get_cookie', self.get_cookie('asdf'), str)

        self.check_type('xsrf_token', self.xsrf_token, bytes_type)
        self.check_type('xsrf_form_html', self.xsrf_form_html(), str)

        self.check_type('reverse_url',
                        self.reverse_url('typecheck', 'foo'), str)

        self.check_type('request_summary', self._request_summary(), str)

    def get(self, path_component):
        # The captured path component is expected to be unicode.
        self.check_type('path_component', path_component, unicode)
        self.write(self.errors)

    def post(self, path_component):
        # Same check and response as for GET.
        self.check_type('path_component', path_component, unicode)
        self.write(self.errors)

    def check_type(self, name, obj, expected_type):
        """Record an error under ``name`` unless ``type(obj)`` matches."""
        actual_type = type(obj)
        if expected_type == actual_type:
            return
        self.errors[name] = "expected %s, got %s" % (expected_type,
                                                     actual_type)
00342
00343
class DecodeArgHandler(RequestHandler):
    """Decodes arguments with the charset named by ``encoding``, if any."""

    def decode_argument(self, value, name=None):
        """Decode ``value`` per the request's ``encoding`` argument.

        When the request carries no ``encoding`` argument the bytes are
        returned undecoded.
        """
        assert type(value) == bytes_type, repr(value)
        # NOTE(review): the raw request.arguments dict is read here,
        # presumably because get_argument would call back into this
        # method -- confirm.
        raw_arguments = self.request.arguments
        if 'encoding' not in raw_arguments:
            return value
        encoding = to_unicode(raw_arguments['encoding'][0])
        return value.decode(encoding)

    def get(self, arg):
        def describe(s):
            # Tag a value with its type; bytes are shown hex-encoded.
            if type(s) == bytes_type:
                return ["bytes", native_str(binascii.b2a_hex(s))]
            if type(s) == unicode:
                return ["unicode", s]
            raise Exception("unknown type")

        result = {'path': describe(arg),
                  'query': describe(self.get_argument("foo")),
                  }
        self.write(result)
00363
00364
class LinkifyHandler(RequestHandler):
    """Renders ``linkify.html`` with a URL as the ``message`` value."""

    def get(self):
        template_name = "linkify.html"
        self.render(template_name, message="http://example.com")
00368
00369
class UIModuleResourceHandler(RequestHandler):
    """Renders ``page.html`` with two entries."""

    def get(self):
        template_name = "page.html"
        self.render(template_name, entries=[1, 2])
00373
00374
class OptionalPathHandler(RequestHandler):
    """Writes the captured ``path`` argument back as a JSON object."""

    def get(self, path):
        payload = {"path": path}
        self.write(payload)
00378
00379
class FlowControlHandler(RequestHandler):
    """Writes "1", "2", "3" in three steps chained by flush callbacks."""

    @asynchronous
    def get(self):
        # First chunk; step2 runs as the callback of this flush.
        self.write("1")
        self.flush(callback=self.step2)

    def step2(self):
        """Write the second chunk and schedule `step3` via flush."""
        self.write("2")
        self.flush(callback=self.step3)

    def step3(self):
        """Write the last chunk and finish the request."""
        self.write("3")
        self.finish()
00395
00396
class MultiHeaderHandler(RequestHandler):
    """Sets one header twice and adds another twice, mixing str and int."""

    def get(self):
        # set_header is called twice for the same name ...
        self.set_header("x-overwrite", "1")
        self.set_header("x-overwrite", 2)
        # ... and add_header is called twice for the same name.
        self.add_header("x-multi", 3)
        self.add_header("x-multi", "4")
00403
00404
class RedirectHandler(RequestHandler):
    """Redirects to ``/`` as directed by ``permanent`` or ``status``."""

    def get(self):
        permanent = self.get_argument('permanent', None)
        if permanent is not None:
            self.redirect('/', permanent=int(permanent))
            return

        status = self.get_argument('status', None)
        if status is not None:
            self.redirect('/', status=int(status))
            return

        raise Exception("didn't get permanent or status arguments")
00413
00414
class EmptyFlushCallbackHandler(RequestHandler):
    """Flushes repeatedly, both with and without pending output."""

    @gen.engine
    @asynchronous
    def get(self):
        # Two flushes before anything has been written: each yield
        # resumes only once the flush callback has run.
        yield gen.Task(self.flush)
        yield gen.Task(self.flush)
        self.write("o")
        # One flush with pending output, then one more with none.
        yield gen.Task(self.flush)
        yield gen.Task(self.flush)
        self.finish("k")
00427
00428
class HeaderInjectionHandler(RequestHandler):
    """Verifies that a header value containing CRLF is refused.

    Responds with ``ok`` when ``set_header`` raises a `ValueError` whose
    message contains "Unsafe header value"; any other outcome raises.
    """

    def get(self):
        try:
            self.set_header("X-Foo", "foo\r\nX-Bar: baz")
        except ValueError:
            # sys.exc_info() is used instead of binding the exception in
            # the except clause: the two binding spellings
            # ("except E, e" / "except E as e") are each rejected by some
            # Python versions, whereas this form is accepted by all.
            e = sys.exc_info()[1]
            assert "Unsafe header value" in str(e)
            self.finish(b("ok"))
        else:
            raise Exception("Didn't get expected exception")
00437
00438
class WebTest(AsyncHTTPTestCase, LogTrapTestCase):
    """Integration tests for assorted `RequestHandler` features."""

    COOKIE_SECRET = "WebTest.COOKIE_SECRET"

    def get_app(self):
        """Build the application under test and keep it as ``self.app``."""
        loader = DictLoader({
            "linkify.html": "{% module linkify(message) %}",
            "page.html": """\
<html><head></head><body>
{% for e in entries %}
{% module Template("entry.html", entry=e) %}
{% end %}
</body></html>""",
            "entry.html": """\
{{ set_resources(embedded_css=".entry { margin-bottom: 1em; }", embedded_javascript="js_embed()", css_files=["/base.css", "/foo.css"], javascript_files="/common.js", html_head="<meta>", html_body='<script src="/analytics.js"/>') }}
<div class="entry">...</div>""",
        })
        urls = [
            url("/typecheck/(.*)", TypeCheckHandler, name='typecheck'),
            url("/decode_arg/(.*)", DecodeArgHandler, name='decode_arg'),
            url("/decode_arg_kw/(?P<arg>.*)", DecodeArgHandler),
            url("/linkify", LinkifyHandler),
            url("/uimodule_resources", UIModuleResourceHandler),
            url("/optional_path/(.+)?", OptionalPathHandler),
            url("/flow_control", FlowControlHandler),
            url("/multi_header", MultiHeaderHandler),
            url("/redirect", RedirectHandler),
            url("/empty_flush", EmptyFlushCallbackHandler),
            url("/header_injection", HeaderInjectionHandler),
        ]
        self.app = Application(urls,
                               template_loader=loader,
                               autoescape="xhtml_escape",
                               cookie_secret=self.COOKIE_SECRET)
        return self.app

    def fetch_json(self, *args, **kwargs):
        """Fetch, re-raise any error response, and JSON-decode the body."""
        response = self.fetch(*args, **kwargs)
        response.rethrow()
        return json_decode(response.body)

    def test_types(self):
        """`TypeCheckHandler` reports no mismatches for GET or POST."""
        cookie_value = to_unicode(create_signed_value(self.COOKIE_SECRET,
                                                      "asdf", "qwer"))
        headers = {"Cookie": "asdf=" + cookie_value}

        response = self.fetch("/typecheck/asdf?foo=bar", headers=headers)
        data = json_decode(response.body)
        self.assertEqual(data, {})

        response = self.fetch("/typecheck/asdf?foo=bar", method="POST",
                              headers=headers,
                              body="foo=bar")
        # The POST result was previously fetched but never inspected.
        data = json_decode(response.body)
        self.assertEqual(data, {})

    def test_decode_argument(self):
        """Arguments are decoded with the requested encoding, if given."""
        # Each of these requests is expected to decode to U+00E9, in the
        # path component as well as in the query argument.
        encoded_urls = ["/decode_arg/%C3%A9?foo=%C3%A9&encoding=utf-8",
                        "/decode_arg/%E9?foo=%E9&encoding=latin1",
                        "/decode_arg_kw/%E9?foo=%E9&encoding=latin1",
                        ]
        # The loop variable is deliberately not called ``url`` so that it
        # does not shadow the ``url`` helper imported from tornado.web.
        for request_url in encoded_urls:
            response = self.fetch(request_url)
            response.rethrow()
            data = json_decode(response.body)
            self.assertEqual(data, {u'path': [u'unicode', u'\u00e9'],
                                    u'query': [u'unicode', u'\u00e9'],
                                    })

        # With no ``encoding`` argument the handler returns the values
        # undecoded, so they are reported as hex-encoded bytes.
        response = self.fetch("/decode_arg/%C3%A9?foo=%C3%A9")
        response.rethrow()
        data = json_decode(response.body)
        self.assertEqual(data, {u'path': [u'bytes', u'c3a9'],
                                u'query': [u'bytes', u'c3a9'],
                                })

    def test_reverse_url(self):
        """reverse_url accepts str, int, bytes and unicode arguments."""
        self.assertEqual(self.app.reverse_url('decode_arg', 'foo'),
                         '/decode_arg/foo')
        self.assertEqual(self.app.reverse_url('decode_arg', 42),
                         '/decode_arg/42')
        self.assertEqual(self.app.reverse_url('decode_arg', b('\xe9')),
                         '/decode_arg/%E9')
        self.assertEqual(self.app.reverse_url('decode_arg', u'\u00e9'),
                         '/decode_arg/%C3%A9')

    def test_uimodule_unescaped(self):
        """The linkify module's markup is emitted without being escaped."""
        response = self.fetch("/linkify")
        self.assertEqual(response.body,
                         b("<a href=\"http://example.com\">http://example.com</a>"))

    def test_uimodule_resources(self):
        """Resources declared via set_resources appear once in the page."""
        response = self.fetch("/uimodule_resources")
        self.assertEqual(response.body, b("""\
<html><head><link href="/base.css" type="text/css" rel="stylesheet"/><link href="/foo.css" type="text/css" rel="stylesheet"/>
<style type="text/css">
.entry { margin-bottom: 1em; }
</style>
<meta>
</head><body>


<div class="entry">...</div>


<div class="entry">...</div>

<script src="/common.js" type="text/javascript"></script>
<script type="text/javascript">
//<![CDATA[
js_embed()
//]]>
</script>
<script src="/analytics.js"/>
</body></html>"""))

    def test_optional_path(self):
        """An unmatched optional group is passed to the handler as None."""
        self.assertEqual(self.fetch_json("/optional_path/foo"),
                         {u"path": u"foo"})
        self.assertEqual(self.fetch_json("/optional_path/"),
                         {u"path": None})

    def test_flow_control(self):
        """The three chained flushes produce the body ``123``."""
        self.assertEqual(self.fetch("/flow_control").body, b("123"))

    def test_multi_header(self):
        """set_header overwrites; add_header accumulates values."""
        response = self.fetch("/multi_header")
        self.assertEqual(response.headers["x-overwrite"], "2")
        self.assertEqual(response.headers.get_list("x-multi"), ["3", "4"])

    def test_redirect(self):
        """``permanent`` and ``status`` select the redirect status code."""
        response = self.fetch("/redirect?permanent=1", follow_redirects=False)
        self.assertEqual(response.code, 301)
        response = self.fetch("/redirect?permanent=0", follow_redirects=False)
        self.assertEqual(response.code, 302)
        response = self.fetch("/redirect?status=307", follow_redirects=False)
        self.assertEqual(response.code, 307)

    def test_empty_flush(self):
        """Flushes with no pending output still let the request finish."""
        response = self.fetch("/empty_flush")
        self.assertEqual(response.body, b("ok"))

    def test_header_injection(self):
        """The handler answers ``ok`` once the unsafe header is refused."""
        response = self.fetch("/header_injection")
        self.assertEqual(response.body, b("ok"))
00581
00582
class ErrorResponseTest(AsyncHTTPTestCase, LogTrapTestCase):
    """Checks error pages produced by the different customization hooks."""

    def get_app(self):
        """Build an application with one handler per error-page style."""

        class DefaultHandler(RequestHandler):
            def get(self):
                status = self.get_argument("status", None)
                if status:
                    raise HTTPError(int(status))
                # No status requested: fail with a ZeroDivisionError.
                1 / 0

        class WriteErrorHandler(RequestHandler):
            def get(self):
                status = self.get_argument("status", None)
                if status:
                    self.send_error(int(status))
                else:
                    1 / 0

            def write_error(self, status_code, **kwargs):
                self.set_header("Content-Type", "text/plain")
                if "exc_info" not in kwargs:
                    self.write("Status: %d" % status_code)
                else:
                    exc_type = kwargs["exc_info"][0]
                    self.write("Exception: %s" % exc_type.__name__)

        class GetErrorHtmlHandler(RequestHandler):
            def get(self):
                status = self.get_argument("status", None)
                if status:
                    self.send_error(int(status))
                else:
                    1 / 0

            def get_error_html(self, status_code, **kwargs):
                self.set_header("Content-Type", "text/plain")
                if "exception" not in kwargs:
                    self.write("Status: %d" % status_code)
                else:
                    # The exception type is taken from sys.exc_info()
                    # rather than from the keyword argument.
                    exc_type = sys.exc_info()[0]
                    self.write("Exception: %s" % exc_type.__name__)

        class FailedWriteErrorHandler(RequestHandler):
            def get(self):
                1 / 0

            def write_error(self, status_code, **kwargs):
                raise Exception("exception in write_error")

        routes = [
            url("/default", DefaultHandler),
            url("/write_error", WriteErrorHandler),
            url("/get_error_html", GetErrorHtmlHandler),
            url("/failed_write_error", FailedWriteErrorHandler),
        ]
        return Application(routes)

    def test_default(self):
        """The stock error page names the status code and its reason."""
        response = self.fetch("/default")
        self.assertEqual(response.code, 500)
        self.assertTrue(b("500: Internal Server Error") in response.body)

        response = self.fetch("/default?status=503")
        self.assertEqual(response.code, 503)
        self.assertTrue(b("503: Service Unavailable") in response.body)

    def test_write_error(self):
        """A write_error override renders exceptions and plain statuses."""
        response = self.fetch("/write_error")
        self.assertEqual(response.code, 500)
        self.assertEqual(b("Exception: ZeroDivisionError"), response.body)

        response = self.fetch("/write_error?status=503")
        self.assertEqual(response.code, 503)
        self.assertEqual(b("Status: 503"), response.body)

    def test_get_error_html(self):
        """A get_error_html override renders exceptions and statuses."""
        response = self.fetch("/get_error_html")
        self.assertEqual(response.code, 500)
        self.assertEqual(b("Exception: ZeroDivisionError"), response.body)

        response = self.fetch("/get_error_html?status=503")
        self.assertEqual(response.code, 503)
        self.assertEqual(b("Status: 503"), response.body)

    def test_failed_write_error(self):
        """An exception inside write_error yields an empty 500 response."""
        response = self.fetch("/failed_write_error")
        self.assertEqual(response.code, 500)
        self.assertEqual(b(""), response.body)
00664
00665
class StaticFileTest(AsyncHTTPTestCase, LogTrapTestCase):
    """Tests for static file serving and ``static_url`` generation."""

    def get_app(self):
        """Build an app serving the ``static`` directory beside this file."""

        class StaticUrlHandler(RequestHandler):
            def get(self, path):
                self.write(self.static_url(path))

        class AbsoluteStaticUrlHandler(RequestHandler):
            include_host = True

            def get(self, path):
                self.write(self.static_url(path))

        class OverrideStaticUrlHandler(RequestHandler):
            def get(self, path):
                # The argument arrives as the text "0" or "1".  It has to
                # go through int() first: bool() of any non-empty string,
                # "0" included, is True.
                do_include = bool(int(self.get_argument("include_host")))
                # The handler-level default is set to the opposite of the
                # explicit argument so that the two URLs must differ.
                self.include_host = not do_include

                regular_url = self.static_url(path)
                override_url = self.static_url(path, include_host=do_include)
                if override_url == regular_url:
                    return self.write(str(False))

                # find() is limited to the prefix, so it returns 0 when
                # the URL starts with the protocol and -1 otherwise.
                protocol = self.request.protocol + "://"
                protocol_length = len(protocol)
                check_regular = regular_url.find(protocol, 0, protocol_length)
                check_override = override_url.find(protocol, 0, protocol_length)

                if do_include:
                    result = (check_override == 0 and check_regular == -1)
                else:
                    result = (check_override == -1 and check_regular == 0)
                self.write(str(result))

        return Application([('/static_url/(.*)', StaticUrlHandler),
                            ('/abs_static_url/(.*)', AbsoluteStaticUrlHandler),
                            ('/override_static_url/(.*)', OverrideStaticUrlHandler)],
                           static_path=os.path.join(os.path.dirname(__file__), 'static'))

    def test_static_files(self):
        """robots.txt is served at the root and under ``/static/``."""
        response = self.fetch('/robots.txt')
        self.assertTrue(b("Disallow: /") in response.body)

        response = self.fetch('/static/robots.txt')
        self.assertTrue(b("Disallow: /") in response.body)

    def test_static_url(self):
        """static_url appends a version query parameter."""
        response = self.fetch("/static_url/robots.txt")
        self.assertEqual(response.body, b("/static/robots.txt?v=f71d2"))

    def test_absolute_static_url(self):
        """With ``include_host`` set on the class the URL is absolute."""
        response = self.fetch("/abs_static_url/robots.txt")
        self.assertEqual(response.body,
                         utf8(self.get_url("/") + "static/robots.txt?v=f71d2"))

    def test_include_host_override(self):
        """The ``include_host`` argument overrides the handler attribute."""
        self._trigger_include_host_check(False)
        self._trigger_include_host_check(True)

    def _trigger_include_host_check(self, include_host):
        """Request the override handler with ``include_host`` as 0 or 1."""
        path = "/override_static_url/robots.txt?include_host=%s"
        response = self.fetch(path % int(include_host))
        self.assertEqual(response.body, utf8(str(True)))

    def test_static_304(self):
        """If-Modified-Since gives a 304 without the entity headers."""
        response1 = self.fetch("/static/robots.txt")
        response2 = self.fetch("/static/robots.txt", headers={
            'If-Modified-Since': response1.headers['Last-Modified']})
        self.assertEqual(response2.code, 304)
        self.assertTrue('Content-Length' not in response2.headers)
        self.assertTrue('Last-Modified' not in response2.headers)
00736
00737
class CustomStaticFileTest(AsyncHTTPTestCase, LogTrapTestCase):
    """Tests a `StaticFileHandler` subclass with its own URL scheme.

    The subclass puts a version number between the file name and its
    extension (``foo.txt`` <-> ``foo.42.txt``).
    """

    def get_app(self):
        """Build an app that uses the custom static handler class."""

        class MyStaticFileHandler(StaticFileHandler):
            def get(self, path):
                # Nothing is read from disk: the version is stripped from
                # the path and a fixed body is written.
                path = self.parse_url_path(path)
                assert path == "foo.txt"
                self.write("bar")

            @classmethod
            def make_static_url(cls, settings, path):
                # The return value of get_version is not used.
                # NOTE(review): presumably called only to exercise it --
                # confirm.
                cls.get_version(settings, path)
                dot = path.rindex('.')
                stem = path[:dot]
                extension = path[(dot + 1):]
                return '/static/%s.%s.%s' % (stem, 42, extension)

            @classmethod
            def parse_url_path(cls, url_path):
                # Remove the next-to-last dotted component (the version).
                extension_dot = url_path.rindex('.')
                version_dot = url_path.rindex('.', 0, extension_dot)
                stem = url_path[:version_dot]
                extension = url_path[extension_dot:]
                return '%s%s' % (stem, extension)

        class StaticUrlHandler(RequestHandler):
            def get(self, path):
                self.write(self.static_url(path))

        routes = [("/static_url/(.*)", StaticUrlHandler)]
        return Application(routes,
                           static_path="dummy",
                           static_handler_class=MyStaticFileHandler)

    def test_serve(self):
        """A versioned static URL is served by the custom ``get``."""
        response = self.fetch("/static/foo.42.txt")
        self.assertEqual(response.body, b("bar"))

    def test_static_url(self):
        """static_url delegates to the custom ``make_static_url``."""
        response = self.fetch("/static_url/foo.txt")
        self.assertEqual(response.body, b("/static/foo.42.txt"))
00776
00777
class NamedURLSpecGroupsTest(AsyncHTTPTestCase, LogTrapTestCase):
    """Named groups work in both str and unicode URL patterns."""

    def get_app(self):
        """Route a str pattern and a unicode pattern to an echo handler."""

        class EchoHandler(RequestHandler):
            def get(self, path):
                # ``path`` is filled from the named group ``(?P<path>...)``.
                self.write(path)

        routes = [("/str/(?P<path>.*)", EchoHandler),
                  (u"/unicode/(?P<path>.*)", EchoHandler)]
        return Application(routes)

    def test_named_urlspec_groups(self):
        """Both pattern types pass the captured text to the handler."""
        str_response = self.fetch("/str/foo")
        self.assertEqual(str_response.body, b("foo"))

        unicode_response = self.fetch("/unicode/bar")
        self.assertEqual(unicode_response.body, b("bar"))
00793
00794
class ClearHeaderTest(SimpleHandlerTestCase):
    """clear_header removes a set header and leaves other headers alone."""

    class Handler(RequestHandler):
        def get(self):
            self.set_header("h1", "foo")
            self.set_header("h2", "bar")
            self.clear_header("h1")
            # Also clear a header that was never set.
            self.clear_header("nonexistent")

    def test_clear_header(self):
        """``h1`` is gone from the response while ``h2`` is still there."""
        response = self.fetch("/")
        headers = response.headers
        self.assertTrue("h1" not in headers)
        self.assertEqual(headers["h2"], "bar")
00807
00808
class Header304Test(SimpleHandlerTestCase):
    """A 304 response leaves out headers that the 200 response carried."""

    class Handler(RequestHandler):
        def get(self):
            self.set_header("Content-Language", "en_US")
            self.write("hello")

    def test_304_headers(self):
        """Content-Length and Content-Language are dropped on a 304."""
        first = self.fetch('/')
        self.assertEqual(first.headers["Content-Length"], "5")
        self.assertEqual(first.headers["Content-Language"], "en_US")

        # Repeat the request, sending back the Etag just received.
        second = self.fetch('/', headers={
            'If-None-Match': first.headers["Etag"]})
        self.assertEqual(second.code, 304)
        self.assertTrue("Content-Length" not in second.headers)
        self.assertTrue("Content-Language" not in second.headers)
        # Transfer-Encoding must be absent from the 304 as well.
        self.assertTrue("Transfer-Encoding" not in second.headers)