00001
00002 from __future__ import absolute_import, division, with_statement
00003
00004 from tornado.escape import utf8, _unicode, native_str
00005 from tornado.httpclient import HTTPRequest, HTTPResponse, HTTPError, AsyncHTTPClient, main
00006 from tornado.httputil import HTTPHeaders
00007 from tornado.iostream import IOStream, SSLIOStream
00008 from tornado import stack_context
00009 from tornado.util import b
00010
00011 import base64
00012 import collections
00013 import contextlib
00014 import copy
00015 import functools
00016 import logging
00017 import os.path
00018 import re
00019 import socket
00020 import sys
00021 import time
00022 import urlparse
00023 import zlib
00024
00025 try:
00026 from io import BytesIO
00027 except ImportError:
00028 from cStringIO import StringIO as BytesIO
00029
00030 try:
00031 import ssl
00032 except ImportError:
00033 ssl = None
00034
# Path of the CA bundle shipped next to this module; used as the default
# ca_certs file when a request asks for certificate validation but does not
# supply its own bundle.
_DEFAULT_CA_CERTS = os.path.dirname(__file__) + '/ca-certificates.crt'
00036
00037
class SimpleAsyncHTTPClient(AsyncHTTPClient):
    """Non-blocking HTTP client with no external dependencies.

    This class implements an HTTP 1.1 client on top of Tornado's IOStreams.
    It does not currently implement all applicable parts of the HTTP
    specification, but it does enough to work with major web service APIs
    (mostly tested against the Twitter API so far).

    This class has not been tested extensively in production and
    should be considered somewhat experimental as of the release of
    tornado 1.2.  It is intended to become the default AsyncHTTPClient
    implementation in a future release.  It may either be used
    directly, or to facilitate testing of this class with an existing
    application, setting the environment variable
    USE_SIMPLE_HTTPCLIENT=1 will cause this class to transparently
    replace tornado.httpclient.AsyncHTTPClient.

    Some features found in the curl-based AsyncHTTPClient are not yet
    supported.  In particular, proxies are not supported, connections
    are not reused, and callers cannot select the network interface to be
    used.

    Python 2.6 or higher is required for HTTPS support.  Users of Python 2.5
    should use the curl-based AsyncHTTPClient if HTTPS support is required.
    """
    def initialize(self, io_loop=None, max_clients=10,
                   max_simultaneous_connections=None,
                   hostname_mapping=None, max_buffer_size=104857600):
        """Creates a AsyncHTTPClient.

        Only a single AsyncHTTPClient instance exists per IOLoop
        in order to provide limitations on the number of pending connections.
        force_instance=True may be used to suppress this behavior.

        max_clients is the number of concurrent requests that can be in
        progress.  max_simultaneous_connections has no effect and is accepted
        only for compatibility with the curl-based AsyncHTTPClient.  Note
        that these arguments are only used when the client is first created,
        and will be ignored when an existing client is reused.

        hostname_mapping is a dictionary mapping hostnames to IP addresses.
        It can be used to make local DNS changes when modifying system-wide
        settings like /etc/hosts is not possible or desirable (e.g. in
        unittests).

        max_buffer_size is the number of bytes that can be read by IOStream.
        It defaults to 100mb.
        """
        self.io_loop = io_loop
        self.max_clients = max_clients
        # Requests waiting for a free slot, processed FIFO.
        self.queue = collections.deque()
        # Maps an opaque key -> (request, callback) for in-flight fetches.
        self.active = {}
        self.hostname_mapping = hostname_mapping
        self.max_buffer_size = max_buffer_size

    def fetch(self, request, callback, **kwargs):
        """Queue *request* (an HTTPRequest or a url string) and invoke
        *callback* with the HTTPResponse when it completes."""
        if not isinstance(request, HTTPRequest):
            request = HTTPRequest(url=request, **kwargs)
        # Copy the headers so later header additions (Host, Connection,
        # etc. in _HTTPConnection) don't mutate the caller's object; this
        # also converts plain dicts into HTTPHeaders.
        request.headers = HTTPHeaders(request.headers)
        callback = stack_context.wrap(callback)
        self.queue.append((request, callback))
        self._process_queue()
        if self.queue:
            # Lazy %-style arguments: the message is only formatted when
            # debug logging is actually enabled (the original built the
            # string eagerly with % on every queued fetch).
            logging.debug("max_clients limit reached, request queued. "
                          "%d active, %d queued requests.",
                          len(self.active), len(self.queue))

    def _process_queue(self):
        """Start queued requests until max_clients are in flight."""
        # NullContext so each connection runs in a fresh stack context
        # instead of inheriting whatever context _process_queue was
        # called under.
        with stack_context.NullContext():
            while self.queue and len(self.active) < self.max_clients:
                request, callback = self.queue.popleft()
                key = object()  # unique token identifying this fetch
                self.active[key] = (request, callback)
                _HTTPConnection(self.io_loop, self, request,
                                functools.partial(self._release_fetch, key),
                                callback,
                                self.max_buffer_size)

    def _release_fetch(self, key):
        """Free the slot held by *key* and start the next queued request."""
        del self.active[key]
        self._process_queue()
00123
00124
class _HTTPConnection(object):
    """A single HTTP request/response exchange on a dedicated socket.

    Drives the full lifecycle: URL parsing, DNS resolution, (SSL) connect,
    request serialization, response parsing (including chunked
    transfer-encoding and gzip content-encoding), redirect following, and
    timeout handling.  Connections are not reused; each instance serves
    exactly one request.
    """
    # Methods this client knows how to serialize; anything else is rejected
    # in _on_connect unless request.allow_nonstandard_methods is set.
    _SUPPORTED_METHODS = set(["GET", "HEAD", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"])

    def __init__(self, io_loop, client, request, release_callback,
                 final_callback, max_buffer_size):
        self.start_time = time.time()
        self.io_loop = io_loop
        self.client = client
        self.request = request
        # Frees this connection's slot in the owning client (see
        # SimpleAsyncHTTPClient._release_fetch).
        self.release_callback = release_callback
        # Receives the HTTPResponse; cleared after first use so the
        # response is delivered at most once.
        self.final_callback = final_callback
        self.code = None        # HTTP status code, set in _on_headers
        self.headers = None     # HTTPHeaders, set in _on_headers
        self.chunks = None      # accumulated chunk bodies (chunked encoding only)
        self._decompressor = None  # zlib decompressor when response is gzipped
        # IOLoop timeout handle: first the connect timeout, then (once
        # connected) the overall request timeout.
        self._timeout = None
        # cleanup() converts any exception below (or in later callbacks)
        # into a 599 response instead of propagating.
        with stack_context.StackContext(self.cleanup):
            parsed = urlparse.urlsplit(_unicode(self.request.url))
            if ssl is None and parsed.scheme == "https":
                raise ValueError("HTTPS requires either python2.6+ or "
                                 "curl_httpclient")
            if parsed.scheme not in ("http", "https"):
                raise ValueError("Unsupported url scheme: %s" %
                                 self.request.url)
            # Split host:port by hand rather than relying on
            # parsed.hostname/parsed.port (NOTE(review): presumably for
            # compatibility with older urlsplit behavior, e.g. ipv6
            # literals -- confirm before simplifying).
            netloc = parsed.netloc
            if "@" in netloc:
                # Strip userinfo; credentials are handled via
                # parsed.username/password in _on_connect.
                userpass, _, netloc = netloc.rpartition("@")
            match = re.match(r'^(.+):(\d+)$', netloc)
            if match:
                host = match.group(1)
                port = int(match.group(2))
            else:
                host = netloc
                port = 443 if parsed.scheme == "https" else 80
            if re.match(r'^\[.*\]$', host):
                # Raw IPv6 literals in URLs are enclosed in brackets;
                # strip them for getaddrinfo.
                host = host[1:-1]
            # Remember the hostname from the URL itself for certificate
            # matching, before any hostname_mapping rewrite below.
            parsed_hostname = host
            if self.client.hostname_mapping is not None:
                host = self.client.hostname_mapping.get(host, host)

            if request.allow_ipv6:
                af = socket.AF_UNSPEC
            else:
                # Restrict to IPv4 by default.
                af = socket.AF_INET

            addrinfo = socket.getaddrinfo(host, port, af, socket.SOCK_STREAM,
                                          0, 0)
            # Use only the first resolved address; no fallback to later ones.
            af, socktype, proto, canonname, sockaddr = addrinfo[0]

            if parsed.scheme == "https":
                ssl_options = {}
                if request.validate_cert:
                    ssl_options["cert_reqs"] = ssl.CERT_REQUIRED
                if request.ca_certs is not None:
                    ssl_options["ca_certs"] = request.ca_certs
                else:
                    # Fall back to the CA bundle shipped with this module.
                    ssl_options["ca_certs"] = _DEFAULT_CA_CERTS
                if request.client_key is not None:
                    ssl_options["keyfile"] = request.client_key
                if request.client_cert is not None:
                    ssl_options["certfile"] = request.client_cert

                # Disable SSLv2: the cipher string below excludes SSLv2
                # suites on python 2.7+, where the ssl module accepts a
                # "ciphers" option.
                if sys.version_info >= (2, 7):
                    ssl_options["ciphers"] = "DEFAULT:!SSLv2"
                else:
                    # Python 2.6's ssl module has no "ciphers" option, so
                    # pin the protocol version instead.
                    # NOTE(review): SSLv3 is considered insecure by modern
                    # standards (POODLE); acceptable only for the legacy
                    # 2.6 code path this branch targets.
                    ssl_options["ssl_version"] = ssl.PROTOCOL_SSLv3

                self.stream = SSLIOStream(socket.socket(af, socktype, proto),
                                          io_loop=self.io_loop,
                                          ssl_options=ssl_options,
                                          max_buffer_size=max_buffer_size)
            else:
                self.stream = IOStream(socket.socket(af, socktype, proto),
                                       io_loop=self.io_loop,
                                       max_buffer_size=max_buffer_size)
            # The connect phase is bounded by whichever of the two
            # timeouts is smaller; _on_connect replaces it with the full
            # request_timeout.
            timeout = min(request.connect_timeout, request.request_timeout)
            if timeout:
                self._timeout = self.io_loop.add_timeout(
                    self.start_time + timeout,
                    self._on_timeout)
            self.stream.set_close_callback(self._on_close)
            self.stream.connect(sockaddr,
                                functools.partial(self._on_connect, parsed,
                                                  parsed_hostname))

    def _on_timeout(self):
        """Deliver a 599 'Timeout' response and drop the connection."""
        self._timeout = None
        self._run_callback(HTTPResponse(self.request, 599,
                                        request_time=time.time() - self.start_time,
                                        error=HTTPError(599, "Timeout")))
        self.stream.close()

    def _on_connect(self, parsed, parsed_hostname):
        """Socket is connected: validate, serialize and send the request."""
        # Swap the connect timeout for the overall request timeout.
        if self._timeout is not None:
            self.io_loop.remove_timeout(self._timeout)
            self._timeout = None
        if self.request.request_timeout:
            self._timeout = self.io_loop.add_timeout(
                self.start_time + self.request.request_timeout,
                self._on_timeout)
        if (self.request.validate_cert and
            isinstance(self.stream, SSLIOStream)):
            # Match against the hostname parsed in __init__ (before any
            # hostname_mapping rewrite), not parsed.hostname.
            match_hostname(self.stream.socket.getpeercert(),
                           parsed_hostname)
        if (self.request.method not in self._SUPPORTED_METHODS and
            not self.request.allow_nonstandard_methods):
            raise KeyError("unknown method %s" % self.request.method)
        # Reject curl-only options this implementation does not support.
        for key in ('network_interface',
                    'proxy_host', 'proxy_port',
                    'proxy_username', 'proxy_password'):
            if getattr(self.request, key, None):
                raise NotImplementedError('%s not supported' % key)
        # Connections are never reused, so ask the server to close.
        if "Connection" not in self.request.headers:
            self.request.headers["Connection"] = "close"
        if "Host" not in self.request.headers:
            if '@' in parsed.netloc:
                # Strip userinfo from the Host header.
                self.request.headers["Host"] = parsed.netloc.rpartition('@')[-1]
            else:
                self.request.headers["Host"] = parsed.netloc
        # Basic auth: URL credentials win over request.auth_username.
        username, password = None, None
        if parsed.username is not None:
            username, password = parsed.username, parsed.password
        elif self.request.auth_username is not None:
            username = self.request.auth_username
            password = self.request.auth_password or ''
        if username is not None:
            auth = utf8(username) + b(":") + utf8(password)
            self.request.headers["Authorization"] = (b("Basic ") +
                                                     base64.b64encode(auth))
        if self.request.user_agent:
            self.request.headers["User-Agent"] = self.request.user_agent
        if not self.request.allow_nonstandard_methods:
            # Enforce the usual body/no-body pairing per method.
            if self.request.method in ("POST", "PATCH", "PUT"):
                assert self.request.body is not None
            else:
                assert self.request.body is None
        if self.request.body is not None:
            self.request.headers["Content-Length"] = str(len(
                self.request.body))
        if (self.request.method == "POST" and
            "Content-Type" not in self.request.headers):
            self.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
        if self.request.use_gzip:
            self.request.headers["Accept-Encoding"] = "gzip"
        req_path = ((parsed.path or '/') +
                    (('?' + parsed.query) if parsed.query else ''))
        request_lines = [utf8("%s %s HTTP/1.1" % (self.request.method,
                                                  req_path))]
        for k, v in self.request.headers.get_all():
            line = utf8(k) + b(": ") + utf8(v)
            # Guard against header-injection via embedded newlines.
            if b('\n') in line:
                raise ValueError('Newline in header: ' + repr(line))
            request_lines.append(line)
        self.stream.write(b("\r\n").join(request_lines) + b("\r\n\r\n"))
        if self.request.body is not None:
            self.stream.write(self.request.body)
        # Lenient header terminator: accepts both \r\n\r\n and \n\n.
        self.stream.read_until_regex(b("\r?\n\r?\n"), self._on_headers)

    def _release(self):
        """Release this connection's client slot exactly once."""
        if self.release_callback is not None:
            release_callback = self.release_callback
            self.release_callback = None
            release_callback()

    def _run_callback(self, response):
        """Deliver *response* to the final callback (at most once)."""
        self._release()
        if self.final_callback is not None:
            final_callback = self.final_callback
            self.final_callback = None
            final_callback(response)

    @contextlib.contextmanager
    def cleanup(self):
        # Stack-context handler: convert any uncaught exception in this
        # connection's callbacks into a 599 response and close the stream.
        try:
            yield
        except Exception, e:
            logging.warning("uncaught exception", exc_info=True)
            self._run_callback(HTTPResponse(self.request, 599, error=e,
                                request_time=time.time() - self.start_time,
                                ))
            # The stream may not exist yet if __init__ failed early.
            if hasattr(self, "stream"):
                self.stream.close()

    def _on_close(self):
        # Stream closed before a response was delivered: report error 599.
        self._run_callback(HTTPResponse(
            self.request, 599,
            request_time=time.time() - self.start_time,
            error=HTTPError(599, "Connection closed")))

    def _on_headers(self, data):
        """Parse the status line and headers, then schedule the body read."""
        data = native_str(data.decode("latin1"))
        first_line, _, header_data = data.partition("\n")
        match = re.match("HTTP/1.[01] ([0-9]+)", first_line)
        assert match
        self.code = int(match.group(1))
        self.headers = HTTPHeaders.parse(header_data)

        if "Content-Length" in self.headers:
            if "," in self.headers["Content-Length"]:
                # Proxies sometimes fold duplicate Content-Length headers
                # into a comma-separated list; accept it only if all
                # values agree.
                pieces = re.split(r',\s*', self.headers["Content-Length"])
                if any(i != pieces[0] for i in pieces):
                    raise ValueError("Multiple unequal Content-Lengths: %r" %
                                     self.headers["Content-Length"])
                self.headers["Content-Length"] = pieces[0]
            content_length = int(self.headers["Content-Length"])
        else:
            content_length = None

        if self.request.header_callback is not None:
            for k, v in self.headers.get_all():
                self.request.header_callback("%s: %s\r\n" % (k, v))

        if self.request.method == "HEAD":
            # HEAD responses never carry a body regardless of headers.
            self._on_body(b(""))
            return
        if 100 <= self.code < 200 or self.code in (204, 304):
            # These status codes are defined to have no body.
            assert "Transfer-Encoding" not in self.headers
            assert content_length in (None, 0)
            self._on_body(b(""))
            return

        if (self.request.use_gzip and
            self.headers.get("Content-Encoding") == "gzip"):
            # 16 + MAX_WBITS tells zlib to expect a gzip wrapper.
            self._decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)
        if self.headers.get("Transfer-Encoding") == "chunked":
            self.chunks = []
            self.stream.read_until(b("\r\n"), self._on_chunk_length)
        elif content_length is not None:
            self.stream.read_bytes(content_length, self._on_body)
        else:
            # No framing information: body runs until the server closes.
            self.stream.read_until_close(self._on_body)

    def _on_body(self, data):
        """Handle the complete body: follow redirects or build the response."""
        if self._timeout is not None:
            self.io_loop.remove_timeout(self._timeout)
            self._timeout = None
        # Track the first request across a redirect chain so the final
        # HTTPResponse reports it.
        original_request = getattr(self.request, "original_request",
                                   self.request)
        if (self.request.follow_redirects and
            self.request.max_redirects > 0 and
            self.code in (301, 302, 303, 307)):
            new_request = copy.copy(self.request)
            new_request.url = urlparse.urljoin(self.request.url,
                                               self.headers["Location"])
            new_request.max_redirects -= 1
            # Host must be recomputed for the new URL.
            del new_request.headers["Host"]
            if self.code == 303:
                # 303 See Other: re-issue as a bodyless GET.
                new_request.method = "GET"
                new_request.body = None
                for h in ["Content-Length", "Content-Type",
                          "Content-Encoding", "Transfer-Encoding"]:
                    try:
                        del self.request.headers[h]
                    except KeyError:
                        pass
            new_request.original_request = original_request
            final_callback = self.final_callback
            self.final_callback = None
            self._release()
            # Re-enter the client so the redirect obeys max_clients.
            self.client.fetch(new_request, final_callback)
            self.stream.close()
            return
        if self._decompressor:
            data = self._decompressor.decompress(data)
        if self.request.streaming_callback:
            if self.chunks is None:
                # Non-chunked responses haven't been streamed yet; chunked
                # ones were already delivered piecewise in _on_chunk_data.
                self.request.streaming_callback(data)
            buffer = BytesIO()
        else:
            buffer = BytesIO(data)
        response = HTTPResponse(original_request,
                                self.code, headers=self.headers,
                                request_time=time.time() - self.start_time,
                                buffer=buffer,
                                effective_url=self.request.url)
        self._run_callback(response)
        self.stream.close()

    def _on_chunk_length(self, data):
        # Chunk-size line is hexadecimal; chunk extensions (";...") are
        # not handled.
        length = int(data.strip(), 16)
        if length == 0:
            # Terminating zero-length chunk: all data has already been
            # decompressed chunk-by-chunk, so drop the decompressor before
            # _on_body runs.
            self._decompressor = None
            self._on_body(b('').join(self.chunks))
        else:
            self.stream.read_bytes(length + 2,  # chunk data plus trailing \r\n
                                   self._on_chunk_data)

    def _on_chunk_data(self, data):
        assert data[-2:] == b("\r\n")
        chunk = data[:-2]
        if self._decompressor:
            chunk = self._decompressor.decompress(chunk)
        if self.request.streaming_callback is not None:
            # Stream each chunk immediately instead of accumulating.
            self.request.streaming_callback(chunk)
        else:
            self.chunks.append(chunk)
        self.stream.read_until(b("\r\n"), self._on_chunk_length)
00462
00463
00464
00465
00466
class CertificateError(ValueError):
    """Raised by match_hostname when a certificate does not match the
    requested hostname."""
00469
00470
00471 def _dnsname_to_pat(dn):
00472 pats = []
00473 for frag in dn.split(r'.'):
00474 if frag == '*':
00475
00476
00477 pats.append('[^.]+')
00478 else:
00479
00480 frag = re.escape(frag)
00481 pats.append(frag.replace(r'\*', '[^.]*'))
00482 return re.compile(r'\A' + r'\.'.join(pats) + r'\Z', re.IGNORECASE)
00483
00484
def match_hostname(cert, hostname):
    """Verify that *cert* (in decoded format as returned by
    SSLSocket.getpeercert()) matches the *hostname*.  RFC 2818 rules
    are mostly followed, but IP addresses are not accepted for *hostname*.

    CertificateError is raised on failure.  On success, the function
    returns nothing.
    """
    if not cert:
        raise ValueError("empty or no certificate")
    candidates = []
    san_entries = cert.get('subjectAltName', ())
    for entry_type, entry_value in san_entries:
        if entry_type != 'DNS':
            continue
        if _dnsname_to_pat(entry_value).match(hostname):
            return
        candidates.append(entry_value)
    if not san_entries:
        # No subjectAltName at all: fall back to commonName fields in the
        # subject (checked only in that case, per RFC 2818).
        for rdn in cert.get('subject', ()):
            for attr_type, attr_value in rdn:
                if attr_type != 'commonName':
                    continue
                if _dnsname_to_pat(attr_value).match(hostname):
                    return
                candidates.append(attr_value)
    # Nothing matched; report what was tried.
    if len(candidates) > 1:
        raise CertificateError("hostname %r "
                               "doesn't match either of %s"
                               % (hostname, ', '.join(map(repr, candidates))))
    elif len(candidates) == 1:
        raise CertificateError("hostname %r "
                               "doesn't match %r"
                               % (hostname, candidates[0]))
    else:
        raise CertificateError("no appropriate commonName or "
                               "subjectAltName fields were found")
00523
# Script entry point: install this client as the AsyncHTTPClient
# implementation, then delegate to tornado.httpclient.main().
if __name__ == "__main__":
    AsyncHTTPClient.configure(SimpleAsyncHTTPClient)
    main()