lint.py
Go to the documentation of this file.
00001 # (c) 2005 Ian Bicking and contributors; written for Paste
00002 # (http://pythonpaste.org)
00003 # Licensed under the MIT license:
00004 # http://www.opensource.org/licenses/mit-license.php Also licenced under the
00005 # Apache License, 2.0: http://opensource.org/licenses/apache2.0.php Licensed to
00006 # PSF under a Contributor Agreement
00007 
00008 """
00009 Middleware to check for obedience to the WSGI specification.
00010 
00011 Some of the things this checks:
00012 
00013 * Signature of the application and start_response (including that
00014   keyword arguments are not used).
00015 
00016 * Environment checks:
00017 
00018   - Environment is a dictionary (and not a subclass).
00019 
00020   - That all the required keys are in the environment: REQUEST_METHOD,
00021     SERVER_NAME, SERVER_PORT, wsgi.version, wsgi.input, wsgi.errors,
00022     wsgi.multithread, wsgi.multiprocess, wsgi.run_once
00023 
00024   - That HTTP_CONTENT_TYPE and HTTP_CONTENT_LENGTH are not in the
00025     environment (these headers should appear as CONTENT_LENGTH and
00026     CONTENT_TYPE).
00027 
00028   - Warns if QUERY_STRING is missing, as the cgi module acts
00029     unpredictably in that case.
00030 
00031   - That CGI-style variables (that don't contain a .) have
00032     (non-unicode) string values
00033 
00034   - That wsgi.version is a tuple
00035 
00036   - That wsgi.url_scheme is 'http' or 'https' (@@: is this too
00037     restrictive?)
00038 
00039   - Warns if the REQUEST_METHOD is not known (@@: probably too
00040     restrictive).
00041 
00042   - That SCRIPT_NAME and PATH_INFO are empty or start with /
00043 
00044   - That at least one of SCRIPT_NAME or PATH_INFO are set.
00045 
00046   - That CONTENT_LENGTH is a positive integer.
00047 
00048   - That SCRIPT_NAME is not '/' (it should be '', and PATH_INFO should
00049     be '/').
00050 
00051   - That wsgi.input has the methods read, readline, readlines, and
00052     __iter__
00053 
00054   - That wsgi.errors has the methods flush, write, writelines
00055 
00056 * The status is a string, contains a space, starts with an integer,
00057   and that integer is in range (> 100).
00058 
00059 * That the headers is a list (not a subclass, not another kind of
00060   sequence).
00061 
00062 * That the items of the headers are tuples of strings.
00063 
00064 * That there is no 'status' header (that is used in CGI, but not in
00065   WSGI).
00066 
00067 * That the headers don't contain newlines or colons, end in _ or -, or
00068   contain characters codes below 037.
00069 
00070 * That Content-Type is given if there is content (CGI often has a
00071   default content type, but WSGI does not).
00072 
00073 * That no Content-Type is given when there is no content (@@: is this
00074   too restrictive?)
00075 
00076 * That the exc_info argument to start_response is a tuple or None.
00077 
00078 * That all calls to the writer are with strings, and no other methods
00079   on the writer are accessed.
00080 
00081 * That wsgi.input is used properly:
00082 
00083   - .read() is called with zero or one argument
00084 
00085   - That it returns a string
00086 
00087   - That readline, readlines, and __iter__ return strings
00088 
00089   - That .close() is not called
00090 
00091   - No other methods are provided
00092 
00093 * That wsgi.errors is used properly:
00094 
00095   - .write() and .writelines() is called with a string, except
00096     with python3
00097 
00098   - That .close() is not called, and no other methods are provided.
00099 
00100 * The response iterator:
00101 
00102   - That it is not a string (it should be a list of a single string; a
00103     string will work, but perform horribly).
00104 
00105   - That .next() returns a string
00106 
00107   - That the iterator is not iterated over until start_response has
00108     been called (that can signal either a server or application
00109     error).
00110 
00111   - That .close() is called (doesn't raise exception, only prints to
00112     sys.stderr, because we only know it isn't called when the object
00113     is garbage collected).
00114 
00115 """
00116 from __future__ import unicode_literals
00117 
00118 import collections
00119 import re
00120 import warnings
00121 from six import PY3
00122 from six import binary_type
00123 from six import string_types
00124 
00125 header_re = re.compile(r'^[a-zA-Z][a-zA-Z0-9\-_]*$')
00126 bad_header_value_re = re.compile(r'[\000-\037]')
00127 
00128 valid_methods = (
00129     'GET', 'HEAD', 'POST', 'OPTIONS', 'PUT', 'DELETE',
00130     'TRACE', 'PATCH',
00131 )
00132 
00133 METADATA_TYPE = PY3 and (str, binary_type) or (str,)
00134 
00135 # PEP-3333 says that environment variables must be "native strings",
00136 # i.e. str(), which however is something *different* in py2 and py3.
00137 SLASH = str('/')
00138 
00139 
00140 def to_string(value):
00141     if not isinstance(value, string_types):
00142         return value.decode('latin1')
00143     else:
00144         return value
00145 
00146 
00147 class WSGIWarning(Warning):
00148     """
00149     Raised in response to WSGI-spec-related warnings
00150     """
00151 
00152 
00153 def middleware(application, global_conf=None):
00154 
00155     """
00156     When applied between a WSGI server and a WSGI application, this
00157     middleware will check for WSGI compliancy on a number of levels.
00158     This middleware does not modify the request or response in any
00159     way, but will throw an AssertionError if anything seems off
00160     (except for a failure to close the application iterator, which
00161     will be printed to stderr -- there's no way to throw an exception
00162     at that point).
00163     """
00164 
00165     def lint_app(*args, **kw):
00166         assert len(args) == 2, "Two arguments required"
00167         assert not kw, "No keyword arguments allowed"
00168         environ, start_response = args
00169 
00170         check_environ(environ)
00171 
00172         # We use this to check if the application returns without
00173         # calling start_response:
00174         start_response_started = []
00175 
00176         def start_response_wrapper(*args, **kw):
00177             assert len(args) == 2 or len(args) == 3, (
00178                 "Invalid number of arguments: %s" % args)
00179             assert not kw, "No keyword arguments allowed"
00180             status = args[0]
00181             headers = args[1]
00182             if len(args) == 3:
00183                 exc_info = args[2]
00184             else:
00185                 exc_info = None
00186 
00187             check_status(status)
00188             check_headers(headers)
00189             check_content_type(status, headers)
00190             check_exc_info(exc_info)
00191 
00192             start_response_started.append(None)
00193             return WriteWrapper(start_response(*args))
00194 
00195         environ['wsgi.input'] = InputWrapper(environ['wsgi.input'])
00196         environ['wsgi.errors'] = ErrorWrapper(environ['wsgi.errors'])
00197 
00198         iterator = application(environ, start_response_wrapper)
00199         assert isinstance(iterator, collections.Iterable), (
00200             "The application must return an iterator, if only an empty list")
00201 
00202         check_iterator(iterator)
00203 
00204         return IteratorWrapper(iterator, start_response_started)
00205 
00206     return lint_app
00207 
00208 
00209 class InputWrapper(object):
00210 
00211     def __init__(self, wsgi_input):
00212         self.input = wsgi_input
00213 
00214     def read(self, *args):
00215         assert len(args) <= 1
00216         v = self.input.read(*args)
00217         assert type(v) is binary_type
00218         return v
00219 
00220     def readline(self, *args):
00221         v = self.input.readline(*args)
00222         assert type(v) is binary_type
00223         return v
00224 
00225     def readlines(self, *args):
00226         assert len(args) <= 1
00227         lines = self.input.readlines(*args)
00228         assert isinstance(lines, list)
00229         for line in lines:
00230             assert type(line) is binary_type
00231         return lines
00232 
00233     def __iter__(self):
00234         while 1:
00235             line = self.readline()
00236             if not line:
00237                 return
00238             yield line
00239 
00240     def close(self):
00241         assert 0, "input.close() must not be called"
00242 
00243     def seek(self, *a, **kw):
00244         return self.input.seek(*a, **kw)
00245 
00246 
00247 class ErrorWrapper(object):
00248 
00249     def __init__(self, wsgi_errors):
00250         self.errors = wsgi_errors
00251 
00252     def write(self, s):
00253         if not PY3:
00254             assert type(s) is binary_type
00255         self.errors.write(s)
00256 
00257     def flush(self):
00258         self.errors.flush()
00259 
00260     def writelines(self, seq):
00261         for line in seq:
00262             self.write(line)
00263 
00264     def close(self):
00265         assert 0, "errors.close() must not be called"
00266 
00267 
00268 class WriteWrapper(object):
00269 
00270     def __init__(self, wsgi_writer):
00271         self.writer = wsgi_writer
00272 
00273     def __call__(self, s):
00274         assert type(s) is binary_type
00275         self.writer(s)
00276 
00277 
00278 class IteratorWrapper(object):
00279 
00280     def __init__(self, wsgi_iterator, check_start_response):
00281         self.original_iterator = wsgi_iterator
00282         self.iterator = iter(wsgi_iterator)
00283         self.closed = False
00284         self.check_start_response = check_start_response
00285 
00286     def __iter__(self):
00287         return self
00288 
00289     def next(self):
00290         assert not self.closed, (
00291             "Iterator read after closed")
00292         v = next(self.iterator)
00293         if self.check_start_response is not None:
00294             assert self.check_start_response, (
00295                 "The application returns and we started iterating over its"
00296                 " body, but start_response has not yet been called")
00297             self.check_start_response = None
00298         assert isinstance(v, binary_type), (
00299             "Iterator %r returned a non-%r object: %r"
00300             % (self.iterator, binary_type, v))
00301         return v
00302 
00303     __next__ = next
00304 
00305     def close(self):
00306         self.closed = True
00307         if hasattr(self.original_iterator, 'close'):
00308             self.original_iterator.close()
00309 
00310     def __del__(self):
00311         assert self.closed, (
00312             "Iterator garbage collected without being closed")
00313 
00314 
00315 def check_environ(environ):
00316     assert type(environ) is dict, (
00317         "Environment is not of the right type: %r (environment: %r)"
00318         % (type(environ), environ))
00319 
00320     for key in ['REQUEST_METHOD', 'SERVER_NAME', 'SERVER_PORT',
00321                 'wsgi.version', 'wsgi.input', 'wsgi.errors',
00322                 'wsgi.multithread', 'wsgi.multiprocess',
00323                 'wsgi.run_once']:
00324         assert key in environ, (
00325             "Environment missing required key: %r" % key)
00326 
00327     for key in ['HTTP_CONTENT_TYPE', 'HTTP_CONTENT_LENGTH']:
00328         assert key not in environ, (
00329             "Environment should not have the key: %s "
00330             "(use %s instead)" % (key, key[5:]))
00331 
00332     if 'QUERY_STRING' not in environ:
00333         warnings.warn(
00334             'QUERY_STRING is not in the WSGI environment; the cgi '
00335             'module will use sys.argv when this variable is missing, '
00336             'so application errors are more likely',
00337             WSGIWarning)
00338 
00339     for key in environ:
00340         if '.' in key:
00341             # Extension, we don't care about its type
00342             continue
00343         assert type(environ[key]) in METADATA_TYPE, (
00344             "Environmental variable %s is not a string: %r (value: %r)"
00345             % (key, type(environ[key]), environ[key]))
00346 
00347     assert type(environ['wsgi.version']) is tuple, (
00348         "wsgi.version should be a tuple (%r)" % environ['wsgi.version'])
00349     assert environ['wsgi.url_scheme'] in ('http', 'https'), (
00350         "wsgi.url_scheme unknown: %r" % environ['wsgi.url_scheme'])
00351 
00352     check_input(environ['wsgi.input'])
00353     check_errors(environ['wsgi.errors'])
00354 
00355     # @@: these need filling out:
00356     if environ['REQUEST_METHOD'] not in valid_methods:
00357         warnings.warn(
00358             "Unknown REQUEST_METHOD: %r" % environ['REQUEST_METHOD'],
00359             WSGIWarning)
00360 
00361     assert (not environ.get('SCRIPT_NAME')
00362             or environ['SCRIPT_NAME'].startswith(SLASH)), (
00363         "SCRIPT_NAME doesn't start with /: %r" % environ['SCRIPT_NAME'])
00364     assert (not environ.get('PATH_INFO')
00365             or environ['PATH_INFO'].startswith(SLASH)), (
00366         "PATH_INFO doesn't start with /: %r" % environ['PATH_INFO'])
00367     if environ.get('CONTENT_LENGTH'):
00368         assert int(environ['CONTENT_LENGTH']) >= 0, (
00369             "Invalid CONTENT_LENGTH: %r" % environ['CONTENT_LENGTH'])
00370 
00371     if not environ.get('SCRIPT_NAME'):
00372         assert 'PATH_INFO' in environ, (
00373             "One of SCRIPT_NAME or PATH_INFO are required (PATH_INFO "
00374             "should at least be '/' if SCRIPT_NAME is empty)")
00375     assert environ.get('SCRIPT_NAME') != SLASH, (
00376         "SCRIPT_NAME cannot be '/'; it should instead be '', and "
00377         "PATH_INFO should be '/'")
00378 
00379 
00380 def check_input(wsgi_input):
00381     for attr in ['read', 'readline', 'readlines', '__iter__']:
00382         assert hasattr(wsgi_input, attr), (
00383             "wsgi.input (%r) doesn't have the attribute %s"
00384             % (wsgi_input, attr))
00385 
00386 
00387 def check_errors(wsgi_errors):
00388     for attr in ['flush', 'write', 'writelines']:
00389         assert hasattr(wsgi_errors, attr), (
00390             "wsgi.errors (%r) doesn't have the attribute %s"
00391             % (wsgi_errors, attr))
00392 
00393 
00394 def check_status(status):
00395     assert type(status) in METADATA_TYPE, (
00396         "Status must be a %s (not %r)" % (METADATA_TYPE, status))
00397     status = to_string(status)
00398     assert len(status) > 5, (
00399         "The status string (%r) should be a three-digit "
00400         "integer followed by a single space and a status explanation"
00401         ) % status
00402     assert status[:3].isdigit(), (
00403         "The status string (%r) should start with"
00404         "three digits") % status
00405 
00406     status_int = int(status[:3])
00407     assert status_int >= 100, (
00408         "The status code must be greater or equal than "
00409         "100 (got %d)") % status_int
00410     assert status[3] == ' ', (
00411         "The status string (%r) should start with three"
00412         "digits and a space (4th characters is not a space here)") % status
00413 
00414 
00415 def _assert_latin1_py3(string, message):
00416     if PY3 and type(string) is str:
00417         try:
00418             string.encode('latin1')
00419         except UnicodeEncodeError:
00420             raise AssertionError(message)
00421 
00422 
00423 def check_headers(headers):
00424     assert type(headers) is list, (
00425         "Headers (%r) must be of type list: %r"
00426         % (headers, type(headers)))
00427     for item in headers:
00428         assert type(item) is tuple, (
00429             "Individual headers (%r) must be of type tuple: %r"
00430             % (item, type(item)))
00431         assert len(item) == 2
00432         name, value = item
00433         _assert_latin1_py3(
00434             name,
00435             "Headers values must be latin1 string or bytes."
00436             "%r is not a valid latin1 string" % (value,)
00437         )
00438         str_name = to_string(name)
00439         assert str_name.lower() != 'status', (
00440             "The Status header cannot be used; it conflicts with CGI "
00441             "script, and HTTP status is not given through headers "
00442             "(value: %r)." % value)
00443         assert '\n' not in str_name and ':' not in str_name, (
00444             "Header names may not contain ':' or '\\n': %r" % name)
00445         assert header_re.search(str_name), "Bad header name: %r" % name
00446         assert not str_name.endswith('-') and not str_name.endswith('_'), (
00447             "Names may not end in '-' or '_': %r" % name)
00448         _assert_latin1_py3(
00449             value,
00450             "Headers values must be latin1 string or bytes."
00451             "%r is not a valid latin1 string" % (value,)
00452         )
00453         str_value = to_string(value)
00454         assert not bad_header_value_re.search(str_value), (
00455             "Bad header value: %r (bad char: %r)"
00456             % (str_value, bad_header_value_re.search(str_value).group(0)))
00457 
00458 
00459 def check_content_type(status, headers):
00460     code = int(status.split(None, 1)[0])
00461     # @@: need one more person to verify this interpretation of RFC 2616
00462     #     http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
00463     NO_MESSAGE_BODY = (201, 204, 304)
00464     NO_MESSAGE_TYPE = (204, 304)
00465     length = None
00466     for name, value in headers:
00467         str_name = to_string(name)
00468         if str_name.lower() == 'content-length' and value.isdigit():
00469             length = int(value)
00470             break
00471     for name, value in headers:
00472         str_name = to_string(name)
00473         if str_name.lower() == 'content-type':
00474             if code not in NO_MESSAGE_TYPE:
00475                 return
00476             elif length == 0:
00477                 warnings.warn(("Content-Type header found in a %s response, "
00478                                "which not return content.") % code,
00479                               WSGIWarning)
00480                 return
00481             else:
00482                 assert 0, (("Content-Type header found in a %s response, "
00483                             "which must not return content.") % code)
00484     if code not in NO_MESSAGE_BODY and length is not None and length > 0:
00485         assert 0, "No Content-Type header found in headers (%s)" % headers
00486 
00487 
00488 def check_exc_info(exc_info):
00489     assert exc_info is None or type(exc_info) is tuple, (
00490         "exc_info (%r) is not a tuple: %r" % (exc_info, type(exc_info)))
00491     # More exc_info checks?
00492 
00493 
00494 def check_iterator(iterator):
00495     valid_type = PY3 and bytes or str
00496     # Technically a bytes (str for py2.x) is legal, which is why it's a
00497     # really bad idea, because it may cause the response to be returned
00498     # character-by-character
00499     assert not isinstance(iterator, valid_type), (
00500         "You should not return a bytes as your application iterator, "
00501         "instead return a single-item list containing that string.")
00502 
00503 __all__ = ['middleware']


webtest
Author(s): AlexV
autogenerated on Sat Jun 8 2019 20:32:07