00001
00002 from __future__ import absolute_import, division, print_function, with_statement
00003
00004 from tornado.concurrent import is_future
00005 from tornado.escape import utf8, _unicode
00006 from tornado.httpclient import HTTPResponse, HTTPError, AsyncHTTPClient, main, _RequestProxy
00007 from tornado import httputil
00008 from tornado.http1connection import HTTP1Connection, HTTP1ConnectionParameters
00009 from tornado.iostream import StreamClosedError
00010 from tornado.netutil import Resolver, OverrideResolver
00011 from tornado.log import gen_log
00012 from tornado import stack_context
00013 from tornado.tcpclient import TCPClient
00014
00015 import base64
00016 import collections
00017 import copy
00018 import functools
00019 import re
00020 import socket
00021 import sys
00022
00023 try:
00024 from io import BytesIO
00025 except ImportError:
00026 from cStringIO import StringIO as BytesIO
00027
00028 try:
00029 import urlparse
00030 except ImportError:
00031 import urllib.parse as urlparse
00032
00033 try:
00034 import ssl
00035 except ImportError:
00036
00037 ssl = None
00038
00039 try:
00040 import certifi
00041 except ImportError:
00042 certifi = None
00043
00044
def _default_ca_certs():
    """Return the path of the CA bundle used when a request sets no ``ca_certs``.

    The bundle comes from the optional ``certifi`` package.

    Raises:
        Exception: if ``certifi`` could not be imported.
    """
    if certifi is not None:
        return certifi.where()
    raise Exception("The 'certifi' package is required to use https "
                    "in simple_httpclient")
00050
00051
class SimpleAsyncHTTPClient(AsyncHTTPClient):
    """Non-blocking HTTP client implemented on top of Tornado's IOStreams.

    This class implements an HTTP 1.1 client on top of Tornado's IOStreams.
    It does not currently implement all applicable parts of the HTTP
    specification, but it does enough to work with major web service APIs.

    Plain ``http`` needs nothing beyond Tornado. ``https`` requests that do
    not set ``ca_certs`` need the ``certifi`` package for the default CA
    bundle.

    Some features found in the curl-based AsyncHTTPClient are not yet
    supported. In particular, proxies are not supported, connections
    are not reused, and callers cannot select the network interface to be
    used.
    """
    def initialize(self, io_loop, max_clients=10,
                   hostname_mapping=None, max_buffer_size=104857600,
                   resolver=None, defaults=None, max_header_size=None):
        """Creates an AsyncHTTPClient.

        Only a single AsyncHTTPClient instance exists per IOLoop
        in order to provide limitations on the number of pending connections.
        force_instance=True may be used to suppress this behavior.

        max_clients is the number of concurrent requests that can be
        in progress. Note that these arguments are only used when the
        client is first created, and will be ignored when an existing
        client is reused.

        hostname_mapping is a dictionary mapping hostnames to IP addresses.
        It can be used to make local DNS changes when modifying system-wide
        settings like /etc/hosts is not possible or desirable (e.g. in
        unittests).

        max_buffer_size is the number of bytes that can be read by IOStream. It
        defaults to 100mb.

        resolver is an optional Resolver to use instead of a private one.
        A resolver passed in is not closed by ``close()``; one created here
        is.

        defaults is passed through to ``AsyncHTTPClient.initialize``.

        max_header_size is passed through to the HTTP1ConnectionParameters
        of every connection (None presumably keeps HTTP1Connection's own
        default -- verify against tornado.http1connection).
        """
        super(SimpleAsyncHTTPClient, self).initialize(io_loop,
                                                      defaults=defaults)
        self.max_clients = max_clients
        # (key, request, callback) tuples not yet started, in FIFO order.
        self.queue = collections.deque()
        # key -> (request, callback) for requests currently in flight.
        self.active = {}
        # key -> (request, callback, timeout_handle) for queued requests.
        self.waiting = {}
        self.max_buffer_size = max_buffer_size
        self.max_header_size = max_header_size
        if resolver:
            self.resolver = resolver
            self.own_resolver = False
        else:
            self.resolver = Resolver(io_loop=io_loop)
            self.own_resolver = True
        if hostname_mapping is not None:
            self.resolver = OverrideResolver(resolver=self.resolver,
                                             mapping=hostname_mapping)
        self.tcp_client = TCPClient(resolver=self.resolver, io_loop=io_loop)

    def close(self):
        """Close the client, its TCPClient and any resolver it created."""
        super(SimpleAsyncHTTPClient, self).close()
        if self.own_resolver:
            self.resolver.close()
        self.tcp_client.close()

    def fetch_impl(self, request, callback):
        """Queue ``request`` and start it as soon as a client slot is free.

        If ``max_clients`` requests are already active, a timer of
        ``min(connect_timeout, request_timeout)`` seconds is armed. A request
        that waits in the queue longer than that fails with a 599 response.
        A falsy timeout (e.g. 0) disables this timer, matching how
        ``_HTTPConnection`` treats the same values.
        """
        # A fresh object gives this fetch an identity shared by the
        # queue, ``waiting`` and ``active`` bookkeeping.
        key = object()
        self.queue.append((key, request, callback))
        timeout_handle = None
        if len(self.active) >= self.max_clients:
            timeout = min(request.connect_timeout, request.request_timeout)
            if timeout:
                timeout_handle = self.io_loop.add_timeout(
                    self.io_loop.time() + timeout,
                    functools.partial(self._on_timeout, key))
        self.waiting[key] = (request, callback, timeout_handle)
        self._process_queue()
        if self.queue:
            gen_log.debug("max_clients limit reached, request queued. "
                          "%d active, %d queued requests.",
                          len(self.active), len(self.queue))

    def _process_queue(self):
        """Start queued requests while fewer than ``max_clients`` are active."""
        # Start connections outside the caller's stack context.
        with stack_context.NullContext():
            while self.queue and len(self.active) < self.max_clients:
                key, request, callback = self.queue.popleft()
                if key not in self.waiting:
                    # Already failed by _on_timeout; skip it.
                    continue
                self._remove_timeout(key)
                self.active[key] = (request, callback)
                release_callback = functools.partial(self._release_fetch, key)
                self._handle_request(request, release_callback, callback)

    def _handle_request(self, request, release_callback, final_callback):
        """Create the _HTTPConnection that performs a single request."""
        _HTTPConnection(self.io_loop, self, request, release_callback,
                        final_callback, self.max_buffer_size, self.tcp_client,
                        self.max_header_size)

    def _release_fetch(self, key):
        """Free the slot held by ``key`` and start the next queued request."""
        del self.active[key]
        self._process_queue()

    def _remove_timeout(self, key):
        """Cancel the queue timeout for ``key`` and drop it from ``waiting``."""
        if key in self.waiting:
            request, callback, timeout_handle = self.waiting[key]
            if timeout_handle is not None:
                self.io_loop.remove_timeout(timeout_handle)
            del self.waiting[key]

    def _on_timeout(self, key):
        """Fail a request that waited too long in the queue with a 599."""
        request, callback, timeout_handle = self.waiting[key]
        self.queue.remove((key, request, callback))
        timeout_response = HTTPResponse(
            request, 599, error=HTTPError(599, "Timeout"),
            request_time=self.io_loop.time() - request.start_time)
        self.io_loop.add_callback(callback, timeout_response)
        del self.waiting[key]
00165
00166
class _HTTPConnection(httputil.HTTPMessageDelegate):
    """Perform one HTTP request on behalf of ``SimpleAsyncHTTPClient``.

    The constructor parses the URL and starts connecting through
    ``tcp_client``. ``_on_connect`` then writes the request with an
    ``HTTP1Connection``. This object receives the response as that
    connection's message delegate (``headers_received``, ``data_received``,
    ``finish``).

    ``final_callback`` is used at most once. It receives the response or a
    599 error response, or it is handed to ``client.fetch`` when a redirect
    is followed.
    """
    _SUPPORTED_METHODS = set(["GET", "HEAD", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"])

    def __init__(self, io_loop, client, request, release_callback,
                 final_callback, max_buffer_size, tcp_client,
                 max_header_size):
        self.start_time = io_loop.time()
        self.io_loop = io_loop
        self.client = client
        self.request = request
        self.release_callback = release_callback
        self.final_callback = final_callback
        self.max_buffer_size = max_buffer_size
        self.tcp_client = tcp_client
        self.max_header_size = max_header_size
        self.code = None
        self.headers = None
        self.chunks = []
        self._decompressor = None
        # Handle of the currently armed connect/request timeout, if any.
        self._timeout = None
        self._sockaddr = None
        # Exceptions raised here, or in callbacks wrapped from this context,
        # are routed to _handle_exception, which reports a 599 response.
        with stack_context.ExceptionStackContext(self._handle_exception):
            self.parsed = urlparse.urlsplit(_unicode(self.request.url))
            if self.parsed.scheme not in ("http", "https"):
                raise ValueError("Unsupported url scheme: %s" %
                                 self.request.url)
            # Host and port are parsed by hand from the netloc. Any
            # "user:pass@" prefix is dropped first; credentials are read
            # from self.parsed in _on_connect.
            netloc = self.parsed.netloc.rpartition("@")[2]
            match = re.match(r'^(.+):(\d+)$', netloc)
            if match:
                host = match.group(1)
                port = int(match.group(2))
            else:
                host = netloc
                port = 443 if self.parsed.scheme == "https" else 80
            if re.match(r'^\[.*\]$', host):
                # Raw IPv6 literals appear in URLs enclosed in brackets.
                host = host[1:-1]
            self.parsed_hostname = host

            if request.allow_ipv6 is False:
                af = socket.AF_INET
            else:
                af = socket.AF_UNSPEC

            ssl_options = self._get_ssl_options(self.parsed.scheme)

            # While connecting, the tighter of the two timeouts applies;
            # _on_connect re-arms the timer with request_timeout alone.
            timeout = min(self.request.connect_timeout, self.request.request_timeout)
            if timeout:
                self._timeout = self.io_loop.add_timeout(
                    self.start_time + timeout,
                    stack_context.wrap(self._on_timeout))
            self.tcp_client.connect(host, port, af=af,
                                    ssl_options=ssl_options,
                                    max_buffer_size=self.max_buffer_size,
                                    callback=self._on_connect)

    def _get_ssl_options(self, scheme):
        """Return the ssl_options dict for ``https``, or None for other schemes.

        Raises:
            Exception: from ``_default_ca_certs`` when no ``ca_certs`` is
                given and ``certifi`` is unavailable.
        """
        if scheme == "https":
            ssl_options = {}
            if self.request.validate_cert:
                ssl_options["cert_reqs"] = ssl.CERT_REQUIRED
            if self.request.ca_certs is not None:
                ssl_options["ca_certs"] = self.request.ca_certs
            else:
                ssl_options["ca_certs"] = _default_ca_certs()
            if self.request.client_key is not None:
                ssl_options["keyfile"] = self.request.client_key
            if self.request.client_cert is not None:
                ssl_options["certfile"] = self.request.client_cert
            if sys.version_info >= (2, 7):
                # The ``ciphers`` option is used to exclude SSLv2, export
                # and DES cipher suites.
                ssl_options["ciphers"] = "DEFAULT:!SSLv2:!EXPORT:!DES"
            else:
                # Older Pythons have no ``ciphers`` option; pin TLSv1 instead.
                ssl_options["ssl_version"] = ssl.PROTOCOL_TLSv1
            return ssl_options
        return None

    def _on_timeout(self):
        """Timer callback: fail the request with a 599 if still pending."""
        self._timeout = None
        if self.final_callback is not None:
            # Raised inside the wrapped stack context, so it reaches
            # _handle_exception.
            raise HTTPError(599, "Timeout")

    def _remove_timeout(self):
        """Cancel the currently armed timeout, if any."""
        if self._timeout is not None:
            self.io_loop.remove_timeout(self._timeout)
            self._timeout = None

    def _on_connect(self, stream):
        """Validate the request, fill in headers and send the request line.

        Raises:
            KeyError: for a non-standard method without
                ``allow_nonstandard_methods``.
            NotImplementedError: if proxy or network_interface options are set.
            ValueError: for an unsupported ``auth_mode``.
            AssertionError: if the body's presence does not match the method.
        """
        if self.final_callback is None:
            # final_callback is cleared if we've already timed out.
            stream.close()
            return
        self.stream = stream
        self.stream.set_close_callback(self.on_connection_close)
        self._remove_timeout()
        if self.final_callback is None:
            return
        if self.request.request_timeout:
            self._timeout = self.io_loop.add_timeout(
                self.start_time + self.request.request_timeout,
                stack_context.wrap(self._on_timeout))
        if (self.request.method not in self._SUPPORTED_METHODS and
                not self.request.allow_nonstandard_methods):
            raise KeyError("unknown method %s" % self.request.method)
        for key in ('network_interface',
                    'proxy_host', 'proxy_port',
                    'proxy_username', 'proxy_password'):
            if getattr(self.request, key, None):
                raise NotImplementedError('%s not supported' % key)
        if "Connection" not in self.request.headers:
            self.request.headers["Connection"] = "close"
        if "Host" not in self.request.headers:
            if '@' in self.parsed.netloc:
                self.request.headers["Host"] = self.parsed.netloc.rpartition('@')[-1]
            else:
                self.request.headers["Host"] = self.parsed.netloc
        # Credentials embedded in the URL take precedence over auth_username.
        username, password = None, None
        if self.parsed.username is not None:
            username, password = self.parsed.username, self.parsed.password
        elif self.request.auth_username is not None:
            username = self.request.auth_username
            password = self.request.auth_password or ''
        if username is not None:
            if self.request.auth_mode not in (None, "basic"):
                raise ValueError("unsupported auth_mode %s" %
                                 self.request.auth_mode)
            auth = utf8(username) + b":" + utf8(password)
            self.request.headers["Authorization"] = (b"Basic " +
                                                     base64.b64encode(auth))
        if self.request.user_agent:
            self.request.headers["User-Agent"] = self.request.user_agent
        if not self.request.allow_nonstandard_methods:
            if self.request.method in ("POST", "PATCH", "PUT"):
                if (self.request.body is None and
                        self.request.body_producer is None):
                    raise AssertionError(
                        'Body must not be empty for "%s" request'
                        % self.request.method)
            else:
                if (self.request.body is not None or
                        self.request.body_producer is not None):
                    raise AssertionError(
                        'Body must be empty for "%s" request'
                        % self.request.method)
        if self.request.expect_100_continue:
            self.request.headers["Expect"] = "100-continue"
        if self.request.body is not None:
            # Content-Length is only computed for a fixed body; with a
            # body_producer the caller decides how the length is conveyed.
            self.request.headers["Content-Length"] = str(len(
                self.request.body))
        if (self.request.method == "POST" and
                "Content-Type" not in self.request.headers):
            self.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
        if self.request.decompress_response:
            self.request.headers["Accept-Encoding"] = "gzip"
        req_path = ((self.parsed.path or '/') +
                    (('?' + self.parsed.query) if self.parsed.query else ''))
        self.stream.set_nodelay(True)
        self.connection = HTTP1Connection(
            self.stream, True,
            HTTP1ConnectionParameters(
                no_keep_alive=True,
                max_header_size=self.max_header_size,
                decompress=self.request.decompress_response),
            self._sockaddr)
        start_line = httputil.RequestStartLine(self.request.method,
                                               req_path, 'HTTP/1.1')
        self.connection.write_headers(start_line, self.request.headers)
        if self.request.expect_100_continue:
            # The body is sent from headers_received once "100" arrives.
            self._read_response()
        else:
            self._write_body(True)

    def _write_body(self, start_read):
        """Send the request body, finish the request, optionally read reply.

        A ``body_producer`` may return a Future; in that case finishing
        (and reading, if ``start_read``) waits until it resolves.
        """
        if self.request.body is not None:
            self.connection.write(self.request.body)
            self.connection.finish()
        elif self.request.body_producer is not None:
            fut = self.request.body_producer(self.connection.write)
            if is_future(fut):
                def on_body_written(fut):
                    fut.result()  # re-raise producer errors
                    self.connection.finish()
                    if start_read:
                        self._read_response()
                self.io_loop.add_future(fut, on_body_written)
                return
            self.connection.finish()
        if start_read:
            self._read_response()

    def _read_response(self):
        """Start reading the response, with ``self`` as the delegate."""
        # f.result() re-raises any failure from read_response so it is not
        # silently dropped.
        self.io_loop.add_future(
            self.connection.read_response(self),
            lambda f: f.result())

    def _release(self):
        """Run release_callback exactly once to free the client slot."""
        if self.release_callback is not None:
            release_callback = self.release_callback
            self.release_callback = None
            release_callback()

    def _run_callback(self, response):
        """Release the slot and schedule final_callback(response) once."""
        self._release()
        if self.final_callback is not None:
            final_callback = self.final_callback
            self.final_callback = None
            self.io_loop.add_callback(final_callback, response)

    def _handle_exception(self, typ, value, tb):
        """Stack-context handler: convert errors into a 599 response.

        Returns True if the exception was handled here.
        """
        if self.final_callback:
            self._remove_timeout()
            if isinstance(value, StreamClosedError):
                value = HTTPError(599, "Stream closed")
            self._run_callback(HTTPResponse(self.request, 599, error=value,
                                            request_time=self.io_loop.time() - self.start_time,
                                            ))

            if hasattr(self, "stream"):
                self.stream.close()
            return True
        else:
            # The callback already ran, so this error probably comes from
            # downstream code; let it propagate unless the stream simply closed.
            return isinstance(value, StreamClosedError)

    def on_connection_close(self):
        """Stream close callback: fail the request if it is still pending."""
        if self.final_callback is not None:
            message = "Connection closed"
            if self.stream.error:
                raise self.stream.error
            try:
                raise HTTPError(599, message)
            except HTTPError:
                self._handle_exception(*sys.exc_info())

    def headers_received(self, first_line, headers):
        """Record status and headers, or send the body after a 100 Continue."""
        if self.request.expect_100_continue and first_line.code == 100:
            self._write_body(False)
            return
        self.headers = headers
        self.code = first_line.code
        self.reason = first_line.reason

        if self.request.header_callback is not None:
            # Reassemble the start line, then emit each header line.
            self.request.header_callback('%s %s %s\r\n' % first_line)
            for k, v in self.headers.get_all():
                self.request.header_callback("%s: %s\r\n" % (k, v))
            self.request.header_callback('\r\n')

    def _should_follow_redirect(self):
        """Return True if the response just received should be followed.

        Following requires ``follow_redirects``, remaining ``max_redirects``,
        a redirect status code and a ``Location`` header. A redirect status
        without ``Location`` is delivered to the caller as an ordinary
        response.
        """
        return bool(self.request.follow_redirects and
                    self.request.max_redirects > 0 and
                    self.code in (301, 302, 303, 307) and
                    self.headers is not None and
                    "Location" in self.headers)

    def finish(self):
        """Handle the end of the response: follow a redirect or deliver it."""
        self._remove_timeout()
        original_request = getattr(self.request, "original_request",
                                   self.request)
        if self._should_follow_redirect():
            assert isinstance(self.request, _RequestProxy)
            new_request = copy.copy(self.request.request)
            new_request.url = urlparse.urljoin(self.request.url,
                                               self.headers["Location"])
            new_request.max_redirects = self.request.max_redirects - 1
            # _on_connect always sets Host; drop it so the next hop
            # recomputes it for the new URL.
            del new_request.headers["Host"]
            # Clients SHOULD use GET after a 303. In practice 302 is treated
            # the same way, so the body and its describing headers are
            # dropped for both.
            if self.code in (302, 303):
                new_request.method = "GET"
                new_request.body = None
                for h in ["Content-Length", "Content-Type",
                          "Content-Encoding", "Transfer-Encoding"]:
                    try:
                        del new_request.headers[h]
                    except KeyError:
                        pass
            new_request.original_request = original_request
            final_callback = self.final_callback
            self.final_callback = None
            self._release()
            self.client.fetch(new_request, final_callback)
            self._on_end_request()
            return
        if self.request.streaming_callback:
            # Chunks already went to streaming_callback; body stays empty.
            buffer = BytesIO()
        else:
            buffer = BytesIO(b''.join(self.chunks))
        response = HTTPResponse(original_request,
                                self.code, reason=getattr(self, 'reason', None),
                                headers=self.headers,
                                request_time=self.io_loop.time() - self.start_time,
                                buffer=buffer,
                                effective_url=self.request.url)
        self._run_callback(response)
        self._on_end_request()

    def _on_end_request(self):
        """Close the stream; connections are never reused."""
        self.stream.close()

    def data_received(self, chunk):
        """Pass a body chunk to streaming_callback or buffer it."""
        if self.request.streaming_callback is not None:
            self.request.streaming_callback(chunk)
        else:
            self.chunks.append(chunk)
00505
00506
if __name__ == "__main__":
    # Running this module directly runs tornado.httpclient's main() with
    # SimpleAsyncHTTPClient configured as the AsyncHTTPClient implementation.
    AsyncHTTPClient.configure(SimpleAsyncHTTPClient)
    main()