web_test.py
Go to the documentation of this file.
00001 from __future__ import absolute_import, division, with_statement
00002 from tornado import gen
00003 from tornado.escape import json_decode, utf8, to_unicode, recursive_unicode, native_str
00004 from tornado.iostream import IOStream
00005 from tornado.template import DictLoader
00006 from tornado.testing import LogTrapTestCase, AsyncHTTPTestCase
00007 from tornado.util import b, bytes_type, ObjectDict
00008 from tornado.web import RequestHandler, authenticated, Application, asynchronous, url, HTTPError, StaticFileHandler, _create_signature, create_signed_value
00009 
00010 import binascii
00011 import logging
00012 import os
00013 import re
00014 import socket
00015 import sys
00016 
00017 
class SimpleHandlerTestCase(AsyncHTTPTestCase):
    """Base class for test cases that exercise exactly one handler.

    Subclasses provide the handler as a nested class named ``Handler``;
    it is mounted at ``/``.
    """
    def get_app(self):
        # Request logging is replaced by a function that does nothing.
        def ignore_request(handler):
            return None

        routes = [('/', self.Handler)]
        return Application(routes, log_function=ignore_request)
00026 
00027 
class CookieTestRequestHandler(RequestHandler):
    """Handler stub with just enough state for the secure_cookie functions.

    Cookies are kept in a plain dict on the instance.
    """
    def __init__(self):
        # Deliberately does not call RequestHandler.__init__; only the two
        # attributes assigned here are set up.
        settings = dict(cookie_secret='0123456789')
        self.application = ObjectDict(settings=settings)
        self._cookies = {}

    def get_cookie(self, name):
        """Return the value stored under ``name``, or None when absent."""
        return self._cookies.get(name)

    def set_cookie(self, name, value, expires_days=None):
        """Store ``value`` under ``name``; ``expires_days`` is not used."""
        self._cookies[name] = value
00040 
00041 
class SecureCookieTest(LogTrapTestCase):
    """Tests for set_secure_cookie/get_secure_cookie on a stub handler."""

    def test_round_trip(self):
        """A value written as a secure cookie is read back unchanged."""
        handler = CookieTestRequestHandler()
        handler.set_secure_cookie('foo', b('bar'))
        self.assertEqual(handler.get_secure_cookie('foo'), b('bar'))

    def test_cookie_tampering_future_timestamp(self):
        """Moving payload digits into the timestamp field is rejected."""
        handler = CookieTestRequestHandler()
        secret = handler.application.settings["cookie_secret"]
        # this string base64-encodes to '12345678'
        handler.set_secure_cookie('foo', binascii.a2b_hex(b('d76df8e7aefc')))
        cookie = handler._cookies['foo']
        match = re.match(b(r'12345678\|([0-9]+)\|([0-9a-f]+)'), cookie)
        # unittest assertions are used instead of bare ``assert`` so the
        # checks still run when Python is started with -O.
        self.assertTrue(match, repr(cookie))
        timestamp = match.group(1)
        sig = match.group(2)
        self.assertEqual(
            _create_signature(secret, 'foo', '12345678', timestamp),
            sig)
        # shifting digits from payload to timestamp doesn't alter signature
        # (this is not desirable behavior, just confirming that that's how it
        # works)
        self.assertEqual(
            _create_signature(secret, 'foo', '1234', b('5678') + timestamp),
            sig)
        # tamper with the cookie.  ``timestamp`` and ``sig`` are byte strings
        # (they come from a bytes regex), so the forged value is assembled by
        # byte concatenation; %-formatting them into a native string would
        # embed their repr wherever str and bytes are distinct types.
        handler._cookies['foo'] = b('1234|5678') + timestamp + b('|') + sig
        # it gets rejected
        self.assertTrue(handler.get_secure_cookie('foo') is None)

    def test_arbitrary_bytes(self):
        """Non-ascii byte values survive the round trip."""
        # Secure cookies accept arbitrary data (which is base64 encoded).
        # Note that normal cookies accept only a subset of ascii.
        handler = CookieTestRequestHandler()
        handler.set_secure_cookie('foo', b('\xe9'))
        self.assertEqual(handler.get_secure_cookie('foo'), b('\xe9'))
00079 
00080 
class CookieTest(AsyncHTTPTestCase, LogTrapTestCase):
    """Checks the Set-Cookie headers handlers emit and the Cookie headers
    they parse, over real HTTP requests.
    """

    def get_app(self):
        class SetCookieHandler(RequestHandler):
            def get(self):
                # One cookie per argument type (native str, unicode, bytes)
                # to ensure that everything gets encoded correctly.
                for name, value in (("str", "asdf"),
                                    ("unicode", u"qwer"),
                                    ("bytes", b("zxcv"))):
                    self.set_cookie(name, value)

        class GetCookieHandler(RequestHandler):
            def get(self):
                value = self.get_cookie("foo", "default")
                self.write(value)

        class SetCookieDomainHandler(RequestHandler):
            def get(self):
                # unicode domain and path arguments shouldn't break things
                # either (see bug #285)
                options = dict(domain=u"foo.com", path=u"/foo")
                self.set_cookie("unicode_args", "blah", **options)

        class SetCookieSpecialCharHandler(RequestHandler):
            def get(self):
                for name, value in (("equals", "a=b"),
                                    ("semicolon", "a;b"),
                                    ("quote", 'a"b')):
                    self.set_cookie(name, value)

        class SetCookieOverwriteHandler(RequestHandler):
            def get(self):
                self.set_cookie("a", "b", domain="example.com")
                self.set_cookie("c", "d", domain="example.com")
                # A second call with the same name clobbers the first.
                # Attributes from the first call are not carried over.
                self.set_cookie("a", "e")

        routes = [
            ("/set", SetCookieHandler),
            ("/get", GetCookieHandler),
            ("/set_domain", SetCookieDomainHandler),
            ("/special_char", SetCookieSpecialCharHandler),
            ("/set_overwrite", SetCookieOverwriteHandler),
        ]
        return Application(routes)

    def test_set_cookie(self):
        cookies = self.fetch("/set").headers.get_list("Set-Cookie")
        expected = [
            "bytes=zxcv; Path=/",
            "str=asdf; Path=/",
            "unicode=qwer; Path=/",
        ]
        self.assertEqual(sorted(cookies), expected)

    def test_get_cookie(self):
        # Each pair is (Cookie request header, expected response body).
        cases = [
            ("foo=bar", b("bar")),
            ('foo="bar"', b("bar")),
            ("/=exception;", b("default")),
        ]
        for cookie_header, expected_body in cases:
            response = self.fetch("/get", headers={"Cookie": cookie_header})
            self.assertEqual(response.body, expected_body)

    def test_set_cookie_domain(self):
        cookies = self.fetch("/set_domain").headers.get_list("Set-Cookie")
        self.assertEqual(cookies,
                         ["unicode_args=blah; Domain=foo.com; Path=/foo"])

    def test_cookie_special_char(self):
        set_cookies = self.fetch("/special_char").headers.get_list("Set-Cookie")
        set_cookies = sorted(set_cookies)
        self.assertEqual(len(set_cookies), 3)
        equals_cookie, quote_cookie, semicolon_cookie = set_cookies
        self.assertEqual(equals_cookie, 'equals="a=b"; Path=/')
        self.assertEqual(quote_cookie, 'quote="a\\"b"; Path=/')
        # python 2.7 octal-escapes the semicolon; older versions leave it alone
        accepted = ('semicolon="a;b"; Path=/',
                    'semicolon="a\\073b"; Path=/')
        self.assertTrue(semicolon_cookie in accepted, semicolon_cookie)

        # (Cookie request header, value the handler should see).  The
        # unquoted form 'foo=a\\073b' is left out on purpose: even encoded,
        # ";" is a delimiter.
        round_trips = [
            ('foo=a=b', 'a=b'),
            ('foo="a=b"', 'a=b'),
            ('foo="a;b"', 'a;b'),
            ('foo="a\\073b"', 'a;b'),
            ('foo="a\\"b"', 'a"b'),
        ]
        for cookie_header, expected in round_trips:
            logging.info("trying %r", cookie_header)
            response = self.fetch("/get", headers={"Cookie": cookie_header})
            self.assertEqual(response.body, utf8(expected))

    def test_set_cookie_overwrite(self):
        cookies = self.fetch("/set_overwrite").headers.get_list("Set-Cookie")
        expected = ["a=e; Path=/", "c=d; Domain=example.com; Path=/"]
        self.assertEqual(sorted(cookies), expected)
00175 
00176 
class AuthRedirectRequestHandler(RequestHandler):
    """Handler whose login URL is supplied per route via ``initialize``."""

    def initialize(self, login_url):
        """Remember the login URL passed in the route's argument dict."""
        self.login_url = login_url

    def get_login_url(self):
        """Return the login URL stored by ``initialize``."""
        return self.login_url

    @authenticated
    def get(self):
        # The tests using this handler don't follow redirects, so this
        # body is never actually reached.
        self.send_error(500)
00188 
00189 
class AuthRedirectTest(AsyncHTTPTestCase, LogTrapTestCase):
    """Checks the Location header produced by ``@authenticated`` redirects
    for a relative and an absolute login URL.
    """

    def get_app(self):
        return Application([('/relative', AuthRedirectRequestHandler,
                             dict(login_url='/login')),
                            ('/absolute', AuthRedirectRequestHandler,
                             dict(login_url='http://example.com/login'))])

    def test_relative_auth_redirect(self):
        self.http_client.fetch(self.get_url('/relative'), self.stop,
                               follow_redirects=False)
        response = self.wait()
        self.assertEqual(response.code, 302)
        self.assertEqual(response.headers['Location'], '/login?next=%2Frelative')

    def test_absolute_auth_redirect(self):
        self.http_client.fetch(self.get_url('/absolute'), self.stop,
                               follow_redirects=False)
        response = self.wait()
        self.assertEqual(response.code, 302)
        location = response.headers['Location']
        # Raw string: "\?" is a regex escape, not a string escape.  The
        # pattern text itself is unchanged.
        pattern = (r'http://example.com/login\?next='
                   r'http%3A%2F%2Flocalhost%3A[0-9]+%2Fabsolute')
        self.assertTrue(re.match(pattern, location), location)
00212 
00213 
class ConnectionCloseHandler(RequestHandler):
    """Asynchronous handler that reports its lifecycle events to a test.

    The owning test object is injected through ``initialize`` and is
    notified when the request starts and when the connection closes.
    """

    def initialize(self, test):
        self.test = test

    @asynchronous
    def get(self):
        # The request is never finished here; the test is only told that
        # the handler has started.
        owner = self.test
        owner.on_handler_waiting()

    def on_connection_close(self):
        owner = self.test
        owner.on_connection_close()
00224 
00225 
class ConnectionCloseTest(AsyncHTTPTestCase, LogTrapTestCase):
    """Checks that a handler's on_connection_close runs after the client
    closes its stream mid-request.
    """

    def get_app(self):
        # The handler receives this test instance so it can call back into
        # on_handler_waiting / on_connection_close below.
        handler_args = dict(test=self)
        return Application([('/', ConnectionCloseHandler, handler_args)])

    def test_connection_close(self):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        address = ("localhost", self.get_http_port())
        sock.connect(address)
        self.stream = IOStream(sock, io_loop=self.io_loop)
        self.stream.write(b("GET / HTTP/1.0\r\n\r\n"))
        # Returns once on_connection_close() has called self.stop().
        self.wait()

    def on_handler_waiting(self):
        """Called by the handler once the request has reached it."""
        logging.info('handler waiting')
        self.stream.close()

    def on_connection_close(self):
        """Called by the handler when it sees the connection close."""
        logging.info('connection closed')
        self.stop()
00244 
00245 
class EchoHandler(RequestHandler):
    """Echoes the request path, captured path groups and query arguments
    back as JSON, asserting their types along the way.
    """

    def get(self, *path_args):
        # Type checks: web.py interfaces convert argument values to
        # unicode strings (by default, but see also decode_argument).
        # In httpserver.py (i.e. self.request.arguments), they're left
        # as bytes.  Keys are always native strings.
        raw_arguments = self.request.arguments
        for key in raw_arguments:
            assert type(key) == str, repr(key)
            for raw_value in raw_arguments[key]:
                assert type(raw_value) == bytes_type, repr(raw_value)
            for decoded_value in self.get_arguments(key):
                assert type(decoded_value) == unicode, repr(decoded_value)
        for path_arg in path_args:
            assert type(path_arg) == unicode, repr(path_arg)

        payload = dict(path=self.request.path,
                       path_args=path_args,
                       args=recursive_unicode(raw_arguments))
        self.write(payload)
00263 
00264 
class RequestEncodingTest(AsyncHTTPTestCase, LogTrapTestCase):
    """Checks how percent-escaped characters in paths and query strings
    are presented to a handler.
    """

    def get_app(self):
        routes = [
            ("/group/(.*)", EchoHandler),
            ("/slashes/([^/]*)/([^/]*)", EchoHandler),
        ]
        return Application(routes)

    def fetch_json(self, path):
        """Fetch ``path`` and return the JSON-decoded response body."""
        response = self.fetch(path)
        return json_decode(response.body)

    def test_group_question_mark(self):
        # Ensure that url-encoded question marks are handled properly
        without_query = self.fetch_json('/group/%3F')
        self.assertEqual(without_query,
                         {'path': '/group/%3F',
                          'path_args': ['?'],
                          'args': {}})

        with_query = self.fetch_json('/group/%3F?%3F=%3F')
        self.assertEqual(with_query,
                         {'path': '/group/%3F',
                          'path_args': ['?'],
                          'args': {'?': ['?']}})

    def test_group_encoding(self):
        # Path components and query arguments should be decoded the same way
        echoed = self.fetch_json('/group/%C3%A9?arg=%C3%A9')
        expected = {u"path": u"/group/%C3%A9",
                    u"path_args": [u"\u00e9"],
                    u"args": {u"arg": [u"\u00e9"]}}
        self.assertEqual(echoed, expected)

    def test_slashes(self):
        # Slashes may be escaped to appear as a single "directory" in the path,
        # but they are then unescaped when passed to the get() method.
        plain = self.fetch_json('/slashes/foo/bar')
        self.assertEqual(plain,
                         {"path": "/slashes/foo/bar",
                          "path_args": ["foo", "bar"],
                          "args": {}})

        escaped = self.fetch_json('/slashes/a%2Fb/c%2Fd')
        self.assertEqual(escaped,
                         {"path": "/slashes/a%2Fb/c%2Fd",
                          "path_args": ["a/b", "c/d"],
                          "args": {}})
00300 
00301 
class TypeCheckHandler(RequestHandler):
    """Checks the types returned by several RequestHandler accessors.

    Every mismatch is recorded in ``self.errors``, which is written out as
    the response body by ``get`` and ``post``.
    """

    def prepare(self):
        self.errors = {}

        self.check_type('status', self.get_status(), int)

        # get_argument is an exception from the general rule of using
        # type str for non-body data mainly for historical reasons.
        self.check_type('argument', self.get_argument('foo'), unicode)

        cookies = self.cookies
        self.check_type('cookie_key', cookies.keys()[0], str)
        self.check_type('cookie_value', cookies.values()[0].value, str)

        # Secure cookies return bytes because they can contain arbitrary
        # data, but regular cookies are native strings.
        assert cookies.keys() == ['asdf']
        secure_value = self.get_secure_cookie('asdf')
        self.check_type('get_secure_cookie', secure_value, bytes_type)
        plain_value = self.get_cookie('asdf')
        self.check_type('get_cookie', plain_value, str)

        self.check_type('xsrf_token', self.xsrf_token, bytes_type)
        self.check_type('xsrf_form_html', self.xsrf_form_html(), str)

        reversed_url = self.reverse_url('typecheck', 'foo')
        self.check_type('reverse_url', reversed_url, str)

        self.check_type('request_summary', self._request_summary(), str)

    def get(self, path_component):
        # path_component uses type unicode instead of str for consistency
        # with get_argument()
        self.check_type('path_component', path_component, unicode)
        self.write(self.errors)

    def post(self, path_component):
        self.check_type('path_component', path_component, unicode)
        self.write(self.errors)

    def check_type(self, name, obj, expected_type):
        """Record an error under ``name`` unless type(obj) is expected_type."""
        actual_type = type(obj)
        if expected_type == actual_type:
            return
        message = "expected %s, got %s" % (expected_type, actual_type)
        self.errors[name] = message
00342 
00343 
class DecodeArgHandler(RequestHandler):
    """Handler overriding decode_argument so the request itself selects
    the encoding through its ``encoding`` query argument.
    """

    def decode_argument(self, value, name=None):
        assert type(value) == bytes_type, repr(value)
        # use self.request.arguments directly to avoid recursion
        raw_arguments = self.request.arguments
        if 'encoding' not in raw_arguments:
            # No encoding requested: hand the bytes back undecoded.
            return value
        encoding = to_unicode(raw_arguments['encoding'][0])
        return value.decode(encoding)

    def get(self, arg):
        def describe(s):
            # Returns a [type name, printable value] pair.
            kind = type(s)
            if kind == bytes_type:
                return ["bytes", native_str(binascii.b2a_hex(s))]
            if kind == unicode:
                return ["unicode", s]
            raise Exception("unknown type")

        path_description = describe(arg)
        query_description = describe(self.get_argument("foo"))
        self.write({'path': path_description,
                    'query': query_description})
00363 
00364 
class LinkifyHandler(RequestHandler):
    """Renders ``linkify.html`` with a fixed URL as the message."""

    def get(self):
        template_args = dict(message="http://example.com")
        self.render("linkify.html", **template_args)
00368 
00369 
class UIModuleResourceHandler(RequestHandler):
    """Renders ``page.html`` with two entries."""

    def get(self):
        entries = [1, 2]
        self.render("page.html", entries=entries)
00373 
00374 
class OptionalPathHandler(RequestHandler):
    """Writes the captured path argument back as a JSON object."""

    def get(self, path):
        payload = {"path": path}
        self.write(payload)
00378 
00379 
class FlowControlHandler(RequestHandler):
    """Writes "1", "2" and "3" in three steps chained by flush callbacks."""

    # These writes are too small to demonstrate real flow control,
    # but at least it shows that the callbacks get run.
    @asynchronous
    def get(self):
        self.write("1")
        next_step = self.step2
        self.flush(callback=next_step)

    def step2(self):
        self.write("2")
        next_step = self.step3
        self.flush(callback=next_step)

    def step3(self):
        # Last step: nothing left to chain, so finish the request.
        self.write("3")
        self.finish()
00395 
00396 
class MultiHeaderHandler(RequestHandler):
    """Sets one header twice with set_header and another twice with
    add_header, mixing string and integer values.
    """

    def get(self):
        for value in ("1", 2):
            self.set_header("x-overwrite", value)
        for value in (3, "4"):
            self.add_header("x-multi", value)
00403 
00404 
class RedirectHandler(RequestHandler):
    """Redirects to ``/`` using either the ``permanent`` or the ``status``
    query argument, checked in that order.
    """

    def get(self):
        permanent = self.get_argument('permanent', None)
        if permanent is not None:
            self.redirect('/', permanent=int(permanent))
            return

        status = self.get_argument('status', None)
        if status is not None:
            self.redirect('/', status=int(status))
            return

        raise Exception("didn't get permanent or status arguments")
00413 
00414 
class EmptyFlushCallbackHandler(RequestHandler):
    """Alternates flushes with and without pending output, then finishes."""

    @gen.engine
    @asynchronous
    def get(self):
        # Ensure that the flush callback is run whether or not there
        # was any output.
        flush = self.flush
        # "empty" flush, but writes headers
        yield gen.Task(flush)
        # empty flush
        yield gen.Task(flush)
        self.write("o")
        # flushes the "o"
        yield gen.Task(flush)
        # empty flush
        yield gen.Task(flush)
        self.finish("k")
00427 
00428 
00429 class HeaderInjectionHandler(RequestHandler):
00430     def get(self):
00431         try:
00432             self.set_header("X-Foo", "foo\r\nX-Bar: baz")
00433             raise Exception("Didn't get expected exception")
00434         except ValueError, e:
00435             assert "Unsafe header value" in str(e)
00436             self.finish(b("ok"))
00437 
00438 
class WebTest(AsyncHTTPTestCase, LogTrapTestCase):
    """Tests driving the module-level handlers through one Application
    configured with an in-memory template loader and a cookie secret.
    """
    COOKIE_SECRET = "WebTest.COOKIE_SECRET"

    def get_app(self):
        loader = DictLoader({
                "linkify.html": "{% module linkify(message) %}",
                "page.html": """\
<html><head></head><body>
{% for e in entries %}
{% module Template("entry.html", entry=e) %}
{% end %}
</body></html>""",
                "entry.html": """\
{{ set_resources(embedded_css=".entry { margin-bottom: 1em; }", embedded_javascript="js_embed()", css_files=["/base.css", "/foo.css"], javascript_files="/common.js", html_head="<meta>", html_body='<script src="/analytics.js"/>') }}
<div class="entry">...</div>""",
                })
        urls = [
            url("/typecheck/(.*)", TypeCheckHandler, name='typecheck'),
            url("/decode_arg/(.*)", DecodeArgHandler, name='decode_arg'),
            url("/decode_arg_kw/(?P<arg>.*)", DecodeArgHandler),
            url("/linkify", LinkifyHandler),
            url("/uimodule_resources", UIModuleResourceHandler),
            url("/optional_path/(.+)?", OptionalPathHandler),
            url("/flow_control", FlowControlHandler),
            url("/multi_header", MultiHeaderHandler),
            url("/redirect", RedirectHandler),
            url("/empty_flush", EmptyFlushCallbackHandler),
            url("/header_injection", HeaderInjectionHandler),
            ]
        # Kept on the instance because test_reverse_url calls
        # self.app.reverse_url directly.
        self.app = Application(urls,
                               template_loader=loader,
                               autoescape="xhtml_escape",
                               cookie_secret=self.COOKIE_SECRET)
        return self.app

    def fetch_json(self, *args, **kwargs):
        """Fetch, re-raise any error on the response, and decode the JSON body."""
        response = self.fetch(*args, **kwargs)
        response.rethrow()
        return json_decode(response.body)

    def test_types(self):
        cookie_value = to_unicode(create_signed_value(self.COOKIE_SECRET,
                                                      "asdf", "qwer"))
        headers = {"Cookie": "asdf=" + cookie_value}
        response = self.fetch("/typecheck/asdf?foo=bar", headers=headers)
        data = json_decode(response.body)
        self.assertEqual(data, {})

        response = self.fetch("/typecheck/asdf?foo=bar", method="POST",
                              headers=headers,
                              body="foo=bar")
        # The POST response was previously fetched but never inspected;
        # TypeCheckHandler.post writes the same error dict as get, so it
        # is held to the same expectation.
        data = json_decode(response.body)
        self.assertEqual(data, {})

    def test_decode_argument(self):
        # These urls all decode to the same thing
        paths = ["/decode_arg/%C3%A9?foo=%C3%A9&encoding=utf-8",
                 "/decode_arg/%E9?foo=%E9&encoding=latin1",
                 "/decode_arg_kw/%E9?foo=%E9&encoding=latin1",
                 ]
        # The loop variable is not called ``url`` so that it does not
        # shadow the ``url`` helper imported from tornado.web.
        for path in paths:
            response = self.fetch(path)
            response.rethrow()
            data = json_decode(response.body)
            self.assertEqual(data, {u'path': [u'unicode', u'\u00e9'],
                                    u'query': [u'unicode', u'\u00e9'],
                                    })

        # Without an encoding argument the handler returns the raw bytes.
        response = self.fetch("/decode_arg/%C3%A9?foo=%C3%A9")
        response.rethrow()
        data = json_decode(response.body)
        self.assertEqual(data, {u'path': [u'bytes', u'c3a9'],
                                u'query': [u'bytes', u'c3a9'],
                                })

    def test_reverse_url(self):
        self.assertEqual(self.app.reverse_url('decode_arg', 'foo'),
                         '/decode_arg/foo')
        self.assertEqual(self.app.reverse_url('decode_arg', 42),
                         '/decode_arg/42')
        self.assertEqual(self.app.reverse_url('decode_arg', b('\xe9')),
                         '/decode_arg/%E9')
        self.assertEqual(self.app.reverse_url('decode_arg', u'\u00e9'),
                         '/decode_arg/%C3%A9')

    def test_uimodule_unescaped(self):
        response = self.fetch("/linkify")
        self.assertEqual(response.body,
                         b("<a href=\"http://example.com\">http://example.com</a>"))

    def test_uimodule_resources(self):
        response = self.fetch("/uimodule_resources")
        self.assertEqual(response.body, b("""\
<html><head><link href="/base.css" type="text/css" rel="stylesheet"/><link href="/foo.css" type="text/css" rel="stylesheet"/>
<style type="text/css">
.entry { margin-bottom: 1em; }
</style>
<meta>
</head><body>


<div class="entry">...</div>


<div class="entry">...</div>

<script src="/common.js" type="text/javascript"></script>
<script type="text/javascript">
//<![CDATA[
js_embed()
//]]>
</script>
<script src="/analytics.js"/>
</body></html>"""))

    def test_optional_path(self):
        self.assertEqual(self.fetch_json("/optional_path/foo"),
                         {u"path": u"foo"})
        self.assertEqual(self.fetch_json("/optional_path/"),
                         {u"path": None})

    def test_flow_control(self):
        self.assertEqual(self.fetch("/flow_control").body, b("123"))

    def test_multi_header(self):
        response = self.fetch("/multi_header")
        self.assertEqual(response.headers["x-overwrite"], "2")
        self.assertEqual(response.headers.get_list("x-multi"), ["3", "4"])

    def test_redirect(self):
        response = self.fetch("/redirect?permanent=1", follow_redirects=False)
        self.assertEqual(response.code, 301)
        response = self.fetch("/redirect?permanent=0", follow_redirects=False)
        self.assertEqual(response.code, 302)
        response = self.fetch("/redirect?status=307", follow_redirects=False)
        self.assertEqual(response.code, 307)

    def test_empty_flush(self):
        response = self.fetch("/empty_flush")
        self.assertEqual(response.body, b("ok"))

    def test_header_injection(self):
        response = self.fetch("/header_injection")
        self.assertEqual(response.body, b("ok"))
00581 
00582 
class ErrorResponseTest(AsyncHTTPTestCase, LogTrapTestCase):
    """Checks error responses for the default error page, a write_error
    override, a get_error_html override, and a write_error that raises.
    """

    def get_app(self):
        class DefaultHandler(RequestHandler):
            def get(self):
                status = self.get_argument("status", None)
                if status:
                    raise HTTPError(int(status))
                # No status requested: fail with a ZeroDivisionError.
                1 / 0

        class WriteErrorHandler(RequestHandler):
            def get(self):
                status = self.get_argument("status", None)
                if not status:
                    1 / 0
                else:
                    self.send_error(int(status))

            def write_error(self, status_code, **kwargs):
                self.set_header("Content-Type", "text/plain")
                if "exc_info" not in kwargs:
                    self.write("Status: %d" % status_code)
                else:
                    exc_type = kwargs["exc_info"][0]
                    self.write("Exception: %s" % exc_type.__name__)

        class GetErrorHtmlHandler(RequestHandler):
            def get(self):
                status = self.get_argument("status", None)
                if not status:
                    1 / 0
                else:
                    self.send_error(int(status))

            def get_error_html(self, status_code, **kwargs):
                self.set_header("Content-Type", "text/plain")
                if "exception" not in kwargs:
                    self.write("Status: %d" % status_code)
                else:
                    exc_type = sys.exc_info()[0]
                    self.write("Exception: %s" % exc_type.__name__)

        class FailedWriteErrorHandler(RequestHandler):
            def get(self):
                1 / 0

            def write_error(self, status_code, **kwargs):
                raise Exception("exception in write_error")

        routes = [
            url("/default", DefaultHandler),
            url("/write_error", WriteErrorHandler),
            url("/get_error_html", GetErrorHtmlHandler),
            url("/failed_write_error", FailedWriteErrorHandler),
        ]
        return Application(routes)

    def test_default(self):
        # (request path, expected status code, text expected in the body)
        cases = [
            ("/default", 500, b("500: Internal Server Error")),
            ("/default?status=503", 503, b("503: Service Unavailable")),
        ]
        for path, expected_code, expected_text in cases:
            response = self.fetch(path)
            self.assertEqual(response.code, expected_code)
            self.assertTrue(expected_text in response.body)

    def test_write_error(self):
        # (request path, expected status code, exact expected body)
        cases = [
            ("/write_error", 500, b("Exception: ZeroDivisionError")),
            ("/write_error?status=503", 503, b("Status: 503")),
        ]
        for path, expected_code, expected_body in cases:
            response = self.fetch(path)
            self.assertEqual(response.code, expected_code)
            self.assertEqual(expected_body, response.body)

    def test_get_error_html(self):
        # (request path, expected status code, exact expected body)
        cases = [
            ("/get_error_html", 500, b("Exception: ZeroDivisionError")),
            ("/get_error_html?status=503", 503, b("Status: 503")),
        ]
        for path, expected_code, expected_body in cases:
            response = self.fetch(path)
            self.assertEqual(response.code, expected_code)
            self.assertEqual(expected_body, response.body)

    def test_failed_write_error(self):
        response = self.fetch("/failed_write_error")
        self.assertEqual(response.code, 500)
        self.assertEqual(b(""), response.body)
00664 
00665 
class StaticFileTest(AsyncHTTPTestCase, LogTrapTestCase):
    """Tests for static file serving and ``RequestHandler.static_url``."""

    def get_app(self):
        class StaticUrlHandler(RequestHandler):
            def get(self, path):
                self.write(self.static_url(path))

        class AbsoluteStaticUrlHandler(RequestHandler):
            include_host = True

            def get(self, path):
                self.write(self.static_url(path))

        class OverrideStaticUrlHandler(RequestHandler):
            def get(self, path):
                # The argument arrives as the text "0" or "1".  Any
                # non-empty string is truthy, so bool() alone would always
                # give True; go through int() to recover the real flag.
                do_include = bool(int(self.get_argument("include_host")))
                # Set the handler-level default to the opposite of what is
                # passed explicitly, so that the two URLs must differ.
                self.include_host = not do_include

                regular_url = self.static_url(path)
                override_url = self.static_url(path, include_host=do_include)
                if override_url == regular_url:
                    return self.write(str(False))

                # find() restricted to the prefix: 0 when the URL starts
                # with "<protocol>://", -1 otherwise.
                protocol = self.request.protocol + "://"
                protocol_length = len(protocol)
                check_regular = regular_url.find(protocol, 0, protocol_length)
                check_override = override_url.find(protocol, 0, protocol_length)

                if do_include:
                    result = (check_override == 0 and check_regular == -1)
                else:
                    result = (check_override == -1 and check_regular == 0)
                self.write(str(result))

        return Application([('/static_url/(.*)', StaticUrlHandler),
                            ('/abs_static_url/(.*)', AbsoluteStaticUrlHandler),
                            ('/override_static_url/(.*)', OverrideStaticUrlHandler)],
                           static_path=os.path.join(os.path.dirname(__file__), 'static'))

    def test_static_files(self):
        # unittest assertions are used instead of bare ``assert`` so the
        # checks still run when Python is started with -O.
        response = self.fetch('/robots.txt')
        self.assertTrue(b("Disallow: /") in response.body)

        response = self.fetch('/static/robots.txt')
        self.assertTrue(b("Disallow: /") in response.body)

    def test_static_url(self):
        response = self.fetch("/static_url/robots.txt")
        self.assertEqual(response.body, b("/static/robots.txt?v=f71d2"))

    def test_absolute_static_url(self):
        response = self.fetch("/abs_static_url/robots.txt")
        self.assertEqual(response.body,
                         utf8(self.get_url("/") + "static/robots.txt?v=f71d2"))

    def test_include_host_override(self):
        self._trigger_include_host_check(False)
        self._trigger_include_host_check(True)

    def _trigger_include_host_check(self, include_host):
        """Request the override handler with the flag sent as 0 or 1 and
        expect it to answer "True".
        """
        path = "/override_static_url/robots.txt?include_host=%s"
        response = self.fetch(path % int(include_host))
        self.assertEqual(response.body, utf8(str(True)))

    def test_static_304(self):
        response1 = self.fetch("/static/robots.txt")
        response2 = self.fetch("/static/robots.txt", headers={
                'If-Modified-Since': response1.headers['Last-Modified']})
        self.assertEqual(response2.code, 304)
        self.assertTrue('Content-Length' not in response2.headers)
        self.assertTrue('Last-Modified' not in response2.headers)
00736 
00737 
class CustomStaticFileTest(AsyncHTTPTestCase, LogTrapTestCase):
    """Exercises a ``StaticFileHandler`` subclass with its own URL scheme.

    The subclass places a fixed version number (42) between the file stem
    and its extension, and strips it again when a request comes in.
    """

    def get_app(self):
        """Build an application that uses the custom static handler class."""
        class MyStaticFileHandler(StaticFileHandler):
            def get(self, path):
                # Map the versioned URL back to the plain name, then answer
                # with a fixed body.
                plain_name = self.parse_url_path(path)
                assert plain_name == "foo.txt"
                self.write("bar")

            @classmethod
            def make_static_url(cls, settings, path):
                # get_version is invoked but its result is not used in the URL.
                cls.get_version(settings, path)
                dot = path.rindex('.')
                stem, suffix = path[:dot], path[dot + 1:]
                return '/static/%s.%s.%s' % (stem, 42, suffix)

            @classmethod
            def parse_url_path(cls, url_path):
                # Drop the segment between the last two dots (the version).
                last_dot = url_path.rindex('.')
                version_dot = url_path.rindex('.', 0, last_dot)
                head = url_path[:version_dot]
                tail = url_path[last_dot:]
                return '%s%s' % (head, tail)

        class StaticUrlHandler(RequestHandler):
            def get(self, path):
                url = self.static_url(path)
                self.write(url)

        routes = [("/static_url/(.*)", StaticUrlHandler)]
        return Application(routes,
                           static_path="dummy",
                           static_handler_class=MyStaticFileHandler)

    def test_serve(self):
        """A versioned static URL is served by the custom handler."""
        served = self.fetch("/static/foo.42.txt")
        self.assertEqual(served.body, b("bar"))

    def test_static_url(self):
        """``static_url`` produces the versioned form of the path."""
        generated = self.fetch("/static_url/foo.txt")
        self.assertEqual(generated.body, b("/static/foo.42.txt"))
00776 
00777 
class NamedURLSpecGroupsTest(AsyncHTTPTestCase, LogTrapTestCase):
    """Checks named regex groups in URL patterns given as str and unicode."""

    def get_app(self):
        """Build an application with one str and one unicode URL pattern."""
        class EchoHandler(RequestHandler):
            """Writes back the value captured by the ``path`` group."""
            def get(self, path):
                self.write(path)

        routes = [
            ("/str/(?P<path>.*)", EchoHandler),
            (u"/unicode/(?P<path>.*)", EchoHandler),
        ]
        return Application(routes)

    def test_named_urlspec_groups(self):
        """Both pattern flavours pass the captured group to the handler."""
        str_response = self.fetch("/str/foo")
        self.assertEqual(str_response.body, b("foo"))

        unicode_response = self.fetch("/unicode/bar")
        self.assertEqual(unicode_response.body, b("bar"))
00793 
00794 
class ClearHeaderTest(SimpleHandlerTestCase):
    """Checks that ``clear_header`` removes a header that was set earlier."""

    class Handler(RequestHandler):
        def get(self):
            # Set two headers, then clear one of them plus a name that was
            # never set.
            for name, value in (("h1", "foo"), ("h2", "bar")):
                self.set_header(name, value)
            for name in ("h1", "nonexistent"):
                self.clear_header(name)

    def test_clear_header(self):
        """Only the header that was not cleared reaches the client."""
        headers = self.fetch("/").headers
        self.assertTrue("h1" not in headers)
        self.assertEqual(headers["h2"], "bar")
00807 
00808 
class Header304Test(SimpleHandlerTestCase):
    """Checks which headers are left out of a 304 response."""

    class Handler(RequestHandler):
        def get(self):
            self.set_header("Content-Language", "en_US")
            self.write("hello")

    def test_304_headers(self):
        """A conditional request with a matching Etag drops entity headers."""
        full = self.fetch('/')
        self.assertEqual(full.headers["Content-Length"], "5")
        self.assertEqual(full.headers["Content-Language"], "en_US")

        # Repeat the request, presenting the Etag from the first response.
        conditional_headers = {'If-None-Match': full.headers["Etag"]}
        not_modified = self.fetch('/', headers=conditional_headers)
        self.assertEqual(not_modified.code, 304)
        self.assertTrue("Content-Length" not in not_modified.headers)
        self.assertTrue("Content-Language" not in not_modified.headers)
        # Not an entity header, but should not be added to 304s by chunking
        self.assertTrue("Transfer-Encoding" not in not_modified.headers)


rosbridge_server
Author(s): Jonathan Mace
autogenerated on Mon Oct 6 2014 06:58:14