00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
"""Non-blocking HTTP client implementation using pycurl."""
00018
00019 from __future__ import absolute_import, division, with_statement
00020
00021 import cStringIO
00022 import collections
00023 import logging
00024 import pycurl
00025 import threading
00026 import time
00027
00028 from tornado import httputil
00029 from tornado import ioloop
00030 from tornado import stack_context
00031
00032 from tornado.escape import utf8
00033 from tornado.httpclient import HTTPRequest, HTTPResponse, HTTPError, AsyncHTTPClient, main
00034
00035
00036 class CurlAsyncHTTPClient(AsyncHTTPClient):
00037 def initialize(self, io_loop=None, max_clients=10,
00038 max_simultaneous_connections=None):
00039 self.io_loop = io_loop
00040 self._multi = pycurl.CurlMulti()
00041 self._multi.setopt(pycurl.M_TIMERFUNCTION, self._set_timeout)
00042 self._multi.setopt(pycurl.M_SOCKETFUNCTION, self._handle_socket)
00043 self._curls = [_curl_create(max_simultaneous_connections)
00044 for i in xrange(max_clients)]
00045 self._free_list = self._curls[:]
00046 self._requests = collections.deque()
00047 self._fds = {}
00048 self._timeout = None
00049
00050 try:
00051 self._socket_action = self._multi.socket_action
00052 except AttributeError:
00053
00054
00055
00056 logging.warning("socket_action method missing from pycurl; "
00057 "falling back to socket_all. Upgrading "
00058 "libcurl and pycurl will improve performance")
00059 self._socket_action = \
00060 lambda fd, action: self._multi.socket_all()
00061
00062
00063
00064
00065
00066 self._force_timeout_callback = ioloop.PeriodicCallback(
00067 self._handle_force_timeout, 1000, io_loop=io_loop)
00068 self._force_timeout_callback.start()
00069
00070 def close(self):
00071 self._force_timeout_callback.stop()
00072 for curl in self._curls:
00073 curl.close()
00074 self._multi.close()
00075 self._closed = True
00076 super(CurlAsyncHTTPClient, self).close()
00077
00078 def fetch(self, request, callback, **kwargs):
00079 if not isinstance(request, HTTPRequest):
00080 request = HTTPRequest(url=request, **kwargs)
00081 self._requests.append((request, stack_context.wrap(callback)))
00082 self._process_queue()
00083 self._set_timeout(0)
00084
00085 def _handle_socket(self, event, fd, multi, data):
00086 """Called by libcurl when it wants to change the file descriptors
00087 it cares about.
00088 """
00089 event_map = {
00090 pycurl.POLL_NONE: ioloop.IOLoop.NONE,
00091 pycurl.POLL_IN: ioloop.IOLoop.READ,
00092 pycurl.POLL_OUT: ioloop.IOLoop.WRITE,
00093 pycurl.POLL_INOUT: ioloop.IOLoop.READ | ioloop.IOLoop.WRITE
00094 }
00095 if event == pycurl.POLL_REMOVE:
00096 self.io_loop.remove_handler(fd)
00097 del self._fds[fd]
00098 else:
00099 ioloop_event = event_map[event]
00100 if fd not in self._fds:
00101 self._fds[fd] = ioloop_event
00102 self.io_loop.add_handler(fd, self._handle_events,
00103 ioloop_event)
00104 else:
00105 self._fds[fd] = ioloop_event
00106 self.io_loop.update_handler(fd, ioloop_event)
00107
00108 def _set_timeout(self, msecs):
00109 """Called by libcurl to schedule a timeout."""
00110 if self._timeout is not None:
00111 self.io_loop.remove_timeout(self._timeout)
00112 self._timeout = self.io_loop.add_timeout(
00113 time.time() + msecs / 1000.0, self._handle_timeout)
00114
00115 def _handle_events(self, fd, events):
00116 """Called by IOLoop when there is activity on one of our
00117 file descriptors.
00118 """
00119 action = 0
00120 if events & ioloop.IOLoop.READ:
00121 action |= pycurl.CSELECT_IN
00122 if events & ioloop.IOLoop.WRITE:
00123 action |= pycurl.CSELECT_OUT
00124 while True:
00125 try:
00126 ret, num_handles = self._socket_action(fd, action)
00127 except pycurl.error, e:
00128 ret = e.args[0]
00129 if ret != pycurl.E_CALL_MULTI_PERFORM:
00130 break
00131 self._finish_pending_requests()
00132
00133 def _handle_timeout(self):
00134 """Called by IOLoop when the requested timeout has passed."""
00135 with stack_context.NullContext():
00136 self._timeout = None
00137 while True:
00138 try:
00139 ret, num_handles = self._socket_action(
00140 pycurl.SOCKET_TIMEOUT, 0)
00141 except pycurl.error, e:
00142 ret = e.args[0]
00143 if ret != pycurl.E_CALL_MULTI_PERFORM:
00144 break
00145 self._finish_pending_requests()
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156
00157
00158
00159
00160 new_timeout = self._multi.timeout()
00161 if new_timeout != -1:
00162 self._set_timeout(new_timeout)
00163
00164 def _handle_force_timeout(self):
00165 """Called by IOLoop periodically to ask libcurl to process any
00166 events it may have forgotten about.
00167 """
00168 with stack_context.NullContext():
00169 while True:
00170 try:
00171 ret, num_handles = self._multi.socket_all()
00172 except pycurl.error, e:
00173 ret = e.args[0]
00174 if ret != pycurl.E_CALL_MULTI_PERFORM:
00175 break
00176 self._finish_pending_requests()
00177
00178 def _finish_pending_requests(self):
00179 """Process any requests that were completed by the last
00180 call to multi.socket_action.
00181 """
00182 while True:
00183 num_q, ok_list, err_list = self._multi.info_read()
00184 for curl in ok_list:
00185 self._finish(curl)
00186 for curl, errnum, errmsg in err_list:
00187 self._finish(curl, errnum, errmsg)
00188 if num_q == 0:
00189 break
00190 self._process_queue()
00191
00192 def _process_queue(self):
00193 with stack_context.NullContext():
00194 while True:
00195 started = 0
00196 while self._free_list and self._requests:
00197 started += 1
00198 curl = self._free_list.pop()
00199 (request, callback) = self._requests.popleft()
00200 curl.info = {
00201 "headers": httputil.HTTPHeaders(),
00202 "buffer": cStringIO.StringIO(),
00203 "request": request,
00204 "callback": callback,
00205 "curl_start_time": time.time(),
00206 }
00207
00208
00209
00210 if pycurl.version_info()[2] <= 0x71500:
00211 curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4)
00212 _curl_setup_request(curl, request, curl.info["buffer"],
00213 curl.info["headers"])
00214 self._multi.add_handle(curl)
00215
00216 if not started:
00217 break
00218
00219 def _finish(self, curl, curl_error=None, curl_message=None):
00220 info = curl.info
00221 curl.info = None
00222 self._multi.remove_handle(curl)
00223 self._free_list.append(curl)
00224 buffer = info["buffer"]
00225 if curl_error:
00226 error = CurlError(curl_error, curl_message)
00227 code = error.code
00228 effective_url = None
00229 buffer.close()
00230 buffer = None
00231 else:
00232 error = None
00233 code = curl.getinfo(pycurl.HTTP_CODE)
00234 effective_url = curl.getinfo(pycurl.EFFECTIVE_URL)
00235 buffer.seek(0)
00236
00237
00238 time_info = dict(
00239 queue=info["curl_start_time"] - info["request"].start_time,
00240 namelookup=curl.getinfo(pycurl.NAMELOOKUP_TIME),
00241 connect=curl.getinfo(pycurl.CONNECT_TIME),
00242 pretransfer=curl.getinfo(pycurl.PRETRANSFER_TIME),
00243 starttransfer=curl.getinfo(pycurl.STARTTRANSFER_TIME),
00244 total=curl.getinfo(pycurl.TOTAL_TIME),
00245 redirect=curl.getinfo(pycurl.REDIRECT_TIME),
00246 )
00247 try:
00248 info["callback"](HTTPResponse(
00249 request=info["request"], code=code, headers=info["headers"],
00250 buffer=buffer, effective_url=effective_url, error=error,
00251 request_time=time.time() - info["curl_start_time"],
00252 time_info=time_info))
00253 except Exception:
00254 self.handle_callback_exception(info["callback"])
00255
00256 def handle_callback_exception(self, callback):
00257 self.io_loop.handle_callback_exception(callback)
00258
00259
class CurlError(HTTPError):
    """Error object describing a libcurl-level failure.

    The HTTP status code is always the synthetic value 599. The libcurl
    message is passed to ``HTTPError`` as the message, and the libcurl error
    number is kept on ``errno``.
    """

    def __init__(self, errno, message):
        super(CurlError, self).__init__(599, message)
        self.errno = errno
00264
00265
def _curl_create(max_simultaneous_connections=None):
    """Build a fresh ``pycurl.Curl`` easy handle.

    If the root logger is enabled for DEBUG, libcurl's verbose output is
    turned on and routed through ``_curl_debug``. ``MAXCONNECTS`` is set to
    *max_simultaneous_connections*, or 5 when that value is falsy.
    """
    handle = pycurl.Curl()
    if logging.getLogger().isEnabledFor(logging.DEBUG):
        for option, value in ((pycurl.VERBOSE, 1),
                              (pycurl.DEBUGFUNCTION, _curl_debug)):
            handle.setopt(option, value)
    max_connects = max_simultaneous_connections or 5
    handle.setopt(pycurl.MAXCONNECTS, max_connects)
    return handle
00273
00274
def _curl_setup_request(curl, request, buffer, headers):
    """Configure the easy handle *curl* to perform *request*.

    Handles are pooled and reused by the client, so any option that varies
    per request should be set or reset here rather than left over from the
    previous request.

    :arg curl: ``pycurl.Curl`` handle to configure.
    :arg request: the ``HTTPRequest`` to perform. Its ``headers`` are
        modified in place (``Expect`` and ``Pragma`` are added if absent).
    :arg buffer: file-like object whose ``write`` receives the response
        body when the request has no ``streaming_callback``.
    :arg headers: header container filled via ``_curl_header_callback``
        when the request has no ``header_callback``.
    :raises KeyError: if ``request.method`` is unknown and
        ``allow_nonstandard_methods`` is not set.
    """
    curl.setopt(pycurl.URL, utf8(request.url))

    # Sending an empty "Expect:" header is libcurl's way of disabling its
    # "Expect: 100-continue" handshake, which causes delays with servers
    # that do not support it.
    if "Expect" not in request.headers:
        request.headers["Expect"] = ""

    # Likewise suppress the "Pragma: no-cache" header libcurl adds by default.
    if "Pragma" not in request.headers:
        request.headers["Pragma"] = ""

    # HTTPHeaders may hold several values per name; get_all() yields each.
    if isinstance(request.headers, httputil.HTTPHeaders):
        curl.setopt(pycurl.HTTPHEADER,
                    [utf8("%s: %s" % i) for i in request.headers.get_all()])
    else:
        curl.setopt(pycurl.HTTPHEADER,
                    [utf8("%s: %s" % i) for i in request.headers.iteritems()])

    if request.header_callback:
        curl.setopt(pycurl.HEADERFUNCTION, request.header_callback)
    else:
        curl.setopt(pycurl.HEADERFUNCTION,
                    lambda line: _curl_header_callback(headers, line))
    if request.streaming_callback:
        curl.setopt(pycurl.WRITEFUNCTION, request.streaming_callback)
    else:
        curl.setopt(pycurl.WRITEFUNCTION, buffer.write)
    curl.setopt(pycurl.FOLLOWLOCATION, request.follow_redirects)
    curl.setopt(pycurl.MAXREDIRS, request.max_redirects)
    curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(1000 * request.connect_timeout))
    curl.setopt(pycurl.TIMEOUT_MS, int(1000 * request.request_timeout))
    if request.user_agent:
        curl.setopt(pycurl.USERAGENT, utf8(request.user_agent))
    else:
        curl.setopt(pycurl.USERAGENT, "Mozilla/5.0 (compatible; pycurl)")
    if request.network_interface:
        curl.setopt(pycurl.INTERFACE, request.network_interface)
    # NOTE(review): INTERFACE is never reset, so it persists on a reused
    # handle; unsetopt(INTERFACE) may not be supported by older pycurl.
    if request.use_gzip:
        curl.setopt(pycurl.ENCODING, "gzip,deflate")
    else:
        curl.setopt(pycurl.ENCODING, "none")
    if request.proxy_host and request.proxy_port:
        curl.setopt(pycurl.PROXY, request.proxy_host)
        curl.setopt(pycurl.PROXYPORT, request.proxy_port)
        if request.proxy_username:
            credentials = '%s:%s' % (request.proxy_username,
                                     request.proxy_password)
            curl.setopt(pycurl.PROXYUSERPWD, credentials)
        else:
            # Don't reuse credentials left over from a previous request.
            curl.unsetopt(pycurl.PROXYUSERPWD)
    else:
        curl.setopt(pycurl.PROXY, '')
        curl.unsetopt(pycurl.PROXYUSERPWD)
    if request.validate_cert:
        curl.setopt(pycurl.SSL_VERIFYPEER, 1)
        curl.setopt(pycurl.SSL_VERIFYHOST, 2)
    else:
        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
    if request.ca_certs is not None:
        curl.setopt(pycurl.CAINFO, request.ca_certs)
    else:
        # CAINFO is deliberately left untouched. Unsetting it would clear
        # the CA bundle rather than restore libcurl's default. As a result,
        # a custom ca_certs set by an earlier request on this handle stays
        # in effect.
        pass

    if request.allow_ipv6 is False:
        # IPv6 is allowed unless the request explicitly disables it.
        curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4)

    # libcurl selects the method through a separate option for each common
    # verb; first clear all of them, then enable exactly one or fall back
    # to CUSTOMREQUEST.
    curl_options = {
        "GET": pycurl.HTTPGET,
        "POST": pycurl.POST,
        "PUT": pycurl.UPLOAD,
        "HEAD": pycurl.NOBODY,
    }
    custom_methods = set(["DELETE"])
    for o in curl_options.values():
        curl.setopt(o, False)
    if request.method in curl_options:
        curl.unsetopt(pycurl.CUSTOMREQUEST)
        curl.setopt(curl_options[request.method], True)
    elif request.allow_nonstandard_methods or request.method in custom_methods:
        curl.setopt(pycurl.CUSTOMREQUEST, request.method)
    else:
        raise KeyError('unknown method ' + request.method)

    if request.method in ("POST", "PUT"):
        # Encode once and size from the encoded bytes. len() of a unicode
        # body counts characters, which differs from the number of UTF-8
        # bytes actually sent.
        body = utf8(request.body)
        request_buffer = cStringIO.StringIO(body)
        curl.setopt(pycurl.READFUNCTION, request_buffer.read)
        if request.method == "POST":
            # Let libcurl rewind the body if it needs to resend it.
            def ioctl(cmd):
                if cmd == curl.IOCMD_RESTARTREAD:
                    request_buffer.seek(0)
            curl.setopt(pycurl.IOCTLFUNCTION, ioctl)
            curl.setopt(pycurl.POSTFIELDSIZE, len(body))
        else:
            curl.setopt(pycurl.INFILESIZE, len(body))

    if request.auth_username is not None:
        userpwd = "%s:%s" % (request.auth_username, request.auth_password or '')
        curl.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_BASIC)
        curl.setopt(pycurl.USERPWD, utf8(userpwd))
        logging.debug("%s %s (username: %r)", request.method, request.url,
                      request.auth_username)
    else:
        curl.unsetopt(pycurl.USERPWD)
        logging.debug("%s %s", request.method, request.url)

    # NOTE(review): SSLCERT/SSLKEY are only ever set, never reset, so a
    # client certificate from an earlier request persists on a reused
    # handle.
    if request.client_cert is not None:
        curl.setopt(pycurl.SSLCERT, request.client_cert)

    if request.client_key is not None:
        curl.setopt(pycurl.SSLKEY, request.client_key)

    if threading.activeCount() > 1:
        # NOSIGNAL is only enabled when other threads exist. Presumably
        # libcurl's signal use is unsafe in multi-threaded programs, while
        # disabling it loses DNS-lookup timeouts. NOTE(review): confirm.
        curl.setopt(pycurl.NOSIGNAL, 1)
    if request.prepare_curl_callback is not None:
        # Runs last so the caller can override any option set above.
        request.prepare_curl_callback(curl)
00416
00417
00418 def _curl_header_callback(headers, header_line):
00419
00420 header_line = header_line.strip()
00421 if header_line.startswith("HTTP/"):
00422 headers.clear()
00423 return
00424 if not header_line:
00425 return
00426 headers.parse_line(header_line)
00427
00428
00429 def _curl_debug(debug_type, debug_msg):
00430 debug_types = ('I', '<', '>', '<', '>')
00431 if debug_type == 0:
00432 logging.debug('%s', debug_msg.strip())
00433 elif debug_type in (1, 2):
00434 for line in debug_msg.splitlines():
00435 logging.debug('%s %s', debug_types[debug_type], line)
00436 elif debug_type == 4:
00437 logging.debug('%s %r', debug_types[debug_type], debug_msg)
00438
if __name__ == '__main__':
    # Command-line entry point: make the curl-based client the configured
    # AsyncHTTPClient implementation, then run httpclient's main().
    AsyncHTTPClient.configure(CurlAsyncHTTPClient)
    main()