simple_httpclient.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 from __future__ import absolute_import, division, with_statement
00003 
00004 from tornado.escape import utf8, _unicode, native_str
00005 from tornado.httpclient import HTTPRequest, HTTPResponse, HTTPError, AsyncHTTPClient, main
00006 from tornado.httputil import HTTPHeaders
00007 from tornado.iostream import IOStream, SSLIOStream
00008 from tornado import stack_context
00009 from tornado.util import b
00010 
00011 import base64
00012 import collections
00013 import contextlib
00014 import copy
00015 import functools
00016 import logging
00017 import os.path
00018 import re
00019 import socket
00020 import sys
00021 import time
00022 import urlparse
00023 import zlib
00024 
00025 try:
00026     from io import BytesIO  # python 3
00027 except ImportError:
00028     from cStringIO import StringIO as BytesIO  # python 2
00029 
00030 try:
00031     import ssl  # python 2.6+
00032 except ImportError:
00033     ssl = None
00034 
00035 _DEFAULT_CA_CERTS = os.path.dirname(__file__) + '/ca-certificates.crt'
00036 
00037 
00038 class SimpleAsyncHTTPClient(AsyncHTTPClient):
00039     """Non-blocking HTTP client with no external dependencies.
00040 
00041     This class implements an HTTP 1.1 client on top of Tornado's IOStreams.
00042     It does not currently implement all applicable parts of the HTTP
00043     specification, but it does enough to work with major web service APIs
00044     (mostly tested against the Twitter API so far).
00045 
00046     This class has not been tested extensively in production and
00047     should be considered somewhat experimental as of the release of
00048     tornado 1.2.  It is intended to become the default AsyncHTTPClient
00049     implementation in a future release.  It may either be used
00050     directly, or to facilitate testing of this class with an existing
00051     application, setting the environment variable
00052     USE_SIMPLE_HTTPCLIENT=1 will cause this class to transparently
00053     replace tornado.httpclient.AsyncHTTPClient.
00054 
00055     Some features found in the curl-based AsyncHTTPClient are not yet
00056     supported.  In particular, proxies are not supported, connections
00057     are not reused, and callers cannot select the network interface to be
00058     used.
00059 
00060     Python 2.6 or higher is required for HTTPS support.  Users of Python 2.5
00061     should use the curl-based AsyncHTTPClient if HTTPS support is required.
00062 
00063     """
00064     def initialize(self, io_loop=None, max_clients=10,
00065                    max_simultaneous_connections=None,
00066                    hostname_mapping=None, max_buffer_size=104857600):
00067         """Creates a AsyncHTTPClient.
00068 
00069         Only a single AsyncHTTPClient instance exists per IOLoop
00070         in order to provide limitations on the number of pending connections.
00071         force_instance=True may be used to suppress this behavior.
00072 
00073         max_clients is the number of concurrent requests that can be in
00074         progress.  max_simultaneous_connections has no effect and is accepted
00075         only for compatibility with the curl-based AsyncHTTPClient.  Note
00076         that these arguments are only used when the client is first created,
00077         and will be ignored when an existing client is reused.
00078 
00079         hostname_mapping is a dictionary mapping hostnames to IP addresses.
00080         It can be used to make local DNS changes when modifying system-wide
00081         settings like /etc/hosts is not possible or desirable (e.g. in
00082         unittests).
00083 
00084         max_buffer_size is the number of bytes that can be read by IOStream. It
00085         defaults to 100mb.
00086         """
00087         self.io_loop = io_loop
00088         self.max_clients = max_clients
00089         self.queue = collections.deque()
00090         self.active = {}
00091         self.hostname_mapping = hostname_mapping
00092         self.max_buffer_size = max_buffer_size
00093 
00094     def fetch(self, request, callback, **kwargs):
00095         if not isinstance(request, HTTPRequest):
00096             request = HTTPRequest(url=request, **kwargs)
00097         # We're going to modify this (to add Host, Accept-Encoding, etc),
00098         # so make sure we don't modify the caller's object.  This is also
00099         # where normal dicts get converted to HTTPHeaders objects.
00100         request.headers = HTTPHeaders(request.headers)
00101         callback = stack_context.wrap(callback)
00102         self.queue.append((request, callback))
00103         self._process_queue()
00104         if self.queue:
00105             logging.debug("max_clients limit reached, request queued. "
00106                           "%d active, %d queued requests." % (
00107                     len(self.active), len(self.queue)))
00108 
00109     def _process_queue(self):
00110         with stack_context.NullContext():
00111             while self.queue and len(self.active) < self.max_clients:
00112                 request, callback = self.queue.popleft()
00113                 key = object()
00114                 self.active[key] = (request, callback)
00115                 _HTTPConnection(self.io_loop, self, request,
00116                                 functools.partial(self._release_fetch, key),
00117                                 callback,
00118                                 self.max_buffer_size)
00119 
00120     def _release_fetch(self, key):
00121         del self.active[key]
00122         self._process_queue()
00123 
00124 
00125 class _HTTPConnection(object):
00126     _SUPPORTED_METHODS = set(["GET", "HEAD", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"])
00127 
00128     def __init__(self, io_loop, client, request, release_callback,
00129                  final_callback, max_buffer_size):
00130         self.start_time = time.time()
00131         self.io_loop = io_loop
00132         self.client = client
00133         self.request = request
00134         self.release_callback = release_callback
00135         self.final_callback = final_callback
00136         self.code = None
00137         self.headers = None
00138         self.chunks = None
00139         self._decompressor = None
00140         # Timeout handle returned by IOLoop.add_timeout
00141         self._timeout = None
00142         with stack_context.StackContext(self.cleanup):
00143             parsed = urlparse.urlsplit(_unicode(self.request.url))
00144             if ssl is None and parsed.scheme == "https":
00145                 raise ValueError("HTTPS requires either python2.6+ or "
00146                                  "curl_httpclient")
00147             if parsed.scheme not in ("http", "https"):
00148                 raise ValueError("Unsupported url scheme: %s" %
00149                                  self.request.url)
00150             # urlsplit results have hostname and port results, but they
00151             # didn't support ipv6 literals until python 2.7.
00152             netloc = parsed.netloc
00153             if "@" in netloc:
00154                 userpass, _, netloc = netloc.rpartition("@")
00155             match = re.match(r'^(.+):(\d+)$', netloc)
00156             if match:
00157                 host = match.group(1)
00158                 port = int(match.group(2))
00159             else:
00160                 host = netloc
00161                 port = 443 if parsed.scheme == "https" else 80
00162             if re.match(r'^\[.*\]$', host):
00163                 # raw ipv6 addresses in urls are enclosed in brackets
00164                 host = host[1:-1]
00165             parsed_hostname = host  # save final parsed host for _on_connect
00166             if self.client.hostname_mapping is not None:
00167                 host = self.client.hostname_mapping.get(host, host)
00168 
00169             if request.allow_ipv6:
00170                 af = socket.AF_UNSPEC
00171             else:
00172                 # We only try the first IP we get from getaddrinfo,
00173                 # so restrict to ipv4 by default.
00174                 af = socket.AF_INET
00175 
00176             addrinfo = socket.getaddrinfo(host, port, af, socket.SOCK_STREAM,
00177                                           0, 0)
00178             af, socktype, proto, canonname, sockaddr = addrinfo[0]
00179 
00180             if parsed.scheme == "https":
00181                 ssl_options = {}
00182                 if request.validate_cert:
00183                     ssl_options["cert_reqs"] = ssl.CERT_REQUIRED
00184                 if request.ca_certs is not None:
00185                     ssl_options["ca_certs"] = request.ca_certs
00186                 else:
00187                     ssl_options["ca_certs"] = _DEFAULT_CA_CERTS
00188                 if request.client_key is not None:
00189                     ssl_options["keyfile"] = request.client_key
00190                 if request.client_cert is not None:
00191                     ssl_options["certfile"] = request.client_cert
00192 
00193                 # SSL interoperability is tricky.  We want to disable
00194                 # SSLv2 for security reasons; it wasn't disabled by default
00195                 # until openssl 1.0.  The best way to do this is to use
00196                 # the SSL_OP_NO_SSLv2, but that wasn't exposed to python
00197                 # until 3.2.  Python 2.7 adds the ciphers argument, which
00198                 # can also be used to disable SSLv2.  As a last resort
00199                 # on python 2.6, we set ssl_version to SSLv3.  This is
00200                 # more narrow than we'd like since it also breaks
00201                 # compatibility with servers configured for TLSv1 only,
00202                 # but nearly all servers support SSLv3:
00203                 # http://blog.ivanristic.com/2011/09/ssl-survey-protocol-support.html
00204                 if sys.version_info >= (2, 7):
00205                     ssl_options["ciphers"] = "DEFAULT:!SSLv2"
00206                 else:
00207                     # This is really only necessary for pre-1.0 versions
00208                     # of openssl, but python 2.6 doesn't expose version
00209                     # information.
00210                     ssl_options["ssl_version"] = ssl.PROTOCOL_SSLv3
00211 
00212                 self.stream = SSLIOStream(socket.socket(af, socktype, proto),
00213                                           io_loop=self.io_loop,
00214                                           ssl_options=ssl_options,
00215                                           max_buffer_size=max_buffer_size)
00216             else:
00217                 self.stream = IOStream(socket.socket(af, socktype, proto),
00218                                        io_loop=self.io_loop,
00219                                        max_buffer_size=max_buffer_size)
00220             timeout = min(request.connect_timeout, request.request_timeout)
00221             if timeout:
00222                 self._timeout = self.io_loop.add_timeout(
00223                     self.start_time + timeout,
00224                     self._on_timeout)
00225             self.stream.set_close_callback(self._on_close)
00226             self.stream.connect(sockaddr,
00227                                 functools.partial(self._on_connect, parsed,
00228                                                   parsed_hostname))
00229 
00230     def _on_timeout(self):
00231         self._timeout = None
00232         self._run_callback(HTTPResponse(self.request, 599,
00233                                         request_time=time.time() - self.start_time,
00234                                         error=HTTPError(599, "Timeout")))
00235         self.stream.close()
00236 
00237     def _on_connect(self, parsed, parsed_hostname):
00238         if self._timeout is not None:
00239             self.io_loop.remove_timeout(self._timeout)
00240             self._timeout = None
00241         if self.request.request_timeout:
00242             self._timeout = self.io_loop.add_timeout(
00243                 self.start_time + self.request.request_timeout,
00244                 self._on_timeout)
00245         if (self.request.validate_cert and
00246             isinstance(self.stream, SSLIOStream)):
00247             match_hostname(self.stream.socket.getpeercert(),
00248                            # ipv6 addresses are broken (in
00249                            # parsed.hostname) until 2.7, here is
00250                            # correctly parsed value calculated in
00251                            # __init__
00252                            parsed_hostname)
00253         if (self.request.method not in self._SUPPORTED_METHODS and
00254             not self.request.allow_nonstandard_methods):
00255             raise KeyError("unknown method %s" % self.request.method)
00256         for key in ('network_interface',
00257                     'proxy_host', 'proxy_port',
00258                     'proxy_username', 'proxy_password'):
00259             if getattr(self.request, key, None):
00260                 raise NotImplementedError('%s not supported' % key)
00261         if "Connection" not in self.request.headers:
00262             self.request.headers["Connection"] = "close"
00263         if "Host" not in self.request.headers:
00264             if '@' in parsed.netloc:
00265                 self.request.headers["Host"] = parsed.netloc.rpartition('@')[-1]
00266             else:
00267                 self.request.headers["Host"] = parsed.netloc
00268         username, password = None, None
00269         if parsed.username is not None:
00270             username, password = parsed.username, parsed.password
00271         elif self.request.auth_username is not None:
00272             username = self.request.auth_username
00273             password = self.request.auth_password or ''
00274         if username is not None:
00275             auth = utf8(username) + b(":") + utf8(password)
00276             self.request.headers["Authorization"] = (b("Basic ") +
00277                                                      base64.b64encode(auth))
00278         if self.request.user_agent:
00279             self.request.headers["User-Agent"] = self.request.user_agent
00280         if not self.request.allow_nonstandard_methods:
00281             if self.request.method in ("POST", "PATCH", "PUT"):
00282                 assert self.request.body is not None
00283             else:
00284                 assert self.request.body is None
00285         if self.request.body is not None:
00286             self.request.headers["Content-Length"] = str(len(
00287                     self.request.body))
00288         if (self.request.method == "POST" and
00289             "Content-Type" not in self.request.headers):
00290             self.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
00291         if self.request.use_gzip:
00292             self.request.headers["Accept-Encoding"] = "gzip"
00293         req_path = ((parsed.path or '/') +
00294                 (('?' + parsed.query) if parsed.query else ''))
00295         request_lines = [utf8("%s %s HTTP/1.1" % (self.request.method,
00296                                                   req_path))]
00297         for k, v in self.request.headers.get_all():
00298             line = utf8(k) + b(": ") + utf8(v)
00299             if b('\n') in line:
00300                 raise ValueError('Newline in header: ' + repr(line))
00301             request_lines.append(line)
00302         self.stream.write(b("\r\n").join(request_lines) + b("\r\n\r\n"))
00303         if self.request.body is not None:
00304             self.stream.write(self.request.body)
00305         self.stream.read_until_regex(b("\r?\n\r?\n"), self._on_headers)
00306 
00307     def _release(self):
00308         if self.release_callback is not None:
00309             release_callback = self.release_callback
00310             self.release_callback = None
00311             release_callback()
00312 
00313     def _run_callback(self, response):
00314         self._release()
00315         if self.final_callback is not None:
00316             final_callback = self.final_callback
00317             self.final_callback = None
00318             final_callback(response)
00319 
00320     @contextlib.contextmanager
00321     def cleanup(self):
00322         try:
00323             yield
00324         except Exception, e:
00325             logging.warning("uncaught exception", exc_info=True)
00326             self._run_callback(HTTPResponse(self.request, 599, error=e,
00327                                 request_time=time.time() - self.start_time,
00328                                 ))
00329             if hasattr(self, "stream"):
00330                 self.stream.close()
00331 
00332     def _on_close(self):
00333         self._run_callback(HTTPResponse(
00334                 self.request, 599,
00335                 request_time=time.time() - self.start_time,
00336                 error=HTTPError(599, "Connection closed")))
00337 
00338     def _on_headers(self, data):
00339         data = native_str(data.decode("latin1"))
00340         first_line, _, header_data = data.partition("\n")
00341         match = re.match("HTTP/1.[01] ([0-9]+)", first_line)
00342         assert match
00343         self.code = int(match.group(1))
00344         self.headers = HTTPHeaders.parse(header_data)
00345 
00346         if "Content-Length" in self.headers:
00347             if "," in self.headers["Content-Length"]:
00348                 # Proxies sometimes cause Content-Length headers to get
00349                 # duplicated.  If all the values are identical then we can
00350                 # use them but if they differ it's an error.
00351                 pieces = re.split(r',\s*', self.headers["Content-Length"])
00352                 if any(i != pieces[0] for i in pieces):
00353                     raise ValueError("Multiple unequal Content-Lengths: %r" %
00354                                      self.headers["Content-Length"])
00355                 self.headers["Content-Length"] = pieces[0]
00356             content_length = int(self.headers["Content-Length"])
00357         else:
00358             content_length = None
00359 
00360         if self.request.header_callback is not None:
00361             for k, v in self.headers.get_all():
00362                 self.request.header_callback("%s: %s\r\n" % (k, v))
00363 
00364         if self.request.method == "HEAD":
00365             # HEAD requests never have content, even though they may have
00366             # content-length headers
00367             self._on_body(b(""))
00368             return
00369         if 100 <= self.code < 200 or self.code in (204, 304):
00370             # These response codes never have bodies
00371             # http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.3
00372             assert "Transfer-Encoding" not in self.headers
00373             assert content_length in (None, 0)
00374             self._on_body(b(""))
00375             return
00376 
00377         if (self.request.use_gzip and
00378             self.headers.get("Content-Encoding") == "gzip"):
00379             # Magic parameter makes zlib module understand gzip header
00380             # http://stackoverflow.com/questions/1838699/how-can-i-decompress-a-gzip-stream-with-zlib
00381             self._decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)
00382         if self.headers.get("Transfer-Encoding") == "chunked":
00383             self.chunks = []
00384             self.stream.read_until(b("\r\n"), self._on_chunk_length)
00385         elif content_length is not None:
00386             self.stream.read_bytes(content_length, self._on_body)
00387         else:
00388             self.stream.read_until_close(self._on_body)
00389 
00390     def _on_body(self, data):
00391         if self._timeout is not None:
00392             self.io_loop.remove_timeout(self._timeout)
00393             self._timeout = None
00394         original_request = getattr(self.request, "original_request",
00395                                    self.request)
00396         if (self.request.follow_redirects and
00397             self.request.max_redirects > 0 and
00398             self.code in (301, 302, 303, 307)):
00399             new_request = copy.copy(self.request)
00400             new_request.url = urlparse.urljoin(self.request.url,
00401                                                self.headers["Location"])
00402             new_request.max_redirects -= 1
00403             del new_request.headers["Host"]
00404             # http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html#sec10.3.4
00405             # client SHOULD make a GET request
00406             if self.code == 303:
00407                 new_request.method = "GET"
00408                 new_request.body = None
00409                 for h in ["Content-Length", "Content-Type",
00410                           "Content-Encoding", "Transfer-Encoding"]:
00411                     try:
00412                         del self.request.headers[h]
00413                     except KeyError:
00414                         pass
00415             new_request.original_request = original_request
00416             final_callback = self.final_callback
00417             self.final_callback = None
00418             self._release()
00419             self.client.fetch(new_request, final_callback)
00420             self.stream.close()
00421             return
00422         if self._decompressor:
00423             data = self._decompressor.decompress(data)
00424         if self.request.streaming_callback:
00425             if self.chunks is None:
00426                 # if chunks is not None, we already called streaming_callback
00427                 # in _on_chunk_data
00428                 self.request.streaming_callback(data)
00429             buffer = BytesIO()
00430         else:
00431             buffer = BytesIO(data)  # TODO: don't require one big string?
00432         response = HTTPResponse(original_request,
00433                                 self.code, headers=self.headers,
00434                                 request_time=time.time() - self.start_time,
00435                                 buffer=buffer,
00436                                 effective_url=self.request.url)
00437         self._run_callback(response)
00438         self.stream.close()
00439 
00440     def _on_chunk_length(self, data):
00441         # TODO: "chunk extensions" http://tools.ietf.org/html/rfc2616#section-3.6.1
00442         length = int(data.strip(), 16)
00443         if length == 0:
00444             # all the data has been decompressed, so we don't need to
00445             # decompress again in _on_body
00446             self._decompressor = None
00447             self._on_body(b('').join(self.chunks))
00448         else:
00449             self.stream.read_bytes(length + 2,  # chunk ends with \r\n
00450                               self._on_chunk_data)
00451 
00452     def _on_chunk_data(self, data):
00453         assert data[-2:] == b("\r\n")
00454         chunk = data[:-2]
00455         if self._decompressor:
00456             chunk = self._decompressor.decompress(chunk)
00457         if self.request.streaming_callback is not None:
00458             self.request.streaming_callback(chunk)
00459         else:
00460             self.chunks.append(chunk)
00461         self.stream.read_until(b("\r\n"), self._on_chunk_length)
00462 
00463 
00464 # match_hostname was added to the standard library ssl module in python 3.2.
00465 # The following code was backported for older releases and copied from
00466 # https://bitbucket.org/brandon/backports.ssl_match_hostname
00467 class CertificateError(ValueError):
00468     pass
00469 
00470 
00471 def _dnsname_to_pat(dn):
00472     pats = []
00473     for frag in dn.split(r'.'):
00474         if frag == '*':
00475             # When '*' is a fragment by itself, it matches a non-empty dotless
00476             # fragment.
00477             pats.append('[^.]+')
00478         else:
00479             # Otherwise, '*' matches any dotless fragment.
00480             frag = re.escape(frag)
00481             pats.append(frag.replace(r'\*', '[^.]*'))
00482     return re.compile(r'\A' + r'\.'.join(pats) + r'\Z', re.IGNORECASE)
00483 
00484 
00485 def match_hostname(cert, hostname):
00486     """Verify that *cert* (in decoded format as returned by
00487     SSLSocket.getpeercert()) matches the *hostname*.  RFC 2818 rules
00488     are mostly followed, but IP addresses are not accepted for *hostname*.
00489 
00490     CertificateError is raised on failure. On success, the function
00491     returns nothing.
00492     """
00493     if not cert:
00494         raise ValueError("empty or no certificate")
00495     dnsnames = []
00496     san = cert.get('subjectAltName', ())
00497     for key, value in san:
00498         if key == 'DNS':
00499             if _dnsname_to_pat(value).match(hostname):
00500                 return
00501             dnsnames.append(value)
00502     if not san:
00503         # The subject is only checked when subjectAltName is empty
00504         for sub in cert.get('subject', ()):
00505             for key, value in sub:
00506                 # XXX according to RFC 2818, the most specific Common Name
00507                 # must be used.
00508                 if key == 'commonName':
00509                     if _dnsname_to_pat(value).match(hostname):
00510                         return
00511                     dnsnames.append(value)
00512     if len(dnsnames) > 1:
00513         raise CertificateError("hostname %r "
00514             "doesn't match either of %s"
00515             % (hostname, ', '.join(map(repr, dnsnames))))
00516     elif len(dnsnames) == 1:
00517         raise CertificateError("hostname %r "
00518             "doesn't match %r"
00519             % (hostname, dnsnames[0]))
00520     else:
00521         raise CertificateError("no appropriate commonName or "
00522             "subjectAltName fields were found")
00523 
00524 if __name__ == "__main__":
00525     AsyncHTTPClient.configure(SimpleAsyncHTTPClient)
00526     main()


roswww
Author(s): Jonathan Mace
autogenerated on Thu Jan 2 2014 11:53:30