app.py
Go to the documentation of this file.
00001 # (c) 2005 Ian Bicking and contributors; written for Paste
00002 # (http://pythonpaste.org)
00003 # Licensed under the MIT license:
00004 # http://www.opensource.org/licenses/mit-license.php
00005 """
00006 Routines for testing WSGI applications.
00007 
00008 Most interesting is TestApp
00009 """
00010 from __future__ import unicode_literals
00011 
00012 import os
00013 import re
00014 import json
00015 import random
00016 import fnmatch
00017 import mimetypes
00018 
00019 from base64 import b64encode
00020 
00021 from six import StringIO
00022 from six import BytesIO
00023 from six import string_types
00024 from six import binary_type
00025 from six import text_type
00026 from six.moves import http_cookiejar
00027 
00028 from webtest.compat import urlparse
00029 from webtest.compat import urlencode
00030 from webtest.compat import to_bytes
00031 from webtest.compat import escape_cookie_value
00032 from webtest.response import TestResponse
00033 from webtest import forms
00034 from webtest import lint
00035 from webtest import utils
00036 
00037 import webob
00038 
00039 
00040 __all__ = ['TestApp', 'TestRequest']
00041 
00042 
class AppError(Exception):
    """Error raised when a response does not satisfy a test expectation.

    ``message`` is a ``%``-style template; the remaining positional
    arguments are interpolated into it after being made printable:

    * a :class:`webob.Response` is replaced by its body, decoded with the
      response charset when one is set, or by ``repr(body)`` otherwise;
    * a byte string is decoded as UTF-8, falling back to its ``repr``
      when it is not valid UTF-8;
    * anything else is passed through unchanged.
    """

    def __init__(self, message, *args):
        def printable(value):
            # Responses are shown through their body rather than their repr.
            if isinstance(value, webob.Response):
                payload = value.body
                if not isinstance(payload, binary_type):
                    return value
                if value.charset:
                    return payload.decode(value.charset)
                return repr(payload)
            if isinstance(value, binary_type):
                try:
                    return value.decode('utf8')
                except UnicodeDecodeError:
                    return repr(value)
            return value

        if isinstance(message, binary_type):
            message = message.decode('utf8')
        rendered = tuple(printable(arg) for arg in args)
        Exception.__init__(self, message % rendered)
00065 
00066 
class CookiePolicy(http_cookiejar.DefaultCookiePolicy):
    """Cookie policy that additionally accepts ``Domain=.localhost``.

    Cookies whose domain is exactly ``'.localhost'`` are always allowed to
    be set and returned; every other domain is judged by
    ``DefaultCookiePolicy``.
    """

    def return_ok_domain(self, cookie, request):
        # The base class is called explicitly (not through super()) as in
        # the rest of this module.
        base = http_cookiejar.DefaultCookiePolicy
        return (cookie.domain == '.localhost'
                or base.return_ok_domain(self, cookie, request))

    def set_ok_domain(self, cookie, request):
        base = http_cookiejar.DefaultCookiePolicy
        return (cookie.domain == '.localhost'
                or base.set_ok_domain(self, cookie, request))
00082 
00083 
class TestRequest(webob.BaseRequest):
    """A subclass of :class:`webob.BaseRequest` whose ``ResponseClass`` is
    :class:`webtest.response.TestResponse`."""
    # Response class associated with requests of this type.
    ResponseClass = TestResponse
00087 
00088 
00089 class TestApp(object):
00090     """
00091     Wraps a WSGI application in a more convenient interface for
00092     testing. It uses extended version of :class:`webob.BaseRequest`
00093     and :class:`webob.Response`.
00094 
00095     :param app:
00096         May be a WSGI application or Paste Deploy app,
00097         like ``'config:filename.ini#test'``.
00098 
00099         .. versionadded:: 2.0
00100 
00101         It can also be an actual full URL to an http server and webtest
00102         will proxy requests with `wsgiproxy`.
00103     :type app:
00104         WSGI application
00105     :param extra_environ:
00106         A dictionary of values that should go
00107         into the environment for each request. These can provide a
00108         communication channel with the application.
00109     :type extra_environ:
00110         dict
00111     :param relative_to:
00112         A directory. File uploads given by filename only are
00113         resolved relative to it, as are ``config:``
00114         URIs that aren't absolute.
00115     :type relative_to:
00116         string
00117     :param cookiejar:
00118         A :class:`cookielib.CookieJar`-like object that keeps cookies
00119         across requests.
00120     :type cookiejar:
00121         CookieJar instance
00122 
00123     .. attribute:: cookies
00124 
00125         A convenient shortcut for a dict of all cookies in
00126         ``cookiejar``.
00127 
00128     :param parser_features:
00129         Passed to BeautifulSoup when parsing responses.
00130     :type parser_features:
00131         string or list
00132     :param json_encoder:
00133         Passed to json.dumps when encoding json
00134     :type json_encoder:
00135         A subclass of json.JSONEncoder
00136     :param lint:
00137         If True (default) then check that the application is WSGI compliant
00138     :type lint:
00139         A boolean
00140     """
00141 
00142     RequestClass = TestRequest
00143 
00144     def __init__(self, app, extra_environ=None, relative_to=None,
00145                  use_unicode=True, cookiejar=None, parser_features=None,
00146                  json_encoder=None, lint=True):
00147 
00148         if 'WEBTEST_TARGET_URL' in os.environ:
00149             app = os.environ['WEBTEST_TARGET_URL']
00150         if isinstance(app, string_types):
00151             if app.startswith('http'):
00152                 try:
00153                     from wsgiproxy import HostProxy
00154                 except ImportError:  # pragma: no cover
00155                     raise ImportError((
00156                         'Using webtest with a real url requires WSGIProxy2. '
00157                         'Please install it with: '
00158                         'pip install WSGIProxy2'))
00159                 if '#' not in app:
00160                     app += '#httplib'
00161                 url, client = app.split('#', 1)
00162                 app = HostProxy(url, client=client)
00163             else:
00164                 from paste.deploy import loadapp
00165                 # @@: Should pick up relative_to from calling module's
00166                 # __file__
00167                 app = loadapp(app, relative_to=relative_to)
00168         self.app = app
00169         self.lint = lint
00170         self.relative_to = relative_to
00171         if extra_environ is None:
00172             extra_environ = {}
00173         self.extra_environ = extra_environ
00174         self.use_unicode = use_unicode
00175         if cookiejar is None:
00176             cookiejar = http_cookiejar.CookieJar(policy=CookiePolicy())
00177         self.cookiejar = cookiejar
00178         if parser_features is None:
00179             parser_features = 'html.parser'
00180         self.RequestClass.ResponseClass.parser_features = parser_features
00181         if json_encoder is None:
00182             json_encoder = json.JSONEncoder
00183         self.JSONEncoder = json_encoder
00184 
00185     def get_authorization(self):
00186         """Allow to set the HTTP_AUTHORIZATION environ key. Value should looks
00187         like ``('Basic', ('user', 'password'))``
00188 
00189         If value is None the the HTTP_AUTHORIZATION is removed
00190         """
00191         return self.authorization_value
00192 
00193     def set_authorization(self, value):
00194         self.authorization_value = value
00195         if value is not None:
00196             invalid_value = (
00197                 "You should use a value like ('Basic', ('user', 'password'))"
00198             )
00199             if isinstance(value, (list, tuple)) and len(value) == 2:
00200                 authtype, val = value
00201                 if authtype == 'Basic' and val and \
00202                    isinstance(val, (list, tuple)):
00203                     val = ':'.join(list(val))
00204                     val = b64encode(to_bytes(val)).strip()
00205                     val = val.decode('latin1')
00206                 else:
00207                     raise ValueError(invalid_value)
00208                 value = str('%s %s' % (authtype, val))
00209             else:
00210                 raise ValueError(invalid_value)
00211             self.extra_environ.update({
00212                 'HTTP_AUTHORIZATION': value,
00213             })
00214         else:
00215             if 'HTTP_AUTHORIZATION' in self.extra_environ:
00216                 del self.extra_environ['HTTP_AUTHORIZATION']
00217 
    # Attribute-style access to the two methods above: reading returns the
    # stored value, assigning updates ``extra_environ``.
    authorization = property(get_authorization, set_authorization)
00219 
00220     @property
00221     def cookies(self):
00222         return dict([(cookie.name, cookie.value) for cookie in self.cookiejar])
00223 
00224     def set_cookie(self, name, value):
00225         """
00226         Sets a cookie to be passed through with requests.
00227 
00228         """
00229         value = escape_cookie_value(value)
00230         cookie = http_cookiejar.Cookie(
00231             version=0,
00232             name=name,
00233             value=value,
00234             port=None,
00235             port_specified=False,
00236             domain='.localhost',
00237             domain_specified=True,
00238             domain_initial_dot=False,
00239             path='/',
00240             path_specified=True,
00241             secure=False,
00242             expires=None,
00243             discard=False,
00244             comment=None,
00245             comment_url=None,
00246             rest=None
00247         )
00248         self.cookiejar.set_cookie(cookie)
00249 
00250     def reset(self):
00251         """
00252         Resets the state of the application; currently just clears
00253         saved cookies.
00254         """
00255         self.cookiejar.clear()
00256 
00257     def set_parser_features(self, parser_features):
00258         """
00259         Changes the parser used by BeautifulSoup. See its documentation to
00260         know the supported parsers.
00261         """
00262         self.RequestClass.ResponseClass.parser_features = parser_features
00263 
00264     def get(self, url, params=None, headers=None, extra_environ=None,
00265             status=None, expect_errors=False, xhr=False):
00266         """
00267         Do a GET request given the url path.
00268 
00269         :param params:
00270             A query string, or a dictionary that will be encoded
00271             into a query string.  You may also include a URL query
00272             string on the ``url``.
00273         :param headers:
00274             Extra headers to send.
00275         :type headers:
00276             dictionary
00277         :param extra_environ:
00278             Environmental variables that should be added to the request.
00279         :type extra_environ:
00280             dictionary
00281         :param status:
00282             The HTTP status code you expect in response (if not 200 or 3xx).
00283             You can also use a wildcard, like ``'3*'`` or ``'*'``.
00284         :type status:
00285             integer or string
00286         :param expect_errors:
00287             If this is False, then if anything is written to
00288             environ ``wsgi.errors`` it will be an error.
00289             If it is True, then non-200/3xx responses are also okay.
00290         :type expect_errors:
00291             boolean
00292         :param xhr:
00293             If this is true, then marks response as ajax. The same as
00294             headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }
00295         :type xhr:
00296             boolean
00297 
00298         :returns: :class:`webtest.TestResponse` instance.
00299 
00300         """
00301         environ = self._make_environ(extra_environ)
00302         url = str(url)
00303         url = self._remove_fragment(url)
00304         if params:
00305             if not isinstance(params, string_types):
00306                 params = urlencode(params, doseq=True)
00307             if str('?') in url:
00308                 url += str('&')
00309             else:
00310                 url += str('?')
00311             url += params
00312         if str('?') in url:
00313             url, environ['QUERY_STRING'] = url.split(str('?'), 1)
00314         else:
00315             environ['QUERY_STRING'] = str('')
00316         req = self.RequestClass.blank(url, environ)
00317         if xhr:
00318             headers = self._add_xhr_header(headers)
00319         if headers:
00320             req.headers.update(headers)
00321         return self.do_request(req, status=status,
00322                                expect_errors=expect_errors)
00323 
00324     def post(self, url, params='', headers=None, extra_environ=None,
00325              status=None, upload_files=None, expect_errors=False,
00326              content_type=None, xhr=False):
00327         """
00328         Do a POST request. Similar to :meth:`~webtest.TestApp.get`.
00329 
00330         :param params:
00331             Are put in the body of the request. If params is a
00332             iterator it will be urlencoded, if it is string it will not
00333             be encoded, but placed in the body directly.
00334 
00335             Can be a collections.OrderedDict with
00336             :class:`webtest.forms.Upload` fields included::
00337 
00338 
00339             app.post('/myurl', collections.OrderedDict([
00340                 ('textfield1', 'value1'),
00341                 ('uploadfield', webapp.Upload('filename.txt', 'contents'),
00342                 ('textfield2', 'value2')])))
00343 
00344         :param upload_files:
00345             It should be a list of ``(fieldname, filename, file_content)``.
00346             You can also use just ``(fieldname, filename)`` and the file
00347             contents will be read from disk.
00348         :type upload_files:
00349             list
00350         :param content_type:
00351             HTTP content type, for example `application/json`.
00352         :type content_type:
00353             string
00354 
00355         :param xhr:
00356             If this is true, then marks response as ajax. The same as
00357             headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }
00358         :type xhr:
00359             boolean
00360 
00361         :returns: :class:`webtest.TestResponse` instance.
00362 
00363         """
00364         if xhr:
00365             headers = self._add_xhr_header(headers)
00366         return self._gen_request('POST', url, params=params, headers=headers,
00367                                  extra_environ=extra_environ, status=status,
00368                                  upload_files=upload_files,
00369                                  expect_errors=expect_errors,
00370                                  content_type=content_type)
00371 
00372     def put(self, url, params='', headers=None, extra_environ=None,
00373             status=None, upload_files=None, expect_errors=False,
00374             content_type=None, xhr=False):
00375         """
00376         Do a PUT request. Similar to :meth:`~webtest.TestApp.post`.
00377 
00378         :returns: :class:`webtest.TestResponse` instance.
00379 
00380         """
00381         if xhr:
00382             headers = self._add_xhr_header(headers)
00383         return self._gen_request('PUT', url, params=params, headers=headers,
00384                                  extra_environ=extra_environ, status=status,
00385                                  upload_files=upload_files,
00386                                  expect_errors=expect_errors,
00387                                  content_type=content_type,
00388                                  )
00389 
00390     def patch(self, url, params='', headers=None, extra_environ=None,
00391               status=None, upload_files=None, expect_errors=False,
00392               content_type=None, xhr=False):
00393         """
00394         Do a PATCH request. Similar to :meth:`~webtest.TestApp.post`.
00395 
00396         :returns: :class:`webtest.TestResponse` instance.
00397 
00398         """
00399         if xhr:
00400             headers = self._add_xhr_header(headers)
00401         return self._gen_request('PATCH', url, params=params, headers=headers,
00402                                  extra_environ=extra_environ, status=status,
00403                                  upload_files=upload_files,
00404                                  expect_errors=expect_errors,
00405                                  content_type=content_type)
00406 
00407     def delete(self, url, params='', headers=None,
00408                extra_environ=None, status=None, expect_errors=False,
00409                content_type=None, xhr=False):
00410         """
00411         Do a DELETE request. Similar to :meth:`~webtest.TestApp.get`.
00412 
00413         :returns: :class:`webtest.TestResponse` instance.
00414 
00415         """
00416         if xhr:
00417             headers = self._add_xhr_header(headers)
00418         return self._gen_request('DELETE', url, params=params, headers=headers,
00419                                  extra_environ=extra_environ, status=status,
00420                                  upload_files=None,
00421                                  expect_errors=expect_errors,
00422                                  content_type=content_type)
00423 
00424     def options(self, url, headers=None, extra_environ=None,
00425                 status=None, expect_errors=False, xhr=False):
00426         """
00427         Do a OPTIONS request. Similar to :meth:`~webtest.TestApp.get`.
00428 
00429         :returns: :class:`webtest.TestResponse` instance.
00430 
00431         """
00432         if xhr:
00433             headers = self._add_xhr_header(headers)
00434         return self._gen_request('OPTIONS', url, headers=headers,
00435                                  extra_environ=extra_environ, status=status,
00436                                  upload_files=None,
00437                                  expect_errors=expect_errors)
00438 
00439     def head(self, url, headers=None, extra_environ=None,
00440              status=None, expect_errors=False, xhr=False):
00441         """
00442         Do a HEAD request. Similar to :meth:`~webtest.TestApp.get`.
00443 
00444         :returns: :class:`webtest.TestResponse` instance.
00445 
00446         """
00447         if xhr:
00448             headers = self._add_xhr_header(headers)
00449         return self._gen_request('HEAD', url, headers=headers,
00450                                  extra_environ=extra_environ, status=status,
00451                                  upload_files=None,
00452                                  expect_errors=expect_errors)
00453 
    # JSON flavours of the body-carrying verbs, one per HTTP method. They
    # are built by ``utils.json_method``, which is defined outside this
    # file. NOTE(review): presumably each one serializes ``params`` as JSON
    # before issuing the request -- confirm against ``webtest.utils``.
    post_json = utils.json_method('POST')
    put_json = utils.json_method('PUT')
    patch_json = utils.json_method('PATCH')
    delete_json = utils.json_method('DELETE')
00458 
00459     def encode_multipart(self, params, files):
00460         """
00461         Encodes a set of parameters (typically a name/value list) and
00462         a set of files (a list of (name, filename, file_body, mimetype)) into a
00463         typical POST body, returning the (content_type, body).
00464 
00465         """
00466         boundary = to_bytes(str(random.random()))[2:]
00467         boundary = b'----------a_BoUnDaRy' + boundary + b'$'
00468         lines = []
00469 
00470         def _append_file(file_info):
00471             key, filename, value, fcontent = self._get_file_info(file_info)
00472             if isinstance(key, text_type):
00473                 try:
00474                     key = key.encode('ascii')
00475                 except:  # pragma: no cover
00476                     raise  # file name must be ascii
00477             if isinstance(filename, text_type):
00478                 try:
00479                     filename = filename.encode('utf8')
00480                 except:  # pragma: no cover
00481                     raise  # file name must be ascii or utf8
00482             if not fcontent:
00483                 fcontent = mimetypes.guess_type(filename.decode('utf8'))[0]
00484             fcontent = to_bytes(fcontent)
00485             fcontent = fcontent or b'application/octet-stream'
00486             lines.extend([
00487                 b'--' + boundary,
00488                 b'Content-Disposition: form-data; ' +
00489                 b'name="' + key + b'"; filename="' + filename + b'"',
00490                 b'Content-Type: ' + fcontent, b'', value])
00491 
00492         for key, value in params:
00493             if isinstance(key, text_type):
00494                 try:
00495                     key = key.encode('ascii')
00496                 except:  # pragma: no cover
00497                     raise  # field name are always ascii
00498             if isinstance(value, forms.File):
00499                 if value.value:
00500                     _append_file([key] + list(value.value))
00501             elif isinstance(value, forms.Upload):
00502                 file_info = [key, value.filename]
00503                 if value.content is not None:
00504                     file_info.append(value.content)
00505                     if value.content_type is not None:
00506                         file_info.append(value.content_type)
00507                 _append_file(file_info)
00508             else:
00509                 if isinstance(value, text_type):
00510                     value = value.encode('utf8')
00511                 lines.extend([
00512                     b'--' + boundary,
00513                     b'Content-Disposition: form-data; name="' + key + b'"',
00514                     b'', value])
00515 
00516         for file_info in files:
00517             _append_file(file_info)
00518 
00519         lines.extend([b'--' + boundary + b'--', b''])
00520         body = b'\r\n'.join(lines)
00521         boundary = boundary.decode('ascii')
00522         content_type = 'multipart/form-data; boundary=%s' % boundary
00523         return content_type, body
00524 
00525     def request(self, url_or_req, status=None, expect_errors=False,
00526                 **req_params):
00527         """
00528         Creates and executes a request. You may either pass in an
00529         instantiated :class:`TestRequest` object, or you may pass in a
00530         URL and keyword arguments to be passed to
00531         :meth:`TestRequest.blank`.
00532 
00533         You can use this to run a request without the intermediary
00534         functioning of :meth:`TestApp.get` etc.  For instance, to
00535         test a WebDAV method::
00536 
00537             resp = app.request('/new-col', method='MKCOL')
00538 
00539         Note that the request won't have a body unless you specify it,
00540         like::
00541 
00542             resp = app.request('/test.txt', method='PUT', body='test')
00543 
00544         You can use :class:`webtest.TestRequest`::
00545 
00546             req = webtest.TestRequest.blank('/url/', method='GET')
00547             resp = app.do_request(req)
00548 
00549         """
00550         if isinstance(url_or_req, text_type):
00551             url_or_req = str(url_or_req)
00552         for (k, v) in req_params.items():
00553             if isinstance(v, text_type):
00554                 req_params[k] = str(v)
00555         if isinstance(url_or_req, string_types):
00556             req = self.RequestClass.blank(url_or_req, **req_params)
00557         else:
00558             req = url_or_req.copy()
00559             for name, value in req_params.items():
00560                 setattr(req, name, value)
00561         req.environ['paste.throw_errors'] = True
00562         for name, value in self.extra_environ.items():
00563             req.environ.setdefault(name, value)
00564         return self.do_request(req,
00565                                status=status,
00566                                expect_errors=expect_errors,
00567                                )
00568 
    def do_request(self, req, status=None, expect_errors=None):
        """
        Executes the given webob Request (``req``), with the expected
        ``status``.  Generally :meth:`~webtest.TestApp.get` and
        :meth:`~webtest.TestApp.post` are used instead.

        To use this::

            req = webtest.TestRequest.blank('url', ...args...)
            resp = app.do_request(req)

        .. note::

            You can pass any keyword arguments to
            ``TestRequest.blank()``, which will be set on the request.
            These can be arguments like ``content_type``, ``accept``, etc.

        :param req:
            The request to run against the wrapped application.
        :param status:
            Expected status, checked with ``_check_status`` unless
            ``expect_errors`` is true.
        :param expect_errors:
            When true, neither the status nor the content of
            ``wsgi.errors`` is checked.
        :returns:
            The response, with ``request``, ``app``, ``test_app`` and
            ``errors`` attributes set, plus one attribute per entry of
            ``paste.testing_variables``.
        :raises ValueError:
            If a ``paste.testing_variables`` entry would overwrite an
            existing response attribute.

        """

        # Collect whatever the application writes to wsgi.errors so it can
        # be checked after the request.
        errors = StringIO()
        req.environ['wsgi.errors'] = errors
        # Remove a SCRIPT_NAME prefix from PATH_INFO when present.
        script_name = req.environ.get('SCRIPT_NAME', '')
        if script_name and req.path_info.startswith(script_name):
            req.path_info = req.path_info[len(script_name):]

        # set framework hooks
        req.environ['paste.testing'] = True
        req.environ['paste.testing_variables'] = {}

        # set request cookies
        self.cookiejar.add_cookie_header(utils._RequestCookieAdapter(req))

        # verify wsgi compatibility
        app = lint.middleware(self.app) if self.lint else self.app

        ## FIXME: should it be an option to not catch exc_info?
        res = req.get_response(app, catch_exc_info=True)

        # be sure to decode the content
        res.decode_content()

        # set a few handy attributes
        res._use_unicode = self.use_unicode
        res.request = req
        res.app = app
        res.test_app = self

        # We do this to make sure the app_iter is exhausted:
        try:
            res.body
        except TypeError:  # pragma: no cover
            pass
        res.errors = errors.getvalue()

        # Expose the variables the application stored in
        # paste.testing_variables as attributes of the response.
        for name, value in req.environ['paste.testing_variables'].items():
            if hasattr(res, name):
                raise ValueError(
                    "paste.testing_variables contains the variable %r, but "
                    "the response object already has an attribute by that "
                    "name" % name)
            setattr(res, name, value)
        if not expect_errors:
            self._check_status(status, res)
            self._check_errors(res)

        # merge cookies back in (not reached when a check above raises)
        self.cookiejar.extract_cookies(utils._ResponseCookieAdapter(res),
                                       utils._RequestCookieAdapter(req))

        return res
00639 
00640     def _check_status(self, status, res):
00641         if status == '*':
00642             return
00643         res_status = res.status
00644         if (isinstance(status, string_types) and '*' in status):
00645             if re.match(fnmatch.translate(status), res_status, re.I):
00646                 return
00647         if isinstance(status, string_types):
00648             if status == res_status:
00649                 return
00650         if isinstance(status, (list, tuple)):
00651             if res.status_int not in status:
00652                 raise AppError(
00653                     "Bad response: %s (not one of %s for %s)\n%s",
00654                     res_status, ', '.join(map(str, status)),
00655                     res.request.url, res)
00656             return
00657         if status is None:
00658             if res.status_int >= 200 and res.status_int < 400:
00659                 return
00660             raise AppError(
00661                 "Bad response: %s (not 200 OK or 3xx redirect for %s)\n%s",
00662                 res_status, res.request.url,
00663                 res)
00664         if status != res.status_int:
00665             raise AppError(
00666                 "Bad response: %s (not %s)", res_status, status)
00667 
00668     def _check_errors(self, res):
00669         errors = res.errors
00670         if errors:
00671             raise AppError(
00672                 "Application had errors logged:\n%s", errors)
00673 
00674     def _make_environ(self, extra_environ=None):
00675         environ = self.extra_environ.copy()
00676         environ['paste.throw_errors'] = True
00677         if extra_environ:
00678             environ.update(extra_environ)
00679         return environ
00680 
00681     def _remove_fragment(self, url):
00682         scheme, netloc, path, query, fragment = urlparse.urlsplit(url)
00683         return urlparse.urlunsplit((scheme, netloc, path, query, ""))
00684 
00685     def _gen_request(self, method, url, params=utils.NoDefault,
00686                      headers=None, extra_environ=None, status=None,
00687                      upload_files=None, expect_errors=False,
00688                      content_type=None):
00689         """
00690         Do a generic request.
00691         """
00692 
00693         environ = self._make_environ(extra_environ)
00694 
00695         inline_uploads = []
00696 
00697         # this supports OrderedDict
00698         if isinstance(params, dict) or hasattr(params, 'items'):
00699             params = list(params.items())
00700 
00701         if isinstance(params, (list, tuple)):
00702             inline_uploads = [v for (k, v) in params
00703                               if isinstance(v, (forms.File, forms.Upload))]
00704 
00705         if len(inline_uploads) > 0:
00706             content_type, params = self.encode_multipart(
00707                 params, upload_files or ())
00708             environ['CONTENT_TYPE'] = content_type
00709         else:
00710             params = utils.encode_params(params, content_type)
00711             if upload_files or \
00712                 (content_type and
00713                  to_bytes(content_type).startswith(b'multipart')):
00714                 params = urlparse.parse_qsl(params, keep_blank_values=True)
00715                 content_type, params = self.encode_multipart(
00716                     params, upload_files or ())
00717                 environ['CONTENT_TYPE'] = content_type
00718             elif params:
00719                 environ.setdefault('CONTENT_TYPE',
00720                                    str('application/x-www-form-urlencoded'))
00721 
00722         if content_type is not None:
00723             environ['CONTENT_TYPE'] = content_type
00724         environ['REQUEST_METHOD'] = str(method)
00725         url = str(url)
00726         url = self._remove_fragment(url)
00727         req = self.RequestClass.blank(url, environ)
00728         if isinstance(params, text_type):
00729             params = params.encode(req.charset or 'utf8')
00730         req.environ['wsgi.input'] = BytesIO(params)
00731         req.content_length = len(params)
00732         if headers:
00733             req.headers.update(headers)
00734         return self.do_request(req, status=status,
00735                                expect_errors=expect_errors)
00736 
00737     def _get_file_info(self, file_info):
00738         if len(file_info) == 2:
00739             # It only has a filename
00740             filename = file_info[1]
00741             if self.relative_to:
00742                 filename = os.path.join(self.relative_to, filename)
00743             f = open(filename, 'rb')
00744             content = f.read()
00745             f.close()
00746             return (file_info[0], filename, content, None)
00747         elif 3 <= len(file_info) <= 4:
00748             content = file_info[2]
00749             if not isinstance(content, binary_type):
00750                 raise ValueError('File content must be %s not %s'
00751                                  % (binary_type, type(content)))
00752             if len(file_info) == 3:
00753                 return tuple(file_info) + (None,)
00754             else:
00755                 return file_info
00756         else:
00757             raise ValueError(
00758                 "upload_files need to be a list of tuples of (fieldname, "
00759                 "filename, filecontent, mimetype) or (fieldname, "
00760                 "filename, filecontent) or (fieldname, filename); "
00761                 "you gave: %r"
00762                 % repr(file_info)[:100])
00763 
00764     @staticmethod
00765     def _add_xhr_header(headers):
00766         headers = headers or {}
00767         # if remove str we will be have an error in lint.middleware
00768         headers.update({'X-REQUESTED-WITH': str('XMLHttpRequest')})
00769         return headers


webtest
Author(s): AlexV
autogenerated on Sat Jun 8 2019 20:32:07