simple_httpclient.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 from __future__ import absolute_import, division, print_function, with_statement
00003 
00004 from tornado.concurrent import is_future
00005 from tornado.escape import utf8, _unicode
00006 from tornado.httpclient import HTTPResponse, HTTPError, AsyncHTTPClient, main, _RequestProxy
00007 from tornado import httputil
00008 from tornado.http1connection import HTTP1Connection, HTTP1ConnectionParameters
00009 from tornado.iostream import StreamClosedError
00010 from tornado.netutil import Resolver, OverrideResolver
00011 from tornado.log import gen_log
00012 from tornado import stack_context
00013 from tornado.tcpclient import TCPClient
00014 
00015 import base64
00016 import collections
00017 import copy
00018 import functools
00019 import re
00020 import socket
00021 import sys
00022 
00023 try:
00024     from io import BytesIO  # python 3
00025 except ImportError:
00026     from cStringIO import StringIO as BytesIO  # python 2
00027 
00028 try:
00029     import urlparse  # py2
00030 except ImportError:
00031     import urllib.parse as urlparse  # py3
00032 
00033 try:
00034     import ssl
00035 except ImportError:
00036     # ssl is not available on Google App Engine.
00037     ssl = None
00038 
00039 try:
00040     import certifi
00041 except ImportError:
00042     certifi = None
00043 
00044 
def _default_ca_certs():
    """Return the default ``ca_certs`` value used for https requests.

    The value is whatever ``certifi.where()`` reports.

    Raises:
        Exception: if the optional ``certifi`` package could not be
            imported when this module was loaded.
    """
    if certifi is not None:
        return certifi.where()
    raise Exception("The 'certifi' package is required to use https "
                    "in simple_httpclient")
00050 
00051 
class SimpleAsyncHTTPClient(AsyncHTTPClient):
    """Non-blocking HTTP client with no external dependencies.

    This class implements an HTTP 1.1 client on top of Tornado's IOStreams.
    It does not currently implement all applicable parts of the HTTP
    specification, but it does enough to work with major web service APIs.

    Some features found in the curl-based AsyncHTTPClient are not yet
    supported.  In particular, proxies are not supported, connections
    are not reused, and callers cannot select the network interface to be
    used.
    """
    def initialize(self, io_loop, max_clients=10,
                   hostname_mapping=None, max_buffer_size=104857600,
                   resolver=None, defaults=None, max_header_size=None):
        """Creates an AsyncHTTPClient.

        Only a single AsyncHTTPClient instance exists per IOLoop
        in order to provide limitations on the number of pending connections.
        force_instance=True may be used to suppress this behavior.

        max_clients is the number of concurrent requests that can be
        in progress.  Note that these arguments are only used when the
        client is first created, and will be ignored when an existing
        client is reused.

        hostname_mapping is a dictionary mapping hostnames to IP addresses.
        It can be used to make local DNS changes when modifying system-wide
        settings like /etc/hosts is not possible or desirable (e.g. in
        unittests).

        max_buffer_size is the number of bytes that can be read by IOStream. It
        defaults to 100mb.

        resolver, if given, is used for hostname resolution and is left
        open by close(); otherwise a Resolver is created here and closed
        together with the client.

        defaults is forwarded unchanged to AsyncHTTPClient.initialize.

        max_header_size is handed to every connection created by this
        client.
        """
        super(SimpleAsyncHTTPClient, self).initialize(io_loop,
                                                      defaults=defaults)
        self.max_clients = max_clients
        # FIFO of (key, request, callback) tuples not yet started.
        self.queue = collections.deque()
        # key -> (request, callback) for requests currently in progress.
        self.active = {}
        # key -> (request, callback, timeout_handle) for queued requests.
        self.waiting = {}
        self.max_buffer_size = max_buffer_size
        self.max_header_size = max_header_size
        # TCPClient could create a Resolver for us, but we have to do it
        # ourselves to support hostname_mapping.
        if resolver:
            self.resolver = resolver
            self.own_resolver = False
        else:
            self.resolver = Resolver(io_loop=io_loop)
            self.own_resolver = True
        if hostname_mapping is not None:
            self.resolver = OverrideResolver(resolver=self.resolver,
                                             mapping=hostname_mapping)
        self.tcp_client = TCPClient(resolver=self.resolver, io_loop=io_loop)

    def close(self):
        """Close the client, its TCP client and (if owned) its resolver."""
        super(SimpleAsyncHTTPClient, self).close()
        if self.own_resolver:
            self.resolver.close()
        self.tcp_client.close()

    def fetch_impl(self, request, callback):
        """Queue ``request`` and start it as soon as a slot is free.

        When ``max_clients`` requests are already active, a timeout of
        ``min(connect_timeout, request_timeout)`` is armed so that a request
        which never leaves the queue is answered with a 599 response.
        """
        # A fresh object() is a unique, hashable identity for this request.
        key = object()
        self.queue.append((key, request, callback))
        if len(self.active) >= self.max_clients:
            timeout_handle = self.io_loop.add_timeout(
                self.io_loop.time() + min(request.connect_timeout,
                                          request.request_timeout),
                functools.partial(self._on_timeout, key))
        else:
            timeout_handle = None
        self.waiting[key] = (request, callback, timeout_handle)
        self._process_queue()
        if self.queue:
            # Let the logger do the formatting so that nothing is built
            # when debug logging is disabled.
            gen_log.debug("max_clients limit reached, request queued. "
                          "%d active, %d queued requests.",
                          len(self.active), len(self.queue))

    def _process_queue(self):
        """Start queued requests while fewer than ``max_clients`` are active."""
        # NullContext: start connections outside of the stack context that
        # happens to be active in the caller.
        with stack_context.NullContext():
            while self.queue and len(self.active) < self.max_clients:
                key, request, callback = self.queue.popleft()
                if key not in self.waiting:
                    # No longer waiting (e.g. already timed out); skip it.
                    continue
                self._remove_timeout(key)
                self.active[key] = (request, callback)
                release_callback = functools.partial(self._release_fetch, key)
                self._handle_request(request, release_callback, callback)

    def _handle_request(self, request, release_callback, final_callback):
        """Create the _HTTPConnection that carries out ``request``."""
        _HTTPConnection(self.io_loop, self, request, release_callback,
                        final_callback, self.max_buffer_size, self.tcp_client,
                        self.max_header_size)

    def _release_fetch(self, key):
        """Free the slot held by ``key`` and start further queued requests."""
        del self.active[key]
        self._process_queue()

    def _remove_timeout(self, key):
        """Cancel the queue timeout of ``key`` and drop it from ``waiting``."""
        if key in self.waiting:
            request, callback, timeout_handle = self.waiting[key]
            if timeout_handle is not None:
                self.io_loop.remove_timeout(timeout_handle)
            del self.waiting[key]

    def _on_timeout(self, key):
        """Answer a request that timed out while queued with a 599 response."""
        request, callback, timeout_handle = self.waiting[key]
        self.queue.remove((key, request, callback))
        timeout_response = HTTPResponse(
            request, 599, error=HTTPError(599, "Timeout"),
            request_time=self.io_loop.time() - request.start_time)
        self.io_loop.add_callback(callback, timeout_response)
        del self.waiting[key]
00165 
00166 
class _HTTPConnection(httputil.HTTPMessageDelegate):
    """Carries out a single HTTP request on behalf of SimpleAsyncHTTPClient.

    Constructing an instance starts the request: the URL is parsed, a
    timeout is scheduled and a connection is opened through ``tcp_client``.
    Once connected, the request is written with an HTTP1Connection and this
    object is passed to ``read_response`` as the message delegate
    (``headers_received``, ``data_received``, ``finish``).

    The outcome is delivered to ``final_callback`` as an HTTPResponse.
    While that callback is still pending, exceptions that reach
    ``_handle_exception`` are turned into a 599 response.
    """
    _SUPPORTED_METHODS = set(["GET", "HEAD", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"])

    def __init__(self, io_loop, client, request, release_callback,
                 final_callback, max_buffer_size, tcp_client,
                 max_header_size):
        """Parse the request URL and begin connecting.

        io_loop is used for timing and scheduling.  client is the object
        whose ``fetch`` is called to follow redirects.  release_callback is
        invoked (once) when this connection no longer occupies a client
        slot; final_callback receives the HTTPResponse.  max_buffer_size is
        passed to ``tcp_client.connect`` and max_header_size to
        HTTP1ConnectionParameters.
        """
        self.start_time = io_loop.time()
        self.io_loop = io_loop
        self.client = client
        self.request = request
        self.release_callback = release_callback
        self.final_callback = final_callback
        self.max_buffer_size = max_buffer_size
        self.tcp_client = tcp_client
        self.max_header_size = max_header_size
        self.code = None
        self.headers = None
        self.chunks = []
        self._decompressor = None
        # Timeout handle returned by IOLoop.add_timeout
        self._timeout = None
        self._sockaddr = None
        with stack_context.ExceptionStackContext(self._handle_exception):
            self.parsed = urlparse.urlsplit(_unicode(self.request.url))
            if self.parsed.scheme not in ("http", "https"):
                raise ValueError("Unsupported url scheme: %s" %
                                 self.request.url)
            # urlsplit results have hostname and port results, but they
            # didn't support ipv6 literals until python 2.7.
            netloc = self.parsed.netloc
            if "@" in netloc:
                # Strip the "user:pass@" prefix; only host[:port] is needed.
                netloc = netloc.rpartition("@")[-1]
            match = re.match(r'^(.+):(\d+)$', netloc)
            if match:
                host = match.group(1)
                port = int(match.group(2))
            else:
                host = netloc
                port = 443 if self.parsed.scheme == "https" else 80
            if re.match(r'^\[.*\]$', host):
                # raw ipv6 addresses in urls are enclosed in brackets
                host = host[1:-1]
            self.parsed_hostname = host  # save final host for _on_connect

            if request.allow_ipv6 is False:
                af = socket.AF_INET
            else:
                af = socket.AF_UNSPEC

            ssl_options = self._get_ssl_options(self.parsed.scheme)

            # A falsy timeout (0 or None) means no connect-phase timeout.
            timeout = min(self.request.connect_timeout, self.request.request_timeout)
            if timeout:
                self._timeout = self.io_loop.add_timeout(
                    self.start_time + timeout,
                    stack_context.wrap(self._on_timeout))
            self.tcp_client.connect(host, port, af=af,
                                    ssl_options=ssl_options,
                                    max_buffer_size=self.max_buffer_size,
                                    callback=self._on_connect)

    def _get_ssl_options(self, scheme):
        """Return the ssl_options dict for ``scheme``, or None for non-https.

        The dict is built from the request's validate_cert, ca_certs,
        client_key and client_cert settings; when the request names no
        ca_certs, ``_default_ca_certs()`` is used.
        """
        if scheme == "https":
            ssl_options = {}
            if self.request.validate_cert:
                ssl_options["cert_reqs"] = ssl.CERT_REQUIRED
            if self.request.ca_certs is not None:
                ssl_options["ca_certs"] = self.request.ca_certs
            else:
                ssl_options["ca_certs"] = _default_ca_certs()
            if self.request.client_key is not None:
                ssl_options["keyfile"] = self.request.client_key
            if self.request.client_cert is not None:
                ssl_options["certfile"] = self.request.client_cert

            # SSL interoperability is tricky.  We want to disable
            # SSLv2 for security reasons; it wasn't disabled by default
            # until openssl 1.0.  The best way to do this is to use
            # the SSL_OP_NO_SSLv2, but that wasn't exposed to python
            # until 3.2.  Python 2.7 adds the ciphers argument, which
            # can also be used to disable SSLv2.  As a last resort
            # on python 2.6, we set ssl_version to TLSv1.  This is
            # more narrow than we'd like since it also breaks
            # compatibility with servers configured for SSLv3 only,
            # but nearly all servers support both SSLv3 and TLSv1:
            # http://blog.ivanristic.com/2011/09/ssl-survey-protocol-support.html
            if sys.version_info >= (2, 7):
                # In addition to disabling SSLv2, we also exclude certain
                # classes of insecure ciphers.
                ssl_options["ciphers"] = "DEFAULT:!SSLv2:!EXPORT:!DES"
            else:
                # This is really only necessary for pre-1.0 versions
                # of openssl, but python 2.6 doesn't expose version
                # information.
                ssl_options["ssl_version"] = ssl.PROTOCOL_TLSv1
            return ssl_options
        return None

    def _on_timeout(self):
        """Timeout callback: fail the request with a 599 if still pending."""
        self._timeout = None
        if self.final_callback is not None:
            # Raised rather than handled inline: the callback was wrapped
            # with stack_context.wrap, so the error is expected to be routed
            # to _handle_exception.
            raise HTTPError(599, "Timeout")

    def _remove_timeout(self):
        """Cancel the pending timeout, if any."""
        if self._timeout is not None:
            self.io_loop.remove_timeout(self._timeout)
            self._timeout = None

    def _on_connect(self, stream):
        """Validate the request, fill in headers and write the request.

        Called with the connected stream.  Raises KeyError for an unknown
        method, NotImplementedError for unsupported request options,
        ValueError for an unsupported auth_mode and AssertionError when the
        presence of a body does not match the method.
        """
        if self.final_callback is None:
            # final_callback is cleared if we've hit our timeout.
            stream.close()
            return
        self.stream = stream
        self.stream.set_close_callback(self.on_connection_close)
        self._remove_timeout()
        # Defensive re-check kept from the original code in case the
        # callback was cleared by the calls just above.
        if self.final_callback is None:
            return
        if self.request.request_timeout:
            # From here on only the overall request timeout applies; it is
            # measured from start_time, not from the moment of connecting.
            self._timeout = self.io_loop.add_timeout(
                self.start_time + self.request.request_timeout,
                stack_context.wrap(self._on_timeout))
        if (self.request.method not in self._SUPPORTED_METHODS and
                not self.request.allow_nonstandard_methods):
            raise KeyError("unknown method %s" % self.request.method)
        for key in ('network_interface',
                    'proxy_host', 'proxy_port',
                    'proxy_username', 'proxy_password'):
            if getattr(self.request, key, None):
                raise NotImplementedError('%s not supported' % key)
        if "Connection" not in self.request.headers:
            self.request.headers["Connection"] = "close"
        if "Host" not in self.request.headers:
            if '@' in self.parsed.netloc:
                self.request.headers["Host"] = self.parsed.netloc.rpartition('@')[-1]
            else:
                self.request.headers["Host"] = self.parsed.netloc
        # Credentials embedded in the URL take precedence over the
        # auth_username/auth_password request attributes.
        username, password = None, None
        if self.parsed.username is not None:
            username, password = self.parsed.username, self.parsed.password
        elif self.request.auth_username is not None:
            username = self.request.auth_username
            password = self.request.auth_password or ''
        if username is not None:
            if self.request.auth_mode not in (None, "basic"):
                # Interpolate the mode into the message (it used to be
                # passed as a stray second exception argument).
                raise ValueError("unsupported auth_mode %s" %
                                 self.request.auth_mode)
            auth = utf8(username) + b":" + utf8(password)
            self.request.headers["Authorization"] = (b"Basic " +
                                                     base64.b64encode(auth))
        if self.request.user_agent:
            self.request.headers["User-Agent"] = self.request.user_agent
        if not self.request.allow_nonstandard_methods:
            if self.request.method in ("POST", "PATCH", "PUT"):
                if (self.request.body is None and
                        self.request.body_producer is None):
                    raise AssertionError(
                        'Body must not be empty for "%s" request'
                        % self.request.method)
            else:
                if (self.request.body is not None or
                        self.request.body_producer is not None):
                    raise AssertionError(
                        'Body must be empty for "%s" request'
                        % self.request.method)
        if self.request.expect_100_continue:
            self.request.headers["Expect"] = "100-continue"
        if self.request.body is not None:
            # When body_producer is used the caller is responsible for
            # setting Content-Length (or else chunked encoding will be used).
            self.request.headers["Content-Length"] = str(len(
                self.request.body))
        if (self.request.method == "POST" and
                "Content-Type" not in self.request.headers):
            self.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
        if self.request.decompress_response:
            self.request.headers["Accept-Encoding"] = "gzip"
        req_path = ((self.parsed.path or '/') +
                    (('?' + self.parsed.query) if self.parsed.query else ''))
        self.stream.set_nodelay(True)
        self.connection = HTTP1Connection(
            self.stream, True,
            HTTP1ConnectionParameters(
                no_keep_alive=True,
                max_header_size=self.max_header_size,
                decompress=self.request.decompress_response),
            self._sockaddr)
        start_line = httputil.RequestStartLine(self.request.method,
                                               req_path, 'HTTP/1.1')
        self.connection.write_headers(start_line, self.request.headers)
        if self.request.expect_100_continue:
            # The body is written later, from headers_received, once the
            # server has answered with "100 Continue".
            self._read_response()
        else:
            self._write_body(True)

    def _write_body(self, start_read):
        """Write the request body (if any) and finish the request.

        When start_read is true, reading the response is started once the
        body has been written.
        """
        if self.request.body is not None:
            self.connection.write(self.request.body)
            self.connection.finish()
        elif self.request.body_producer is not None:
            fut = self.request.body_producer(self.connection.write)
            if is_future(fut):
                # Asynchronous producer: finish only after it completes.
                def on_body_written(fut):
                    fut.result()
                    self.connection.finish()
                    if start_read:
                        self._read_response()
                self.io_loop.add_future(fut, on_body_written)
                return
            self.connection.finish()
        if start_read:
            self._read_response()

    def _read_response(self):
        """Start reading the response with this object as the delegate."""
        # Ensure that any exception raised in read_response ends up in our
        # stack context.
        self.io_loop.add_future(
            self.connection.read_response(self),
            lambda f: f.result())

    def _release(self):
        """Invoke release_callback, at most once."""
        if self.release_callback is not None:
            release_callback = self.release_callback
            self.release_callback = None
            release_callback()

    def _run_callback(self, response):
        """Release the client slot and schedule final_callback(response)."""
        self._release()
        if self.final_callback is not None:
            final_callback = self.final_callback
            # Cleared first so the callback cannot be scheduled twice.
            self.final_callback = None
            self.io_loop.add_callback(final_callback, response)

    def _handle_exception(self, typ, value, tb):
        """Stack-context exception handler.

        While final_callback is pending, the error is reported as a 599
        HTTPResponse, the stream (if any) is closed and True is returned.
        Afterwards only StreamClosedError is reported as handled.
        """
        if self.final_callback:
            self._remove_timeout()
            if isinstance(value, StreamClosedError):
                value = HTTPError(599, "Stream closed")
            self._run_callback(HTTPResponse(self.request, 599, error=value,
                                            request_time=self.io_loop.time() - self.start_time,
                                            ))

            if hasattr(self, "stream"):
                # TODO: this may cause a StreamClosedError to be raised
                # by the connection's Future.  Should we cancel the
                # connection more gracefully?
                self.stream.close()
            return True
        else:
            # If our callback has already been called, we are probably
            # catching an exception that is not caused by us but rather
            # some child of our callback. Rather than drop it on the floor,
            # pass it along, unless it's just the stream being closed.
            return isinstance(value, StreamClosedError)

    def on_connection_close(self):
        """Stream close callback: report an error if still awaiting a result."""
        if self.final_callback is not None:
            message = "Connection closed"
            if self.stream.error:
                raise self.stream.error
            try:
                raise HTTPError(599, message)
            except HTTPError:
                # Raise-and-catch so that sys.exc_info() supplies the
                # (type, value, traceback) triple the handler expects.
                self._handle_exception(*sys.exc_info())

    def headers_received(self, first_line, headers):
        """Record the response status line and headers.

        A "100 Continue" answer to an expect_100_continue request triggers
        writing of the body instead of being recorded.
        """
        if self.request.expect_100_continue and first_line.code == 100:
            self._write_body(False)
            return
        self.headers = headers
        self.code = first_line.code
        self.reason = first_line.reason

        if self.request.header_callback is not None:
            # Reassemble the start line.
            self.request.header_callback('%s %s %s\r\n' % first_line)
            for k, v in self.headers.get_all():
                self.request.header_callback("%s: %s\r\n" % (k, v))
            self.request.header_callback('\r\n')

    def finish(self):
        """Complete the exchange: follow a redirect or deliver the response."""
        data = b''.join(self.chunks)
        self._remove_timeout()
        original_request = getattr(self.request, "original_request",
                                   self.request)
        # A redirect status without a Location header cannot be followed;
        # such a response is handed to the caller unchanged instead of
        # failing on the header lookup below.
        if (self.request.follow_redirects and
                self.request.max_redirects > 0 and
                self.code in (301, 302, 303, 307) and
                self.headers.get("Location") is not None):
            assert isinstance(self.request, _RequestProxy)
            # NOTE: copy.copy is shallow, so attribute objects such as the
            # headers are shared with the request being copied.
            new_request = copy.copy(self.request.request)
            new_request.url = urlparse.urljoin(self.request.url,
                                               self.headers["Location"])
            new_request.max_redirects = self.request.max_redirects - 1
            del new_request.headers["Host"]
            # http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html#sec10.3.4
            # Client SHOULD make a GET request after a 303.
            # According to the spec, 302 should be followed by the same
            # method as the original request, but in practice browsers
            # treat 302 the same as 303, and many servers use 302 for
            # compatibility with pre-HTTP/1.1 user agents which don't
            # understand the 303 status.
            if self.code in (302, 303):
                new_request.method = "GET"
                new_request.body = None
                for h in ["Content-Length", "Content-Type",
                          "Content-Encoding", "Transfer-Encoding"]:
                    try:
                        del self.request.headers[h]
                    except KeyError:
                        pass
            new_request.original_request = original_request
            # Hand the callback over to the follow-up fetch; clearing it
            # here keeps this connection from calling it as well.
            final_callback = self.final_callback
            self.final_callback = None
            self._release()
            self.client.fetch(new_request, final_callback)
            self._on_end_request()
            return
        if self.request.streaming_callback:
            # The body was already passed to streaming_callback chunk by
            # chunk, so the response carries an empty buffer.
            body_buffer = BytesIO()
        else:
            body_buffer = BytesIO(data)  # TODO: don't require one big string?
        response = HTTPResponse(original_request,
                                self.code, reason=getattr(self, 'reason', None),
                                headers=self.headers,
                                request_time=self.io_loop.time() - self.start_time,
                                buffer=body_buffer,
                                effective_url=self.request.url)
        self._run_callback(response)
        self._on_end_request()

    def _on_end_request(self):
        """Close the stream; connections are not reused."""
        self.stream.close()

    def data_received(self, chunk):
        """Pass a body chunk to streaming_callback, or buffer it."""
        if self.request.streaming_callback is not None:
            self.request.streaming_callback(chunk)
        else:
            self.chunks.append(chunk)
00505 
00506 
# When this module is executed as a script, select SimpleAsyncHTTPClient
# as the AsyncHTTPClient implementation and then run the main() function
# imported from tornado.httpclient.
if __name__ == "__main__":
    AsyncHTTPClient.configure(SimpleAsyncHTTPClient)
    main()


rosbridge_tools
Author(s): Jonathan Mace
autogenerated on Sat Dec 27 2014 11:25:59