curl_httpclient.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #
00003 # Copyright 2009 Facebook
00004 #
00005 # Licensed under the Apache License, Version 2.0 (the "License"); you may
00006 # not use this file except in compliance with the License. You may obtain
00007 # a copy of the License at
00008 #
00009 #     http://www.apache.org/licenses/LICENSE-2.0
00010 #
00011 # Unless required by applicable law or agreed to in writing, software
00012 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
00013 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
00014 # License for the specific language governing permissions and limitations
00015 # under the License.
00016 
00017 """Blocking and non-blocking HTTP client implementations using pycurl."""
00018 
00019 from __future__ import absolute_import, division, with_statement
00020 
00021 import cStringIO
00022 import collections
00023 import logging
00024 import pycurl
00025 import threading
00026 import time
00027 
00028 from tornado import httputil
00029 from tornado import ioloop
00030 from tornado import stack_context
00031 
00032 from tornado.escape import utf8
00033 from tornado.httpclient import HTTPRequest, HTTPResponse, HTTPError, AsyncHTTPClient, main
00034 
00035 
00036 class CurlAsyncHTTPClient(AsyncHTTPClient):
00037     def initialize(self, io_loop=None, max_clients=10,
00038                    max_simultaneous_connections=None):
00039         self.io_loop = io_loop
00040         self._multi = pycurl.CurlMulti()
00041         self._multi.setopt(pycurl.M_TIMERFUNCTION, self._set_timeout)
00042         self._multi.setopt(pycurl.M_SOCKETFUNCTION, self._handle_socket)
00043         self._curls = [_curl_create(max_simultaneous_connections)
00044                        for i in xrange(max_clients)]
00045         self._free_list = self._curls[:]
00046         self._requests = collections.deque()
00047         self._fds = {}
00048         self._timeout = None
00049 
00050         try:
00051             self._socket_action = self._multi.socket_action
00052         except AttributeError:
00053             # socket_action is found in pycurl since 7.18.2 (it's been
00054             # in libcurl longer than that but wasn't accessible to
00055             # python).
00056             logging.warning("socket_action method missing from pycurl; "
00057                             "falling back to socket_all. Upgrading "
00058                             "libcurl and pycurl will improve performance")
00059             self._socket_action = \
00060                 lambda fd, action: self._multi.socket_all()
00061 
00062         # libcurl has bugs that sometimes cause it to not report all
00063         # relevant file descriptors and timeouts to TIMERFUNCTION/
00064         # SOCKETFUNCTION.  Mitigate the effects of such bugs by
00065         # forcing a periodic scan of all active requests.
00066         self._force_timeout_callback = ioloop.PeriodicCallback(
00067             self._handle_force_timeout, 1000, io_loop=io_loop)
00068         self._force_timeout_callback.start()
00069 
00070     def close(self):
00071         self._force_timeout_callback.stop()
00072         for curl in self._curls:
00073             curl.close()
00074         self._multi.close()
00075         self._closed = True
00076         super(CurlAsyncHTTPClient, self).close()
00077 
00078     def fetch(self, request, callback, **kwargs):
00079         if not isinstance(request, HTTPRequest):
00080             request = HTTPRequest(url=request, **kwargs)
00081         self._requests.append((request, stack_context.wrap(callback)))
00082         self._process_queue()
00083         self._set_timeout(0)
00084 
00085     def _handle_socket(self, event, fd, multi, data):
00086         """Called by libcurl when it wants to change the file descriptors
00087         it cares about.
00088         """
00089         event_map = {
00090             pycurl.POLL_NONE: ioloop.IOLoop.NONE,
00091             pycurl.POLL_IN: ioloop.IOLoop.READ,
00092             pycurl.POLL_OUT: ioloop.IOLoop.WRITE,
00093             pycurl.POLL_INOUT: ioloop.IOLoop.READ | ioloop.IOLoop.WRITE
00094         }
00095         if event == pycurl.POLL_REMOVE:
00096             self.io_loop.remove_handler(fd)
00097             del self._fds[fd]
00098         else:
00099             ioloop_event = event_map[event]
00100             if fd not in self._fds:
00101                 self._fds[fd] = ioloop_event
00102                 self.io_loop.add_handler(fd, self._handle_events,
00103                                          ioloop_event)
00104             else:
00105                 self._fds[fd] = ioloop_event
00106                 self.io_loop.update_handler(fd, ioloop_event)
00107 
00108     def _set_timeout(self, msecs):
00109         """Called by libcurl to schedule a timeout."""
00110         if self._timeout is not None:
00111             self.io_loop.remove_timeout(self._timeout)
00112         self._timeout = self.io_loop.add_timeout(
00113             time.time() + msecs / 1000.0, self._handle_timeout)
00114 
00115     def _handle_events(self, fd, events):
00116         """Called by IOLoop when there is activity on one of our
00117         file descriptors.
00118         """
00119         action = 0
00120         if events & ioloop.IOLoop.READ:
00121             action |= pycurl.CSELECT_IN
00122         if events & ioloop.IOLoop.WRITE:
00123             action |= pycurl.CSELECT_OUT
00124         while True:
00125             try:
00126                 ret, num_handles = self._socket_action(fd, action)
00127             except pycurl.error, e:
00128                 ret = e.args[0]
00129             if ret != pycurl.E_CALL_MULTI_PERFORM:
00130                 break
00131         self._finish_pending_requests()
00132 
00133     def _handle_timeout(self):
00134         """Called by IOLoop when the requested timeout has passed."""
00135         with stack_context.NullContext():
00136             self._timeout = None
00137             while True:
00138                 try:
00139                     ret, num_handles = self._socket_action(
00140                         pycurl.SOCKET_TIMEOUT, 0)
00141                 except pycurl.error, e:
00142                     ret = e.args[0]
00143                 if ret != pycurl.E_CALL_MULTI_PERFORM:
00144                     break
00145             self._finish_pending_requests()
00146 
00147         # In theory, we shouldn't have to do this because curl will
00148         # call _set_timeout whenever the timeout changes.  However,
00149         # sometimes after _handle_timeout we will need to reschedule
00150         # immediately even though nothing has changed from curl's
00151         # perspective.  This is because when socket_action is
00152         # called with SOCKET_TIMEOUT, libcurl decides internally which
00153         # timeouts need to be processed by using a monotonic clock
00154         # (where available) while tornado uses python's time.time()
00155         # to decide when timeouts have occurred.  When those clocks
00156         # disagree on elapsed time (as they will whenever there is an
00157         # NTP adjustment), tornado might call _handle_timeout before
00158         # libcurl is ready.  After each timeout, resync the scheduled
00159         # timeout with libcurl's current state.
00160         new_timeout = self._multi.timeout()
00161         if new_timeout != -1:
00162             self._set_timeout(new_timeout)
00163 
00164     def _handle_force_timeout(self):
00165         """Called by IOLoop periodically to ask libcurl to process any
00166         events it may have forgotten about.
00167         """
00168         with stack_context.NullContext():
00169             while True:
00170                 try:
00171                     ret, num_handles = self._multi.socket_all()
00172                 except pycurl.error, e:
00173                     ret = e.args[0]
00174                 if ret != pycurl.E_CALL_MULTI_PERFORM:
00175                     break
00176             self._finish_pending_requests()
00177 
00178     def _finish_pending_requests(self):
00179         """Process any requests that were completed by the last
00180         call to multi.socket_action.
00181         """
00182         while True:
00183             num_q, ok_list, err_list = self._multi.info_read()
00184             for curl in ok_list:
00185                 self._finish(curl)
00186             for curl, errnum, errmsg in err_list:
00187                 self._finish(curl, errnum, errmsg)
00188             if num_q == 0:
00189                 break
00190         self._process_queue()
00191 
00192     def _process_queue(self):
00193         with stack_context.NullContext():
00194             while True:
00195                 started = 0
00196                 while self._free_list and self._requests:
00197                     started += 1
00198                     curl = self._free_list.pop()
00199                     (request, callback) = self._requests.popleft()
00200                     curl.info = {
00201                         "headers": httputil.HTTPHeaders(),
00202                         "buffer": cStringIO.StringIO(),
00203                         "request": request,
00204                         "callback": callback,
00205                         "curl_start_time": time.time(),
00206                     }
00207                     # Disable IPv6 to mitigate the effects of this bug
00208                     # on curl versions <= 7.21.0
00209                     # http://sourceforge.net/tracker/?func=detail&aid=3017819&group_id=976&atid=100976
00210                     if pycurl.version_info()[2] <= 0x71500:  # 7.21.0
00211                         curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4)
00212                     _curl_setup_request(curl, request, curl.info["buffer"],
00213                                         curl.info["headers"])
00214                     self._multi.add_handle(curl)
00215 
00216                 if not started:
00217                     break
00218 
00219     def _finish(self, curl, curl_error=None, curl_message=None):
00220         info = curl.info
00221         curl.info = None
00222         self._multi.remove_handle(curl)
00223         self._free_list.append(curl)
00224         buffer = info["buffer"]
00225         if curl_error:
00226             error = CurlError(curl_error, curl_message)
00227             code = error.code
00228             effective_url = None
00229             buffer.close()
00230             buffer = None
00231         else:
00232             error = None
00233             code = curl.getinfo(pycurl.HTTP_CODE)
00234             effective_url = curl.getinfo(pycurl.EFFECTIVE_URL)
00235             buffer.seek(0)
00236         # the various curl timings are documented at
00237         # http://curl.haxx.se/libcurl/c/curl_easy_getinfo.html
00238         time_info = dict(
00239             queue=info["curl_start_time"] - info["request"].start_time,
00240             namelookup=curl.getinfo(pycurl.NAMELOOKUP_TIME),
00241             connect=curl.getinfo(pycurl.CONNECT_TIME),
00242             pretransfer=curl.getinfo(pycurl.PRETRANSFER_TIME),
00243             starttransfer=curl.getinfo(pycurl.STARTTRANSFER_TIME),
00244             total=curl.getinfo(pycurl.TOTAL_TIME),
00245             redirect=curl.getinfo(pycurl.REDIRECT_TIME),
00246             )
00247         try:
00248             info["callback"](HTTPResponse(
00249                 request=info["request"], code=code, headers=info["headers"],
00250                 buffer=buffer, effective_url=effective_url, error=error,
00251                 request_time=time.time() - info["curl_start_time"],
00252                 time_info=time_info))
00253         except Exception:
00254             self.handle_callback_exception(info["callback"])
00255 
00256     def handle_callback_exception(self, callback):
00257         self.io_loop.handle_callback_exception(callback)
00258 
00259 
class CurlError(HTTPError):
    """HTTPError for a transfer that failed inside libcurl.

    The HTTP status code is always 599; the libcurl error number is kept
    on the ``errno`` attribute.
    """
    def __init__(self, errno, message):
        super(CurlError, self).__init__(599, message)
        self.errno = errno
00264 
00265 
def _curl_create(max_simultaneous_connections=None):
    """Build a new ``pycurl.Curl`` handle with the shared base options.

    Verbose output is routed to ``_curl_debug`` when the root logger is
    enabled for DEBUG.  ``MAXCONNECTS`` is set to
    ``max_simultaneous_connections``, or 5 when that is None or 0.
    """
    handle = pycurl.Curl()
    debug_enabled = logging.getLogger().isEnabledFor(logging.DEBUG)
    if debug_enabled:
        debug_options = (
            (pycurl.VERBOSE, 1),
            (pycurl.DEBUGFUNCTION, _curl_debug),
        )
        for option, value in debug_options:
            handle.setopt(option, value)
    max_connects = max_simultaneous_connections or 5
    handle.setopt(pycurl.MAXCONNECTS, max_connects)
    return handle
00273 
00274 
def _curl_setup_request(curl, request, buffer, headers):
    """Configure ``curl`` to perform ``request``.

    :arg curl: the ``pycurl.Curl`` handle to configure (handles are reused,
        so most options are set explicitly on every call).
    :arg request: the ``HTTPRequest`` describing the transfer.  Note that
        ``Expect`` and ``Pragma`` entries are added to ``request.headers``
        when absent.
    :arg buffer: file-like object that receives the response body when the
        request has no ``streaming_callback``.
    :arg headers: headers object filled in by ``_curl_header_callback``
        when the request has no ``header_callback``.
    :raises KeyError: if the method is not GET/POST/PUT/HEAD/DELETE and
        ``request.allow_nonstandard_methods`` is not set.
    """
    curl.setopt(pycurl.URL, utf8(request.url))

    # libcurl's magic "Expect: 100-continue" behavior causes delays
    # with servers that don't support it (which include, among others,
    # Google's OpenID endpoint).  Additionally, this behavior has
    # a bug in conjunction with the curl_multi_socket_action API
    # (https://sourceforge.net/tracker/?func=detail&atid=100976&aid=3039744&group_id=976),
    # which increases the delays.  It's more trouble than it's worth,
    # so just turn off the feature (yes, setting Expect: to an empty
    # value is the official way to disable this)
    if "Expect" not in request.headers:
        request.headers["Expect"] = ""

    # libcurl adds Pragma: no-cache by default; disable that too
    if "Pragma" not in request.headers:
        request.headers["Pragma"] = ""

    # Request headers may be either a regular dict or HTTPHeaders object
    if isinstance(request.headers, httputil.HTTPHeaders):
        curl.setopt(pycurl.HTTPHEADER,
                    [utf8("%s: %s" % i) for i in request.headers.get_all()])
    else:
        curl.setopt(pycurl.HTTPHEADER,
                    [utf8("%s: %s" % i) for i in request.headers.iteritems()])

    if request.header_callback:
        curl.setopt(pycurl.HEADERFUNCTION, request.header_callback)
    else:
        curl.setopt(pycurl.HEADERFUNCTION,
                    lambda line: _curl_header_callback(headers, line))
    if request.streaming_callback:
        curl.setopt(pycurl.WRITEFUNCTION, request.streaming_callback)
    else:
        curl.setopt(pycurl.WRITEFUNCTION, buffer.write)
    curl.setopt(pycurl.FOLLOWLOCATION, request.follow_redirects)
    curl.setopt(pycurl.MAXREDIRS, request.max_redirects)
    # Request timeouts are given in seconds; libcurl wants milliseconds.
    curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(1000 * request.connect_timeout))
    curl.setopt(pycurl.TIMEOUT_MS, int(1000 * request.request_timeout))
    if request.user_agent:
        curl.setopt(pycurl.USERAGENT, utf8(request.user_agent))
    else:
        curl.setopt(pycurl.USERAGENT, "Mozilla/5.0 (compatible; pycurl)")
    if request.network_interface:
        curl.setopt(pycurl.INTERFACE, request.network_interface)
    if request.use_gzip:
        curl.setopt(pycurl.ENCODING, "gzip,deflate")
    else:
        curl.setopt(pycurl.ENCODING, "none")
    if request.proxy_host and request.proxy_port:
        curl.setopt(pycurl.PROXY, request.proxy_host)
        curl.setopt(pycurl.PROXYPORT, request.proxy_port)
        if request.proxy_username:
            # Treat a missing password as empty (as is done for
            # auth_password below) rather than sending the text "None".
            credentials = '%s:%s' % (request.proxy_username,
                                     request.proxy_password or '')
            curl.setopt(pycurl.PROXYUSERPWD, credentials)
    else:
        # Clear any proxy left over from a previous use of this handle.
        curl.setopt(pycurl.PROXY, '')
    if request.validate_cert:
        curl.setopt(pycurl.SSL_VERIFYPEER, 1)
        curl.setopt(pycurl.SSL_VERIFYHOST, 2)
    else:
        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
    if request.ca_certs is not None:
        curl.setopt(pycurl.CAINFO, request.ca_certs)
    else:
        # There is no way to restore pycurl.CAINFO to its default value
        # (Using unsetopt makes it reject all certificates).
        # I don't see any way to read the default value from python so it
        # can be restored later.  We'll have to just leave CAINFO untouched
        # if no ca_certs file was specified, and require that if any
        # request uses a custom ca_certs file, they all must.
        pass

    if request.allow_ipv6 is False:
        # Curl behaves reasonably when DNS resolution gives an ipv6 address
        # that we can't reach, so allow ipv6 unless the user asks to disable.
        # (but see version check in _process_queue above)
        curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4)

    # Set the request method through curl's irritating interface which makes
    # up names for almost every single method
    curl_options = {
        "GET": pycurl.HTTPGET,
        "POST": pycurl.POST,
        "PUT": pycurl.UPLOAD,
        "HEAD": pycurl.NOBODY,
    }
    custom_methods = set(["DELETE"])
    # Reset every method flag first so a reused handle starts clean.
    for o in curl_options.values():
        curl.setopt(o, False)
    if request.method in curl_options:
        curl.unsetopt(pycurl.CUSTOMREQUEST)
        curl.setopt(curl_options[request.method], True)
    elif request.allow_nonstandard_methods or request.method in custom_methods:
        curl.setopt(pycurl.CUSTOMREQUEST, request.method)
    else:
        raise KeyError('unknown method ' + request.method)

    # Handle curl's cryptic options for every individual HTTP method
    if request.method in ("POST", "PUT"):
        # Encode once and derive the size from the encoded form, so the
        # length reported to libcurl always matches the bytes it reads
        # (len() of a unicode body undercounts non-ASCII text).
        body = utf8(request.body)
        request_buffer = cStringIO.StringIO(body)
        curl.setopt(pycurl.READFUNCTION, request_buffer.read)
        if request.method == "POST":
            def ioctl(cmd):
                # libcurl asks to rewind the body, e.g. when it has to
                # resend the request.
                if cmd == curl.IOCMD_RESTARTREAD:
                    request_buffer.seek(0)
            curl.setopt(pycurl.IOCTLFUNCTION, ioctl)
            curl.setopt(pycurl.POSTFIELDSIZE, len(body))
        else:
            curl.setopt(pycurl.INFILESIZE, len(body))

    if request.auth_username is not None:
        userpwd = "%s:%s" % (request.auth_username, request.auth_password or '')
        curl.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_BASIC)
        curl.setopt(pycurl.USERPWD, utf8(userpwd))
        logging.debug("%s %s (username: %r)", request.method, request.url,
                      request.auth_username)
    else:
        curl.unsetopt(pycurl.USERPWD)
        logging.debug("%s %s", request.method, request.url)

    if request.client_cert is not None:
        curl.setopt(pycurl.SSLCERT, request.client_cert)

    if request.client_key is not None:
        curl.setopt(pycurl.SSLKEY, request.client_key)

    if threading.activeCount() > 1:
        # libcurl/pycurl is not thread-safe by default.  When multiple threads
        # are used, signals should be disabled.  This has the side effect
        # of disabling DNS timeouts in some environments (when libcurl is
        # not linked against ares), so we don't do it when there is only one
        # thread.  Applications that use many short-lived threads may need
        # to set NOSIGNAL manually in a prepare_curl_callback since
        # there may not be any other threads running at the time we call
        # threading.activeCount.
        curl.setopt(pycurl.NOSIGNAL, 1)
    # Last step, so the application can override anything set above.
    if request.prepare_curl_callback is not None:
        request.prepare_curl_callback(curl)
00416 
00417 
00418 def _curl_header_callback(headers, header_line):
00419     # header_line as returned by curl includes the end-of-line characters.
00420     header_line = header_line.strip()
00421     if header_line.startswith("HTTP/"):
00422         headers.clear()
00423         return
00424     if not header_line:
00425         return
00426     headers.parse_line(header_line)
00427 
00428 
00429 def _curl_debug(debug_type, debug_msg):
00430     debug_types = ('I', '<', '>', '<', '>')
00431     if debug_type == 0:
00432         logging.debug('%s', debug_msg.strip())
00433     elif debug_type in (1, 2):
00434         for line in debug_msg.splitlines():
00435             logging.debug('%s %s', debug_types[debug_type], line)
00436     elif debug_type == 4:
00437         logging.debug('%s %r', debug_types[debug_type], debug_msg)
00438 
# When run as a script, make CurlAsyncHTTPClient the AsyncHTTPClient
# implementation and hand control to the main() imported from
# tornado.httpclient.
if __name__ == "__main__":
    AsyncHTTPClient.configure(CurlAsyncHTTPClient)
    main()



roswww
Author(s): Jonathan Mace
autogenerated on Thu Jan 2 2014 11:53:30