00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 """Non-blocking HTTP client implementation using pycurl."""
00018
00019 from __future__ import absolute_import, division, print_function, with_statement
00020
00021 import collections
00022 import logging
00023 import pycurl
00024 import threading
00025 import time
00026
00027 from tornado import httputil
00028 from tornado import ioloop
00029 from tornado.log import gen_log
00030 from tornado import stack_context
00031
00032 from tornado.escape import utf8, native_str
00033 from tornado.httpclient import HTTPResponse, HTTPError, AsyncHTTPClient, main
00034 from tornado.util import bytes_type
00035
00036 try:
00037 from io import BytesIO
00038 except ImportError:
00039 from cStringIO import StringIO as BytesIO
00040
00041
class CurlAsyncHTTPClient(AsyncHTTPClient):
    """``AsyncHTTPClient`` implementation driven by a ``pycurl.CurlMulti``.

    A fixed pool of ``max_clients`` curl handles is created up front.
    ``fetch_impl`` queues requests; queued requests are attached to the
    multi handle whenever a pooled handle is free.  The multi handle's
    socket and timer callbacks are routed into the IOLoop so transfers
    progress without blocking.
    """

    def initialize(self, io_loop, max_clients=10, defaults=None):
        """Create the multi handle, the handle pool and the IOLoop hooks.

        :arg io_loop: IOLoop used for fd handlers, timeouts and the
            periodic forced scan.
        :arg max_clients: number of curl handles in the pool, i.e. the
            maximum number of simultaneously running transfers.
        :arg defaults: forwarded unchanged to the base class.
        """
        super(CurlAsyncHTTPClient, self).initialize(io_loop, defaults=defaults)
        self._multi = pycurl.CurlMulti()
        self._multi.setopt(pycurl.M_TIMERFUNCTION, self._set_timeout)
        self._multi.setopt(pycurl.M_SOCKETFUNCTION, self._handle_socket)
        self._curls = [_curl_create() for i in range(max_clients)]
        # Handles not currently attached to the multi object.
        self._free_list = self._curls[:]
        # FIFO of (request, callback) pairs waiting for a free handle.
        self._requests = collections.deque()
        # fd -> IOLoop event mask currently registered for that fd.
        self._fds = {}
        self._timeout = None

        # Once a second, ask libcurl to look at every active transfer
        # regardless of what the socket/timer callbacks reported.
        self._force_timeout_callback = ioloop.PeriodicCallback(
            self._handle_force_timeout, 1000, io_loop=io_loop)
        self._force_timeout_callback.start()

        # Attach and immediately detach a throwaway handle.
        # NOTE(review): presumably forces the multi object to finish its
        # internal initialisation before it can be closed -- confirm.
        dummy_curl_handle = pycurl.Curl()
        self._multi.add_handle(dummy_curl_handle)
        self._multi.remove_handle(dummy_curl_handle)

    def close(self):
        """Stop the periodic scan, cancel any timeout and free all handles."""
        self._force_timeout_callback.stop()
        if self._timeout is not None:
            self.io_loop.remove_timeout(self._timeout)
        for curl in self._curls:
            curl.close()
        self._multi.close()
        super(CurlAsyncHTTPClient, self).close()

    def fetch_impl(self, request, callback):
        """Queue ``request`` and start it at once if a handle is free."""
        self._requests.append((request, callback))
        self._process_queue()
        # Schedule an immediate timeout so the multi handle gets driven
        # from the IOLoop.
        self._set_timeout(0)

    def _handle_socket(self, event, fd, multi, data):
        """Called by libcurl when it wants to change the file descriptors
        it cares about.
        """
        event_map = {
            pycurl.POLL_NONE: ioloop.IOLoop.NONE,
            pycurl.POLL_IN: ioloop.IOLoop.READ,
            pycurl.POLL_OUT: ioloop.IOLoop.WRITE,
            pycurl.POLL_INOUT: ioloop.IOLoop.READ | ioloop.IOLoop.WRITE
        }
        if event == pycurl.POLL_REMOVE:
            if fd in self._fds:
                self.io_loop.remove_handler(fd)
                del self._fds[fd]
        else:
            ioloop_event = event_map[event]
            # An existing registration is always removed and re-added
            # rather than updated in place.
            # NOTE(review): presumably protects against libcurl reusing
            # an fd number without reporting POLL_REMOVE first -- confirm.
            if fd in self._fds:
                self.io_loop.remove_handler(fd)
            self.io_loop.add_handler(fd, self._handle_events,
                                     ioloop_event)
            self._fds[fd] = ioloop_event

    def _set_timeout(self, msecs):
        """Called by libcurl to schedule a timeout."""
        if self._timeout is not None:
            self.io_loop.remove_timeout(self._timeout)
        self._timeout = self.io_loop.add_timeout(
            self.io_loop.time() + msecs / 1000.0, self._handle_timeout)

    def _drive_multi(self, perform, *args):
        """Call ``perform(*args)`` until it stops asking to be called again.

        ``perform`` is one of the multi handle's socket methods.  A
        ``pycurl.error`` is not propagated; its first argument is used as
        the return code instead.
        """
        while True:
            try:
                ret, _ = perform(*args)
            except pycurl.error as e:
                ret = e.args[0]
            if ret != pycurl.E_CALL_MULTI_PERFORM:
                break

    def _handle_events(self, fd, events):
        """Called by IOLoop when there is activity on one of our
        file descriptors.
        """
        action = 0
        if events & ioloop.IOLoop.READ:
            action |= pycurl.CSELECT_IN
        if events & ioloop.IOLoop.WRITE:
            action |= pycurl.CSELECT_OUT
        self._drive_multi(self._multi.socket_action, fd, action)
        self._finish_pending_requests()

    def _handle_timeout(self):
        """Called by IOLoop when the requested timeout has passed."""
        with stack_context.NullContext():
            self._timeout = None
            self._drive_multi(self._multi.socket_action,
                              pycurl.SOCKET_TIMEOUT, 0)
            self._finish_pending_requests()

        # Re-arm from the multi handle's current timeout; a negative
        # value means there is nothing to schedule.
        new_timeout = self._multi.timeout()
        if new_timeout >= 0:
            self._set_timeout(new_timeout)

    def _handle_force_timeout(self):
        """Called by IOLoop periodically to ask libcurl to process any
        events it may have forgotten about.
        """
        with stack_context.NullContext():
            self._drive_multi(self._multi.socket_all)
            self._finish_pending_requests()

    def _finish_pending_requests(self):
        """Process any requests that were completed by the last
        call to multi.socket_action.
        """
        while True:
            num_q, ok_list, err_list = self._multi.info_read()
            for curl in ok_list:
                self._finish(curl)
            for curl, errnum, errmsg in err_list:
                self._finish(curl, errnum, errmsg)
            if num_q == 0:
                break
        # Finished transfers freed handles; hand them to queued requests.
        self._process_queue()

    def _process_queue(self):
        """Attach queued requests to free handles until one side runs out.

        Any exception raised while configuring a handle propagates to the
        caller, but the handle is returned to the pool first.
        """
        with stack_context.NullContext():
            while True:
                started = 0
                while self._free_list and self._requests:
                    started += 1
                    curl = self._free_list.pop()
                    (request, callback) = self._requests.popleft()
                    curl.info = {
                        "headers": httputil.HTTPHeaders(),
                        "buffer": BytesIO(),
                        "request": request,
                        "callback": callback,
                        "curl_start_time": time.time(),
                    }
                    try:
                        _curl_setup_request(curl, request,
                                            curl.info["buffer"],
                                            curl.info["headers"])
                    except Exception:
                        # The handle was never attached to the multi
                        # object.  Put it back so an invalid request
                        # cannot permanently shrink the pool, then let
                        # the error surface exactly as before.
                        curl.info = None
                        self._free_list.append(curl)
                        raise
                    self._multi.add_handle(curl)

                if not started:
                    break

    def _finish(self, curl, curl_error=None, curl_message=None):
        """Release ``curl`` back to the pool and deliver its response.

        :arg curl: handle whose transfer has ended.
        :arg curl_error: curl error number, or a false value on success.
        :arg curl_message: error text accompanying ``curl_error``.
        """
        info = curl.info
        curl.info = None
        self._multi.remove_handle(curl)
        self._free_list.append(curl)
        buffer = info["buffer"]
        if curl_error:
            error = CurlError(curl_error, curl_message)
            code = error.code
            effective_url = None
            # No body is handed to the callback on failure.
            buffer.close()
            buffer = None
        else:
            error = None
            code = curl.getinfo(pycurl.HTTP_CODE)
            effective_url = curl.getinfo(pycurl.EFFECTIVE_URL)
            # Rewind so the response body is read from the start.
            buffer.seek(0)
        # "queue" is the time the request spent waiting for a handle;
        # the remaining entries are read straight from curl.
        time_info = dict(
            queue=info["curl_start_time"] - info["request"].start_time,
            namelookup=curl.getinfo(pycurl.NAMELOOKUP_TIME),
            connect=curl.getinfo(pycurl.CONNECT_TIME),
            pretransfer=curl.getinfo(pycurl.PRETRANSFER_TIME),
            starttransfer=curl.getinfo(pycurl.STARTTRANSFER_TIME),
            total=curl.getinfo(pycurl.TOTAL_TIME),
            redirect=curl.getinfo(pycurl.REDIRECT_TIME),
        )
        try:
            info["callback"](HTTPResponse(
                request=info["request"], code=code, headers=info["headers"],
                buffer=buffer, effective_url=effective_url, error=error,
                reason=info['headers'].get("X-Http-Reason", None),
                request_time=time.time() - info["curl_start_time"],
                time_info=time_info))
        except Exception:
            self.handle_callback_exception(info["callback"])

    def handle_callback_exception(self, callback):
        """Delegate an exception raised by ``callback`` to the IOLoop."""
        self.io_loop.handle_callback_exception(callback)
00261
00262
class CurlError(HTTPError):
    """``HTTPError`` describing a transfer that failed inside curl.

    The HTTP code is always 599; the curl error number is stored on
    ``errno`` and the curl error text becomes the error message.
    """

    def __init__(self, errno, message):
        super(CurlError, self).__init__(599, message)
        self.errno = errno
00267
00268
def _curl_create():
    """Return a fresh ``pycurl.Curl`` handle.

    When ``gen_log`` is enabled for DEBUG the handle is made verbose and
    its debug output is routed through ``_curl_debug``.
    """
    handle = pycurl.Curl()
    if not gen_log.isEnabledFor(logging.DEBUG):
        return handle
    handle.setopt(pycurl.VERBOSE, 1)
    handle.setopt(pycurl.DEBUGFUNCTION, _curl_debug)
    return handle
00275
00276
def _curl_setup_request(curl, request, buffer, headers):
    """Configure the (possibly reused) handle ``curl`` for ``request``.

    :arg curl: ``pycurl.Curl`` handle to configure.
    :arg request: request object supplying url, method, headers, body,
        timeouts, proxy, auth and TLS settings.
    :arg buffer: file-like object that receives the response body when
        the request has no ``streaming_callback``.
    :arg headers: header container filled by ``_curl_header_callback``
        when the request has no ``header_callback``.
    :raises KeyError: the method is unknown and
        ``allow_nonstandard_methods`` is not set.
    :raises AssertionError: POST/PUT without a body, or GET with one.
    :raises ValueError: ``auth_mode`` is neither ``None``, "basic" nor
        "digest".

    Side effect: empty "Expect" and "Pragma" entries are added to
    ``request.headers`` when the caller did not set them.
    """
    curl.setopt(pycurl.URL, native_str(request.url))

    # A header with an empty value tells libcurl not to send its own
    # default for that header; do so unless the caller set one.
    if "Expect" not in request.headers:
        request.headers["Expect"] = ""
    if "Pragma" not in request.headers:
        request.headers["Pragma"] = ""

    # request.headers is either an HTTPHeaders object or a plain mapping.
    if isinstance(request.headers, httputil.HTTPHeaders):
        curl.setopt(pycurl.HTTPHEADER,
                    [native_str("%s: %s" % i)
                     for i in request.headers.get_all()])
    else:
        curl.setopt(pycurl.HTTPHEADER,
                    [native_str("%s: %s" % i)
                     for i in request.headers.items()])

    if request.header_callback:
        curl.setopt(pycurl.HEADERFUNCTION,
                    lambda line: request.header_callback(native_str(line)))
    else:
        curl.setopt(pycurl.HEADERFUNCTION,
                    lambda line: _curl_header_callback(headers,
                                                       native_str(line)))
    if request.streaming_callback:
        write_function = request.streaming_callback
    else:
        write_function = buffer.write
    if bytes_type is str:
        curl.setopt(pycurl.WRITEFUNCTION, write_function)
    else:
        # Convert whatever pycurl passes to utf8 bytes before writing.
        # NOTE(review): presumably works around a pycurl build that hands
        # text rather than bytes to WRITEFUNCTION -- confirm.
        curl.setopt(pycurl.WRITEFUNCTION, lambda s: write_function(utf8(s)))
    curl.setopt(pycurl.FOLLOWLOCATION, request.follow_redirects)
    curl.setopt(pycurl.MAXREDIRS, request.max_redirects)
    curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(1000 * request.connect_timeout))
    curl.setopt(pycurl.TIMEOUT_MS, int(1000 * request.request_timeout))
    if request.user_agent:
        curl.setopt(pycurl.USERAGENT, native_str(request.user_agent))
    else:
        curl.setopt(pycurl.USERAGENT, "Mozilla/5.0 (compatible; pycurl)")
    if request.network_interface:
        curl.setopt(pycurl.INTERFACE, request.network_interface)
    if request.decompress_response:
        curl.setopt(pycurl.ENCODING, "gzip,deflate")
    else:
        curl.setopt(pycurl.ENCODING, "none")
    if request.proxy_host and request.proxy_port:
        curl.setopt(pycurl.PROXY, request.proxy_host)
        curl.setopt(pycurl.PROXYPORT, request.proxy_port)
        if request.proxy_username:
            # A missing password is sent as empty, matching how
            # auth_password is handled below, instead of the text "None".
            credentials = '%s:%s' % (request.proxy_username,
                                     request.proxy_password or '')
            curl.setopt(pycurl.PROXYUSERPWD, credentials)
        else:
            # The handle is reused between requests; drop credentials a
            # previous request may have left on it.
            curl.unsetopt(pycurl.PROXYUSERPWD)
    else:
        curl.setopt(pycurl.PROXY, '')
        curl.unsetopt(pycurl.PROXYUSERPWD)
    if request.validate_cert:
        curl.setopt(pycurl.SSL_VERIFYPEER, 1)
        curl.setopt(pycurl.SSL_VERIFYHOST, 2)
    else:
        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
    if request.ca_certs is not None:
        curl.setopt(pycurl.CAINFO, request.ca_certs)
    else:
        # CAINFO is deliberately left untouched here.
        # NOTE(review): a value set by an earlier request on this handle
        # therefore persists -- confirm this is intended.
        pass

    if request.allow_ipv6 is False:
        curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4)
    else:
        curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_WHATEVER)

    # Each of these methods has its own curl option.  Switch them all off
    # first, then enable the one that applies (or fall back to
    # CUSTOMREQUEST for the remaining methods).
    curl_options = {
        "GET": pycurl.HTTPGET,
        "POST": pycurl.POST,
        "PUT": pycurl.UPLOAD,
        "HEAD": pycurl.NOBODY,
    }
    custom_methods = set(["DELETE", "OPTIONS", "PATCH"])
    for o in curl_options.values():
        curl.setopt(o, False)
    if request.method in curl_options:
        curl.unsetopt(pycurl.CUSTOMREQUEST)
        curl.setopt(curl_options[request.method], True)
    elif request.allow_nonstandard_methods or request.method in custom_methods:
        curl.setopt(pycurl.CUSTOMREQUEST, request.method)
    else:
        raise KeyError('unknown method ' + request.method)

    if request.method in ("POST", "PUT"):
        if request.body is None:
            raise AssertionError(
                'Body must not be empty for "%s" request'
                % request.method)

        # Encode once and size the upload from the encoded bytes, so the
        # announced length matches what READFUNCTION will deliver even
        # for non-ASCII text bodies.
        body = utf8(request.body)
        request_buffer = BytesIO(body)
        curl.setopt(pycurl.READFUNCTION, request_buffer.read)
        if request.method == "POST":
            def ioctl(cmd):
                # Rewind the body when curl asks to restart reading.
                if cmd == curl.IOCMD_RESTARTREAD:
                    request_buffer.seek(0)
            curl.setopt(pycurl.IOCTLFUNCTION, ioctl)
            curl.setopt(pycurl.POSTFIELDSIZE, len(body))
        else:
            curl.setopt(pycurl.INFILESIZE, len(body))
    elif request.method == "GET":
        if request.body is not None:
            raise AssertionError('Body must be empty for GET request')

    if request.auth_username is not None:
        userpwd = "%s:%s" % (request.auth_username, request.auth_password or '')

        if request.auth_mode is None or request.auth_mode == "basic":
            curl.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_BASIC)
        elif request.auth_mode == "digest":
            curl.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_DIGEST)
        else:
            raise ValueError("Unsupported auth_mode %s" % request.auth_mode)

        curl.setopt(pycurl.USERPWD, native_str(userpwd))
        gen_log.debug("%s %s (username: %r)", request.method, request.url,
                      request.auth_username)
    else:
        curl.unsetopt(pycurl.USERPWD)
        gen_log.debug("%s %s", request.method, request.url)

    if request.client_cert is not None:
        curl.setopt(pycurl.SSLCERT, request.client_cert)

    if request.client_key is not None:
        curl.setopt(pycurl.SSLKEY, request.client_key)

    if threading.active_count() > 1:
        # More than one thread is running: tell libcurl not to use
        # signals.
        curl.setopt(pycurl.NOSIGNAL, 1)
    # Runs last so the caller can override anything configured above.
    if request.prepare_curl_callback is not None:
        request.prepare_curl_callback(curl)
00448
00449
00450 def _curl_header_callback(headers, header_line):
00451
00452 header_line = header_line.strip()
00453 if header_line.startswith("HTTP/"):
00454 headers.clear()
00455 try:
00456 (__, __, reason) = httputil.parse_response_start_line(header_line)
00457 header_line = "X-Http-Reason: %s" % reason
00458 except httputil.HTTPInputError:
00459 return
00460 if not header_line:
00461 return
00462 headers.parse_line(header_line)
00463
00464
00465 def _curl_debug(debug_type, debug_msg):
00466 debug_types = ('I', '<', '>', '<', '>')
00467 if debug_type == 0:
00468 gen_log.debug('%s', debug_msg.strip())
00469 elif debug_type in (1, 2):
00470 for line in debug_msg.splitlines():
00471 gen_log.debug('%s %s', debug_types[debug_type], line)
00472 elif debug_type == 4:
00473 gen_log.debug('%s %r', debug_types[debug_type], debug_msg)
00474
if __name__ == "__main__":
    # Run as a script: make the curl-based client the AsyncHTTPClient
    # implementation, then hand over to the imported ``main``.
    AsyncHTTPClient.configure(CurlAsyncHTTPClient)
    main()