curl_httpclient.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #
00003 # Copyright 2009 Facebook
00004 #
00005 # Licensed under the Apache License, Version 2.0 (the "License"); you may
00006 # not use this file except in compliance with the License. You may obtain
00007 # a copy of the License at
00008 #
00009 #     http://www.apache.org/licenses/LICENSE-2.0
00010 #
00011 # Unless required by applicable law or agreed to in writing, software
00012 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
00013 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
00014 # License for the specific language governing permissions and limitations
00015 # under the License.
00016 
00017 """Non-blocking HTTP client implementation using pycurl."""
00018 
00019 from __future__ import absolute_import, division, print_function, with_statement
00020 
00021 import collections
00022 import logging
00023 import pycurl
00024 import threading
00025 import time
00026 
00027 from tornado import httputil
00028 from tornado import ioloop
00029 from tornado.log import gen_log
00030 from tornado import stack_context
00031 
00032 from tornado.escape import utf8, native_str
00033 from tornado.httpclient import HTTPResponse, HTTPError, AsyncHTTPClient, main
00034 from tornado.util import bytes_type
00035 
00036 try:
00037     from io import BytesIO  # py3
00038 except ImportError:
00039     from cStringIO import StringIO as BytesIO  # py2
00040 
00041 
class CurlAsyncHTTPClient(AsyncHTTPClient):
    """``AsyncHTTPClient`` implementation driven by libcurl's multi-socket API.

    A fixed pool of ``max_clients`` pycurl easy handles is shared among
    queued requests.  libcurl reports which file descriptors and timeouts
    it wants watched (via ``M_SOCKETFUNCTION``/``M_TIMERFUNCTION``) and
    this class relays those registrations to the tornado ``IOLoop``.
    """
    def initialize(self, io_loop, max_clients=10, defaults=None):
        # Called by AsyncHTTPClient's pseudo-constructor machinery in
        # place of __init__.
        super(CurlAsyncHTTPClient, self).initialize(io_loop, defaults=defaults)
        self._multi = pycurl.CurlMulti()
        self._multi.setopt(pycurl.M_TIMERFUNCTION, self._set_timeout)
        self._multi.setopt(pycurl.M_SOCKETFUNCTION, self._handle_socket)
        # Pool of reusable easy handles; _free_list holds the idle ones.
        self._curls = [_curl_create() for i in range(max_clients)]
        self._free_list = self._curls[:]
        # Requests waiting for a free handle, as (request, callback) pairs.
        self._requests = collections.deque()
        # fd -> IOLoop event mask currently registered for that fd.
        self._fds = {}
        # IOLoop timeout handle for the next libcurl-requested timeout,
        # or None when nothing is scheduled.
        self._timeout = None

        # libcurl has bugs that sometimes cause it to not report all
        # relevant file descriptors and timeouts to TIMERFUNCTION/
        # SOCKETFUNCTION.  Mitigate the effects of such bugs by
        # forcing a periodic scan of all active requests.
        self._force_timeout_callback = ioloop.PeriodicCallback(
            self._handle_force_timeout, 1000, io_loop=io_loop)
        self._force_timeout_callback.start()

        # Work around a bug in libcurl 7.29.0: Some fields in the curl
        # multi object are initialized lazily, and its destructor will
        # segfault if it is destroyed without having been used.  Add
        # and remove a dummy handle to make sure everything is
        # initialized.
        dummy_curl_handle = pycurl.Curl()
        self._multi.add_handle(dummy_curl_handle)
        self._multi.remove_handle(dummy_curl_handle)

    def close(self):
        """Release all libcurl resources and cancel pending timers."""
        self._force_timeout_callback.stop()
        if self._timeout is not None:
            self.io_loop.remove_timeout(self._timeout)
        for curl in self._curls:
            curl.close()
        self._multi.close()
        super(CurlAsyncHTTPClient, self).close()

    def fetch_impl(self, request, callback):
        """Queue *request* and kick libcurl so it starts immediately
        if a handle is free (``_set_timeout(0)`` schedules an immediate
        ``_handle_timeout`` pass)."""
        self._requests.append((request, callback))
        self._process_queue()
        self._set_timeout(0)

    def _handle_socket(self, event, fd, multi, data):
        """Called by libcurl when it wants to change the file descriptors
        it cares about.
        """
        # Translate libcurl poll flags to IOLoop event masks.
        event_map = {
            pycurl.POLL_NONE: ioloop.IOLoop.NONE,
            pycurl.POLL_IN: ioloop.IOLoop.READ,
            pycurl.POLL_OUT: ioloop.IOLoop.WRITE,
            pycurl.POLL_INOUT: ioloop.IOLoop.READ | ioloop.IOLoop.WRITE
        }
        if event == pycurl.POLL_REMOVE:
            if fd in self._fds:
                self.io_loop.remove_handler(fd)
                del self._fds[fd]
        else:
            ioloop_event = event_map[event]
            # libcurl sometimes closes a socket and then opens a new
            # one using the same FD without giving us a POLL_NONE in
            # between.  This is a problem with the epoll IOLoop,
            # because the kernel can tell when a socket is closed and
            # removes it from the epoll automatically, causing future
            # update_handler calls to fail.  Since we can't tell when
            # this has happened, always use remove and re-add
            # instead of update.
            if fd in self._fds:
                self.io_loop.remove_handler(fd)
            self.io_loop.add_handler(fd, self._handle_events,
                                     ioloop_event)
            self._fds[fd] = ioloop_event

    def _set_timeout(self, msecs):
        """Called by libcurl to schedule a timeout."""
        # Only one timeout may be pending at a time; replace any
        # previously scheduled one.
        if self._timeout is not None:
            self.io_loop.remove_timeout(self._timeout)
        self._timeout = self.io_loop.add_timeout(
            self.io_loop.time() + msecs / 1000.0, self._handle_timeout)

    def _handle_events(self, fd, events):
        """Called by IOLoop when there is activity on one of our
        file descriptors.
        """
        action = 0
        if events & ioloop.IOLoop.READ:
            action |= pycurl.CSELECT_IN
        if events & ioloop.IOLoop.WRITE:
            action |= pycurl.CSELECT_OUT
        # Keep calling socket_action until libcurl stops asking for
        # another immediate pass.
        while True:
            try:
                ret, num_handles = self._multi.socket_action(fd, action)
            except pycurl.error as e:
                ret = e.args[0]
            if ret != pycurl.E_CALL_MULTI_PERFORM:
                break
        self._finish_pending_requests()

    def _handle_timeout(self):
        """Called by IOLoop when the requested timeout has passed."""
        # NullContext so completed-request callbacks don't inherit
        # whatever stack_context happened to be active here.
        with stack_context.NullContext():
            self._timeout = None
            while True:
                try:
                    ret, num_handles = self._multi.socket_action(
                        pycurl.SOCKET_TIMEOUT, 0)
                except pycurl.error as e:
                    ret = e.args[0]
                if ret != pycurl.E_CALL_MULTI_PERFORM:
                    break
            self._finish_pending_requests()

        # In theory, we shouldn't have to do this because curl will
        # call _set_timeout whenever the timeout changes.  However,
        # sometimes after _handle_timeout we will need to reschedule
        # immediately even though nothing has changed from curl's
        # perspective.  This is because when socket_action is
        # called with SOCKET_TIMEOUT, libcurl decides internally which
        # timeouts need to be processed by using a monotonic clock
        # (where available) while tornado uses python's time.time()
        # to decide when timeouts have occurred.  When those clocks
        # disagree on elapsed time (as they will whenever there is an
        # NTP adjustment), tornado might call _handle_timeout before
        # libcurl is ready.  After each timeout, resync the scheduled
        # timeout with libcurl's current state.
        new_timeout = self._multi.timeout()
        if new_timeout >= 0:
            self._set_timeout(new_timeout)

    def _handle_force_timeout(self):
        """Called by IOLoop periodically to ask libcurl to process any
        events it may have forgotten about.
        """
        with stack_context.NullContext():
            while True:
                try:
                    ret, num_handles = self._multi.socket_all()
                except pycurl.error as e:
                    ret = e.args[0]
                if ret != pycurl.E_CALL_MULTI_PERFORM:
                    break
            self._finish_pending_requests()

    def _finish_pending_requests(self):
        """Process any requests that were completed by the last
        call to multi.socket_action.
        """
        while True:
            # info_read returns (remaining, successes, failures).
            num_q, ok_list, err_list = self._multi.info_read()
            for curl in ok_list:
                self._finish(curl)
            for curl, errnum, errmsg in err_list:
                self._finish(curl, errnum, errmsg)
            if num_q == 0:
                break
        # Completed transfers freed handles; start queued requests.
        self._process_queue()

    def _process_queue(self):
        """Pair queued requests with free curl handles and hand them
        to the multi object."""
        with stack_context.NullContext():
            while True:
                started = 0
                while self._free_list and self._requests:
                    started += 1
                    curl = self._free_list.pop()
                    (request, callback) = self._requests.popleft()
                    # Per-request state rides along on the handle itself.
                    curl.info = {
                        "headers": httputil.HTTPHeaders(),
                        "buffer": BytesIO(),
                        "request": request,
                        "callback": callback,
                        "curl_start_time": time.time(),
                    }
                    _curl_setup_request(curl, request, curl.info["buffer"],
                                        curl.info["headers"])
                    self._multi.add_handle(curl)

                if not started:
                    break

    def _finish(self, curl, curl_error=None, curl_message=None):
        """Recycle *curl* after its transfer finished and invoke the
        request's callback with an ``HTTPResponse``.

        ``curl_error``/``curl_message`` are set only for failed
        transfers (from the multi object's error list).
        """
        info = curl.info
        curl.info = None
        self._multi.remove_handle(curl)
        self._free_list.append(curl)
        buffer = info["buffer"]
        if curl_error:
            error = CurlError(curl_error, curl_message)
            code = error.code
            effective_url = None
            buffer.close()
            buffer = None
        else:
            error = None
            code = curl.getinfo(pycurl.HTTP_CODE)
            effective_url = curl.getinfo(pycurl.EFFECTIVE_URL)
            buffer.seek(0)
        # the various curl timings are documented at
        # http://curl.haxx.se/libcurl/c/curl_easy_getinfo.html
        time_info = dict(
            queue=info["curl_start_time"] - info["request"].start_time,
            namelookup=curl.getinfo(pycurl.NAMELOOKUP_TIME),
            connect=curl.getinfo(pycurl.CONNECT_TIME),
            pretransfer=curl.getinfo(pycurl.PRETRANSFER_TIME),
            starttransfer=curl.getinfo(pycurl.STARTTRANSFER_TIME),
            total=curl.getinfo(pycurl.TOTAL_TIME),
            redirect=curl.getinfo(pycurl.REDIRECT_TIME),
        )
        try:
            # The reason phrase was smuggled through the header parser as
            # a synthetic X-Http-Reason header (see _curl_header_callback).
            info["callback"](HTTPResponse(
                request=info["request"], code=code, headers=info["headers"],
                buffer=buffer, effective_url=effective_url, error=error,
                reason=info['headers'].get("X-Http-Reason", None),
                request_time=time.time() - info["curl_start_time"],
                time_info=time_info))
        except Exception:
            self.handle_callback_exception(info["callback"])

    def handle_callback_exception(self, callback):
        """Delegate exceptions raised by a fetch callback to the IOLoop."""
        self.io_loop.handle_callback_exception(callback)
00261 
00262 
class CurlError(HTTPError):
    """``HTTPError`` carrying the underlying libcurl error number.

    Every curl-level failure is surfaced with HTTP code 599; the libcurl
    errno is preserved on the instance for callers that need it.
    """
    def __init__(self, errno, message):
        # All libcurl failures map to the catch-all 599 status code.
        super(CurlError, self).__init__(599, message)
        self.errno = errno
00267 
00268 
def _curl_create():
    """Create a fresh pycurl handle, enabling libcurl's verbose trace
    output (routed through ``_curl_debug``) when tornado's general
    logger is at DEBUG level."""
    handle = pycurl.Curl()
    debug_enabled = gen_log.isEnabledFor(logging.DEBUG)
    if debug_enabled:
        handle.setopt(pycurl.VERBOSE, 1)
        handle.setopt(pycurl.DEBUGFUNCTION, _curl_debug)
    return handle
00275 
00276 
def _curl_setup_request(curl, request, buffer, headers):
    """Configure the pycurl handle *curl* to execute *request*.

    Response body bytes are written to *buffer* (unless the request has a
    ``streaming_callback``) and response header lines are parsed into
    *headers* (unless the request has a ``header_callback``).  The handle
    is reused across requests, so options from its previous use are
    explicitly reset where libcurl would otherwise remember them.
    """
    curl.setopt(pycurl.URL, native_str(request.url))

    # libcurl's magic "Expect: 100-continue" behavior causes delays
    # with servers that don't support it (which include, among others,
    # Google's OpenID endpoint).  Additionally, this behavior has
    # a bug in conjunction with the curl_multi_socket_action API
    # (https://sourceforge.net/tracker/?func=detail&atid=100976&aid=3039744&group_id=976),
    # which increases the delays.  It's more trouble than it's worth,
    # so just turn off the feature (yes, setting Expect: to an empty
    # value is the official way to disable this)
    if "Expect" not in request.headers:
        request.headers["Expect"] = ""

    # libcurl adds Pragma: no-cache by default; disable that too
    if "Pragma" not in request.headers:
        request.headers["Pragma"] = ""

    # Request headers may be either a regular dict or HTTPHeaders object
    if isinstance(request.headers, httputil.HTTPHeaders):
        curl.setopt(pycurl.HTTPHEADER,
                    [native_str("%s: %s" % i) for i in request.headers.get_all()])
    else:
        curl.setopt(pycurl.HTTPHEADER,
                    [native_str("%s: %s" % i) for i in request.headers.items()])

    # Header lines either go straight to the user's callback or are
    # accumulated into *headers* for the final HTTPResponse.
    if request.header_callback:
        curl.setopt(pycurl.HEADERFUNCTION,
                    lambda line: request.header_callback(native_str(line)))
    else:
        curl.setopt(pycurl.HEADERFUNCTION,
                    lambda line: _curl_header_callback(headers,
                                                       native_str(line)))
    if request.streaming_callback:
        write_function = request.streaming_callback
    else:
        write_function = buffer.write
    if bytes_type is str:  # py2
        curl.setopt(pycurl.WRITEFUNCTION, write_function)
    else:  # py3
        # Upstream pycurl doesn't support py3, but ubuntu 12.10 includes
        # a fork/port.  That version has a bug in which it passes unicode
        # strings instead of bytes to the WRITEFUNCTION.  This means that
        # if you use a WRITEFUNCTION (which tornado always does), you cannot
        # download arbitrary binary data.  This needs to be fixed in the
        # ported pycurl package, but in the meantime this lambda will
        # make it work for downloading (utf8) text.
        curl.setopt(pycurl.WRITEFUNCTION, lambda s: write_function(utf8(s)))
    curl.setopt(pycurl.FOLLOWLOCATION, request.follow_redirects)
    curl.setopt(pycurl.MAXREDIRS, request.max_redirects)
    # Request timeouts are seconds (float); libcurl takes milliseconds.
    curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(1000 * request.connect_timeout))
    curl.setopt(pycurl.TIMEOUT_MS, int(1000 * request.request_timeout))
    if request.user_agent:
        curl.setopt(pycurl.USERAGENT, native_str(request.user_agent))
    else:
        curl.setopt(pycurl.USERAGENT, "Mozilla/5.0 (compatible; pycurl)")
    if request.network_interface:
        curl.setopt(pycurl.INTERFACE, request.network_interface)
    if request.decompress_response:
        curl.setopt(pycurl.ENCODING, "gzip,deflate")
    else:
        curl.setopt(pycurl.ENCODING, "none")
    if request.proxy_host and request.proxy_port:
        curl.setopt(pycurl.PROXY, request.proxy_host)
        curl.setopt(pycurl.PROXYPORT, request.proxy_port)
        if request.proxy_username:
            credentials = '%s:%s' % (request.proxy_username,
                                     request.proxy_password)
            curl.setopt(pycurl.PROXYUSERPWD, credentials)
    else:
        # Clear any proxy settings left over from a previous request
        # on this reused handle.
        curl.setopt(pycurl.PROXY, '')
        curl.unsetopt(pycurl.PROXYUSERPWD)
    if request.validate_cert:
        curl.setopt(pycurl.SSL_VERIFYPEER, 1)
        curl.setopt(pycurl.SSL_VERIFYHOST, 2)
    else:
        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
    if request.ca_certs is not None:
        curl.setopt(pycurl.CAINFO, request.ca_certs)
    else:
        # There is no way to restore pycurl.CAINFO to its default value
        # (Using unsetopt makes it reject all certificates).
        # I don't see any way to read the default value from python so it
        # can be restored later.  We'll have to just leave CAINFO untouched
        # if no ca_certs file was specified, and require that if any
        # request uses a custom ca_certs file, they all must.
        pass

    if request.allow_ipv6 is False:
        # Curl behaves reasonably when DNS resolution gives an ipv6 address
        # that we can't reach, so allow ipv6 unless the user asks to disable.
        curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4)
    else:
        curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_WHATEVER)

    # Set the request method through curl's irritating interface which makes
    # up names for almost every single method
    curl_options = {
        "GET": pycurl.HTTPGET,
        "POST": pycurl.POST,
        "PUT": pycurl.UPLOAD,
        "HEAD": pycurl.NOBODY,
    }
    custom_methods = set(["DELETE", "OPTIONS", "PATCH"])
    # Reset method flags left over from this handle's previous request.
    for o in curl_options.values():
        curl.setopt(o, False)
    if request.method in curl_options:
        curl.unsetopt(pycurl.CUSTOMREQUEST)
        curl.setopt(curl_options[request.method], True)
    elif request.allow_nonstandard_methods or request.method in custom_methods:
        curl.setopt(pycurl.CUSTOMREQUEST, request.method)
    else:
        raise KeyError('unknown method ' + request.method)

    # Handle curl's cryptic options for every individual HTTP method
    if request.method in ("POST", "PUT"):
        if request.body is None:
            raise AssertionError(
                'Body must not be empty for "%s" request'
                % request.method)

        request_buffer = BytesIO(utf8(request.body))
        curl.setopt(pycurl.READFUNCTION, request_buffer.read)
        if request.method == "POST":
            def ioctl(cmd):
                # libcurl asks us to rewind the body stream when it needs
                # to resend it (presumably on redirects/auth retries --
                # see CURLIOCMD_RESTARTREAD in the libcurl docs).
                if cmd == curl.IOCMD_RESTARTREAD:
                    request_buffer.seek(0)
            curl.setopt(pycurl.IOCTLFUNCTION, ioctl)
            curl.setopt(pycurl.POSTFIELDSIZE, len(request.body))
        else:
            curl.setopt(pycurl.INFILESIZE, len(request.body))
    elif request.method == "GET":
        if request.body is not None:
            raise AssertionError('Body must be empty for GET request')

    if request.auth_username is not None:
        userpwd = "%s:%s" % (request.auth_username, request.auth_password or '')

        if request.auth_mode is None or request.auth_mode == "basic":
            curl.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_BASIC)
        elif request.auth_mode == "digest":
            curl.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_DIGEST)
        else:
            raise ValueError("Unsupported auth_mode %s" % request.auth_mode)

        curl.setopt(pycurl.USERPWD, native_str(userpwd))
        gen_log.debug("%s %s (username: %r)", request.method, request.url,
                      request.auth_username)
    else:
        curl.unsetopt(pycurl.USERPWD)
        gen_log.debug("%s %s", request.method, request.url)

    if request.client_cert is not None:
        curl.setopt(pycurl.SSLCERT, request.client_cert)

    if request.client_key is not None:
        curl.setopt(pycurl.SSLKEY, request.client_key)

    if threading.activeCount() > 1:
        # libcurl/pycurl is not thread-safe by default.  When multiple threads
        # are used, signals should be disabled.  This has the side effect
        # of disabling DNS timeouts in some environments (when libcurl is
        # not linked against ares), so we don't do it when there is only one
        # thread.  Applications that use many short-lived threads may need
        # to set NOSIGNAL manually in a prepare_curl_callback since
        # there may not be any other threads running at the time we call
        # threading.activeCount.
        curl.setopt(pycurl.NOSIGNAL, 1)
    if request.prepare_curl_callback is not None:
        # Last chance for the user to override anything set above.
        request.prepare_curl_callback(curl)
00448 
00449 
def _curl_header_callback(headers, header_line):
    """Accumulate one response header line from libcurl into *headers*.

    A status line ("HTTP/...") resets the accumulated headers (each
    response in a redirect chain delivers a fresh header block on the
    same handle) and its reason phrase is stored as a synthetic
    "X-Http-Reason" header; blank block-terminator lines are ignored.
    """
    # Lines arrive from curl with their end-of-line characters attached.
    line = header_line.strip()
    if line.startswith("HTTP/"):
        # Fresh status line: discard headers from any earlier response.
        headers.clear()
        try:
            (__, __, reason) = httputil.parse_response_start_line(line)
        except httputil.HTTPInputError:
            # Unparseable status line; nothing usable to record.
            return
        line = "X-Http-Reason: %s" % reason
    if line:
        headers.parse_line(line)
00463 
00464 
def _curl_debug(debug_type, debug_msg):
    """pycurl DEBUGFUNCTION hook: forward libcurl trace records to gen_log.

    Only record types 0 (informational text), 1/2 (header data, logged
    line by line with a direction marker) and 4 (raw data, logged as a
    repr) are emitted; everything else is dropped.
    """
    # One marker character per libcurl debug record type.
    markers = ('I', '<', '>', '<', '>')
    if debug_type == 0:
        gen_log.debug('%s', debug_msg.strip())
    elif debug_type in (1, 2):
        marker = markers[debug_type]
        for piece in debug_msg.splitlines():
            gen_log.debug('%s %s', marker, piece)
    elif debug_type == 4:
        gen_log.debug('%s %r', markers[debug_type], debug_msg)
00474 
if __name__ == "__main__":
    # Allow this module to be run directly as a simple command-line HTTP
    # client (tornado.httpclient.main), using the curl implementation.
    AsyncHTTPClient.configure(CurlAsyncHTTPClient)
    main()


rosbridge_tools
Author(s): Jonathan Mace
autogenerated on Sat Dec 27 2014 11:25:59