00001
00002 from __future__ import unicode_literals
00003
00004 from six import PY3
00005 from six import StringIO
00006 from tests.compat import unittest
00007 from webob import Request, Response
00008
00009 import warnings
00010 import mock
00011
00012 from webtest import TestApp
00013 from webtest.compat import to_bytes
00014 from webtest.lint import check_headers
00015 from webtest.lint import check_content_type
00016 from webtest.lint import check_environ
00017 from webtest.lint import IteratorWrapper
00018 from webtest.lint import WriteWrapper
00019 from webtest.lint import ErrorWrapper
00020 from webtest.lint import InputWrapper
00021 from webtest.lint import to_string
00022 from webtest.lint import middleware
00023
00024 from six import BytesIO
00025
00026
00027 def application(environ, start_response):
00028 req = Request(environ)
00029 resp = Response()
00030 env_input = environ['wsgi.input']
00031 len_body = len(req.body)
00032 env_input.input.seek(0)
00033 if req.path_info == '/read':
00034 resp.body = env_input.read(len_body)
00035 elif req.path_info == '/read_line':
00036 resp.body = env_input.readline(len_body)
00037 elif req.path_info == '/read_lines':
00038 resp.body = b'-'.join(env_input.readlines(len_body))
00039 elif req.path_info == '/close':
00040 resp.body = env_input.close()
00041 return resp(environ, start_response)
00042
00043
00044 class TestToString(unittest.TestCase):
00045
00046 def test_to_string(self):
00047 self.assertEqual(to_string('foo'), 'foo')
00048 self.assertEqual(to_string(b'foo'), 'foo')
00049
00050
00051 class TestMiddleware(unittest.TestCase):
00052
00053 def test_lint_too_few_args(self):
00054 linter = middleware(application)
00055 with self.assertRaisesRegexp(AssertionError, "Two arguments required"):
00056 linter()
00057 with self.assertRaisesRegexp(AssertionError, "Two arguments required"):
00058 linter({})
00059
00060 def test_lint_no_keyword_args(self):
00061 linter = middleware(application)
00062 with self.assertRaisesRegexp(AssertionError, "No keyword arguments "
00063 "allowed"):
00064 linter({}, 'foo', baz='baz')
00065
00066
00067
00068 @mock.patch.multiple('webtest.lint',
00069 check_environ=lambda x: True,
00070 InputWrapper=lambda x: True)
00071 def test_lint_iterator_returned(self):
00072 linter = middleware(lambda x, y: None)
00073 msg = "The application must return an iterator, if only an empty list"
00074 with self.assertRaisesRegexp(AssertionError, msg):
00075 linter({'wsgi.input': 'foo', 'wsgi.errors': 'foo'}, 'foo')
00076
00077
00078 class TestInputWrapper(unittest.TestCase):
00079 def test_read(self):
00080 app = TestApp(application)
00081 resp = app.post('/read', 'hello')
00082 self.assertEqual(resp.body, b'hello')
00083
00084 def test_readline(self):
00085 app = TestApp(application)
00086 resp = app.post('/read_line', 'hello\n')
00087 self.assertEqual(resp.body, b'hello\n')
00088
00089 def test_readlines(self):
00090 app = TestApp(application)
00091 resp = app.post('/read_lines', 'hello\nt\n')
00092 self.assertEqual(resp.body, b'hello\n-t\n')
00093
00094 def test_close(self):
00095 input_wrapper = InputWrapper(None)
00096 self.assertRaises(AssertionError, input_wrapper.close)
00097
00098 def test_iter(self):
00099 data = to_bytes("A line\nAnother line\nA final line\n")
00100 input_wrapper = InputWrapper(BytesIO(data))
00101 self.assertEquals(to_bytes("").join(input_wrapper), data, '')
00102
00103 def test_seek(self):
00104 data = to_bytes("A line\nAnother line\nA final line\n")
00105 input_wrapper = InputWrapper(BytesIO(data))
00106 input_wrapper.seek(0)
00107 self.assertEquals(to_bytes("").join(input_wrapper), data, '')
00108
00109
00110 class TestMiddleware2(unittest.TestCase):
00111 def test_exc_info(self):
00112 def application_exc_info(environ, start_response):
00113 body = to_bytes('body stuff')
00114 headers = [
00115 ('Content-Type', 'text/plain; charset=utf-8'),
00116 ('Content-Length', str(len(body)))]
00117 start_response(to_bytes('200 OK'), headers, ('stuff',))
00118 return [body]
00119
00120 app = TestApp(application_exc_info)
00121 app.get('/')
00122
00123
00124
00125 class TestCheckContentType(unittest.TestCase):
00126 def test_no_content(self):
00127 status = "204 No Content"
00128 headers = [
00129 ('Content-Type', 'text/plain; charset=utf-8'),
00130 ('Content-Length', '4')
00131 ]
00132 self.assertRaises(AssertionError, check_content_type, status, headers)
00133
00134 def test_no_content_type(self):
00135 status = "200 OK"
00136 headers = [
00137 ('Content-Length', '4')
00138 ]
00139 self.assertRaises(AssertionError, check_content_type, status, headers)
00140
00141
00142 class TestCheckHeaders(unittest.TestCase):
00143
00144 @unittest.skipIf(not PY3, 'Useless in Python2')
00145 def test_header_unicode_value(self):
00146 self.assertRaises(AssertionError, check_headers, [('X-Price', '100€')])
00147
00148 @unittest.skipIf(not PY3, 'Useless in Python2')
00149 def test_header_unicode_name(self):
00150 self.assertRaises(AssertionError, check_headers, [('X-€', 'foo')])
00151
00152
00153 class TestCheckEnviron(unittest.TestCase):
00154 def test_no_query_string(self):
00155 environ = {
00156 'REQUEST_METHOD': str('GET'),
00157 'SERVER_NAME': str('localhost'),
00158 'SERVER_PORT': str('80'),
00159 'wsgi.version': (1, 0, 1),
00160 'wsgi.input': StringIO('test'),
00161 'wsgi.errors': StringIO(),
00162 'wsgi.multithread': None,
00163 'wsgi.multiprocess': None,
00164 'wsgi.run_once': None,
00165 'wsgi.url_scheme': 'http',
00166 'PATH_INFO': str('/'),
00167 }
00168 with warnings.catch_warnings(record=True) as w:
00169 warnings.simplefilter("always")
00170 check_environ(environ)
00171 self.assertEqual(len(w), 1, "We should have only one warning")
00172 self.assertTrue(
00173 "QUERY_STRING" in str(w[-1].message),
00174 "The warning message should say something about QUERY_STRING")
00175
00176 def test_no_valid_request(self):
00177 environ = {
00178 'REQUEST_METHOD': str('PROPFIND'),
00179 'SERVER_NAME': str('localhost'),
00180 'SERVER_PORT': str('80'),
00181 'wsgi.version': (1, 0, 1),
00182 'wsgi.input': StringIO('test'),
00183 'wsgi.errors': StringIO(),
00184 'wsgi.multithread': None,
00185 'wsgi.multiprocess': None,
00186 'wsgi.run_once': None,
00187 'wsgi.url_scheme': 'http',
00188 'PATH_INFO': str('/'),
00189 'QUERY_STRING': str(''),
00190 }
00191 with warnings.catch_warnings(record=True) as w:
00192 warnings.simplefilter("always")
00193 check_environ(environ)
00194 self.assertEqual(len(w), 1, "We should have only one warning")
00195 self.assertTrue(
00196 "REQUEST_METHOD" in str(w[-1].message),
00197 "The warning message should say something "
00198 "about REQUEST_METHOD")
00199
00200 def test_handles_native_strings_in_variables(self):
00201
00202 path = '/umläut'
00203 if not PY3:
00204 path = path.encode('utf-8')
00205 environ = {
00206 'REQUEST_METHOD': str('GET'),
00207 'SERVER_NAME': str('localhost'),
00208 'SERVER_PORT': str('80'),
00209 'wsgi.version': (1, 0, 1),
00210 'wsgi.input': StringIO('test'),
00211 'wsgi.errors': StringIO(),
00212 'wsgi.multithread': None,
00213 'wsgi.multiprocess': None,
00214 'wsgi.run_once': None,
00215 'wsgi.url_scheme': 'http',
00216 'PATH_INFO': path,
00217 'QUERY_STRING': str(''),
00218 }
00219 with warnings.catch_warnings(record=True) as w:
00220 warnings.simplefilter("always")
00221 check_environ(environ)
00222 self.assertEqual(0, len(w), "We should have no warning")
00223
00224
00225 class TestIteratorWrapper(unittest.TestCase):
00226 def test_close(self):
00227 class MockIterator(object):
00228
00229 def __init__(self):
00230 self.closed = False
00231
00232 def __iter__(self):
00233 return self
00234
00235 def __next__(self):
00236 return None
00237
00238 next = __next__
00239
00240 def close(self):
00241 self.closed = True
00242
00243 mock = MockIterator()
00244 wrapper = IteratorWrapper(mock, None)
00245 wrapper.close()
00246
00247 self.assertTrue(mock.closed, "Original iterator has not been closed")
00248
00249
00250 class TestWriteWrapper(unittest.TestCase):
00251 def test_wrong_type(self):
00252 write_wrapper = WriteWrapper(None)
00253 self.assertRaises(AssertionError, write_wrapper, 'not a binary')
00254
00255 def test_normal(self):
00256 class MockWriter(object):
00257 def __init__(self):
00258 self.written = []
00259
00260 def __call__(self, s):
00261 self.written.append(s)
00262
00263 data = to_bytes('foo')
00264 mock = MockWriter()
00265 write_wrapper = WriteWrapper(mock)
00266 write_wrapper(data)
00267 self.assertEqual(
00268 mock.written, [data],
00269 "WriterWrapper should call original writer when data is binary "
00270 "type")
00271
00272
00273 class TestErrorWrapper(unittest.TestCase):
00274
00275 def test_dont_close(self):
00276 error_wrapper = ErrorWrapper(None)
00277 self.assertRaises(AssertionError, error_wrapper.close)
00278
00279 class FakeError(object):
00280 def __init__(self):
00281 self.written = []
00282 self.flushed = False
00283
00284 def write(self, s):
00285 self.written.append(s)
00286
00287 def writelines(self, lines):
00288 for line in lines:
00289 self.write(line)
00290
00291 def flush(self):
00292 self.flushed = True
00293
00294 def test_writelines(self):
00295 fake_error = self.FakeError()
00296 error_wrapper = ErrorWrapper(fake_error)
00297 data = [to_bytes('a line'), to_bytes('another line')]
00298 error_wrapper.writelines(data)
00299 self.assertEqual(fake_error.written, data,
00300 "ErrorWrapper should call original writer")
00301
00302 def test_flush(self):
00303 fake_error = self.FakeError()
00304 error_wrapper = ErrorWrapper(fake_error)
00305 error_wrapper.flush()
00306 self.assertTrue(
00307 fake_error.flushed,
00308 "ErrorWrapper should have called original wsgi_errors's flush")