iostream.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #
00003 # Copyright 2009 Facebook
00004 #
00005 # Licensed under the Apache License, Version 2.0 (the "License"); you may
00006 # not use this file except in compliance with the License. You may obtain
00007 # a copy of the License at
00008 #
00009 #     http://www.apache.org/licenses/LICENSE-2.0
00010 #
00011 # Unless required by applicable law or agreed to in writing, software
00012 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
00013 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
00014 # License for the specific language governing permissions and limitations
00015 # under the License.
00016 
00017 """A utility class to write to and read from a non-blocking socket."""
00018 
00019 from __future__ import absolute_import, division, with_statement
00020 
00021 import collections
00022 import errno
00023 import logging
00024 import os
00025 import socket
00026 import sys
00027 import re
00028 
00029 from tornado import ioloop
00030 from tornado import stack_context
00031 from tornado.util import b, bytes_type
00032 
00033 try:
00034     import ssl  # Python 2.6+
00035 except ImportError:
00036     ssl = None
00037 
00038 
00039 class IOStream(object):
00040     r"""A utility class to write to and read from a non-blocking socket.
00041 
00042     We support a non-blocking ``write()`` and a family of ``read_*()`` methods.
00043     All of the methods take callbacks (since writing and reading are
00044     non-blocking and asynchronous).
00045 
00046     The socket parameter may either be connected or unconnected.  For
00047     server operations the socket is the result of calling socket.accept().
00048     For client operations the socket is created with socket.socket(),
00049     and may either be connected before passing it to the IOStream or
00050     connected with IOStream.connect.
00051 
00052     When a stream is closed due to an error, the IOStream's `error`
00053     attribute contains the exception object.
00054 
00055     A very simple (and broken) HTTP client using this class::
00056 
00057         from tornado import ioloop
00058         from tornado import iostream
00059         import socket
00060 
00061         def send_request():
00062             stream.write("GET / HTTP/1.0\r\nHost: friendfeed.com\r\n\r\n")
00063             stream.read_until("\r\n\r\n", on_headers)
00064 
00065         def on_headers(data):
00066             headers = {}
00067             for line in data.split("\r\n"):
00068                parts = line.split(":")
00069                if len(parts) == 2:
00070                    headers[parts[0].strip()] = parts[1].strip()
00071             stream.read_bytes(int(headers["Content-Length"]), on_body)
00072 
00073         def on_body(data):
00074             print data
00075             stream.close()
00076             ioloop.IOLoop.instance().stop()
00077 
00078         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
00079         stream = iostream.IOStream(s)
00080         stream.connect(("friendfeed.com", 80), send_request)
00081         ioloop.IOLoop.instance().start()
00082 
00083     """
00084     def __init__(self, socket, io_loop=None, max_buffer_size=104857600,
00085                  read_chunk_size=4096):
00086         self.socket = socket
00087         self.socket.setblocking(False)
00088         self.io_loop = io_loop or ioloop.IOLoop.instance()
00089         self.max_buffer_size = max_buffer_size
00090         self.read_chunk_size = read_chunk_size
00091         self.error = None
00092         self._read_buffer = collections.deque()
00093         self._write_buffer = collections.deque()
00094         self._read_buffer_size = 0
00095         self._write_buffer_frozen = False
00096         self._read_delimiter = None
00097         self._read_regex = None
00098         self._read_bytes = None
00099         self._read_until_close = False
00100         self._read_callback = None
00101         self._streaming_callback = None
00102         self._write_callback = None
00103         self._close_callback = None
00104         self._connect_callback = None
00105         self._connecting = False
00106         self._state = None
00107         self._pending_callbacks = 0
00108 
00109     def connect(self, address, callback=None):
00110         """Connects the socket to a remote address without blocking.
00111 
00112         May only be called if the socket passed to the constructor was
00113         not previously connected.  The address parameter is in the
00114         same format as for socket.connect, i.e. a (host, port) tuple.
00115         If callback is specified, it will be called when the
00116         connection is completed.
00117 
00118         Note that it is safe to call IOStream.write while the
00119         connection is pending, in which case the data will be written
00120         as soon as the connection is ready.  Calling IOStream read
00121         methods before the socket is connected works on some platforms
00122         but is non-portable.
00123         """
00124         self._connecting = True
00125         try:
00126             self.socket.connect(address)
00127         except socket.error, e:
00128             # In non-blocking mode we expect connect() to raise an
00129             # exception with EINPROGRESS or EWOULDBLOCK.
00130             #
00131             # On freebsd, other errors such as ECONNREFUSED may be
00132             # returned immediately when attempting to connect to
00133             # localhost, so handle them the same way as an error
00134             # reported later in _handle_connect.
00135             if e.args[0] not in (errno.EINPROGRESS, errno.EWOULDBLOCK):
00136                 logging.warning("Connect error on fd %d: %s",
00137                                 self.socket.fileno(), e)
00138                 self.close()
00139                 return
00140         self._connect_callback = stack_context.wrap(callback)
00141         self._add_io_state(self.io_loop.WRITE)
00142 
00143     def read_until_regex(self, regex, callback):
00144         """Call callback when we read the given regex pattern."""
00145         self._set_read_callback(callback)
00146         self._read_regex = re.compile(regex)
00147         self._try_inline_read()
00148 
00149     def read_until(self, delimiter, callback):
00150         """Call callback when we read the given delimiter."""
00151         self._set_read_callback(callback)
00152         self._read_delimiter = delimiter
00153         self._try_inline_read()
00154 
00155     def read_bytes(self, num_bytes, callback, streaming_callback=None):
00156         """Call callback when we read the given number of bytes.
00157 
00158         If a ``streaming_callback`` is given, it will be called with chunks
00159         of data as they become available, and the argument to the final
00160         ``callback`` will be empty.
00161         """
00162         self._set_read_callback(callback)
00163         assert isinstance(num_bytes, (int, long))
00164         self._read_bytes = num_bytes
00165         self._streaming_callback = stack_context.wrap(streaming_callback)
00166         self._try_inline_read()
00167 
00168     def read_until_close(self, callback, streaming_callback=None):
00169         """Reads all data from the socket until it is closed.
00170 
00171         If a ``streaming_callback`` is given, it will be called with chunks
00172         of data as they become available, and the argument to the final
00173         ``callback`` will be empty.
00174 
00175         Subject to ``max_buffer_size`` limit from `IOStream` constructor if
00176         a ``streaming_callback`` is not used.
00177         """
00178         self._set_read_callback(callback)
00179         if self.closed():
00180             self._run_callback(callback, self._consume(self._read_buffer_size))
00181             self._read_callback = None
00182             return
00183         self._read_until_close = True
00184         self._streaming_callback = stack_context.wrap(streaming_callback)
00185         self._add_io_state(self.io_loop.READ)
00186 
00187     def write(self, data, callback=None):
00188         """Write the given data to this stream.
00189 
00190         If callback is given, we call it when all of the buffered write
00191         data has been successfully written to the stream. If there was
00192         previously buffered write data and an old write callback, that
00193         callback is simply overwritten with this new callback.
00194         """
00195         assert isinstance(data, bytes_type)
00196         self._check_closed()
00197         # We use bool(_write_buffer) as a proxy for write_buffer_size>0,
00198         # so never put empty strings in the buffer.
00199         if data:
00200             # Break up large contiguous strings before inserting them in the
00201             # write buffer, so we don't have to recopy the entire thing
00202             # as we slice off pieces to send to the socket.
00203             WRITE_BUFFER_CHUNK_SIZE = 128 * 1024
00204             if len(data) > WRITE_BUFFER_CHUNK_SIZE:
00205                 for i in range(0, len(data), WRITE_BUFFER_CHUNK_SIZE):
00206                     self._write_buffer.append(data[i:i + WRITE_BUFFER_CHUNK_SIZE])
00207             else:
00208                 self._write_buffer.append(data)
00209         self._write_callback = stack_context.wrap(callback)
00210         self._handle_write()
00211         if self._write_buffer:
00212             self._add_io_state(self.io_loop.WRITE)
00213         self._maybe_add_error_listener()
00214 
00215     def set_close_callback(self, callback):
00216         """Call the given callback when the stream is closed."""
00217         self._close_callback = stack_context.wrap(callback)
00218 
00219     def close(self):
00220         """Close this stream."""
00221         if self.socket is not None:
00222             if any(sys.exc_info()):
00223                 self.error = sys.exc_info()[1]
00224             if self._read_until_close:
00225                 callback = self._read_callback
00226                 self._read_callback = None
00227                 self._read_until_close = False
00228                 self._run_callback(callback,
00229                                    self._consume(self._read_buffer_size))
00230             if self._state is not None:
00231                 self.io_loop.remove_handler(self.socket.fileno())
00232                 self._state = None
00233             self.socket.close()
00234             self.socket = None
00235         self._maybe_run_close_callback()
00236 
00237     def _maybe_run_close_callback(self):
00238         if (self.socket is None and self._close_callback and
00239             self._pending_callbacks == 0):
00240             # if there are pending callbacks, don't run the close callback
00241             # until they're done (see _maybe_add_error_handler)
00242             cb = self._close_callback
00243             self._close_callback = None
00244             self._run_callback(cb)
00245 
00246     def reading(self):
00247         """Returns true if we are currently reading from the stream."""
00248         return self._read_callback is not None
00249 
00250     def writing(self):
00251         """Returns true if we are currently writing to the stream."""
00252         return bool(self._write_buffer)
00253 
00254     def closed(self):
00255         """Returns true if the stream has been closed."""
00256         return self.socket is None
00257 
00258     def _handle_events(self, fd, events):
00259         if not self.socket:
00260             logging.warning("Got events for closed stream %d", fd)
00261             return
00262         try:
00263             if events & self.io_loop.READ:
00264                 self._handle_read()
00265             if not self.socket:
00266                 return
00267             if events & self.io_loop.WRITE:
00268                 if self._connecting:
00269                     self._handle_connect()
00270                 self._handle_write()
00271             if not self.socket:
00272                 return
00273             if events & self.io_loop.ERROR:
00274                 errno = self.socket.getsockopt(socket.SOL_SOCKET,
00275                                                socket.SO_ERROR)
00276                 self.error = socket.error(errno, os.strerror(errno))
00277                 # We may have queued up a user callback in _handle_read or
00278                 # _handle_write, so don't close the IOStream until those
00279                 # callbacks have had a chance to run.
00280                 self.io_loop.add_callback(self.close)
00281                 return
00282             state = self.io_loop.ERROR
00283             if self.reading():
00284                 state |= self.io_loop.READ
00285             if self.writing():
00286                 state |= self.io_loop.WRITE
00287             if state == self.io_loop.ERROR:
00288                 state |= self.io_loop.READ
00289             if state != self._state:
00290                 assert self._state is not None, \
00291                     "shouldn't happen: _handle_events without self._state"
00292                 self._state = state
00293                 self.io_loop.update_handler(self.socket.fileno(), self._state)
00294         except Exception:
00295             logging.error("Uncaught exception, closing connection.",
00296                           exc_info=True)
00297             self.close()
00298             raise
00299 
00300     def _run_callback(self, callback, *args):
00301         def wrapper():
00302             self._pending_callbacks -= 1
00303             try:
00304                 callback(*args)
00305             except Exception:
00306                 logging.error("Uncaught exception, closing connection.",
00307                               exc_info=True)
00308                 # Close the socket on an uncaught exception from a user callback
00309                 # (It would eventually get closed when the socket object is
00310                 # gc'd, but we don't want to rely on gc happening before we
00311                 # run out of file descriptors)
00312                 self.close()
00313                 # Re-raise the exception so that IOLoop.handle_callback_exception
00314                 # can see it and log the error
00315                 raise
00316             self._maybe_add_error_listener()
00317         # We schedule callbacks to be run on the next IOLoop iteration
00318         # rather than running them directly for several reasons:
00319         # * Prevents unbounded stack growth when a callback calls an
00320         #   IOLoop operation that immediately runs another callback
00321         # * Provides a predictable execution context for e.g.
00322         #   non-reentrant mutexes
00323         # * Ensures that the try/except in wrapper() is run outside
00324         #   of the application's StackContexts
00325         with stack_context.NullContext():
00326             # stack_context was already captured in callback, we don't need to
00327             # capture it again for IOStream's wrapper.  This is especially
00328             # important if the callback was pre-wrapped before entry to
00329             # IOStream (as in HTTPConnection._header_callback), as we could
00330             # capture and leak the wrong context here.
00331             self._pending_callbacks += 1
00332             self.io_loop.add_callback(wrapper)
00333 
00334     def _handle_read(self):
00335         try:
00336             try:
00337                 # Pretend to have a pending callback so that an EOF in
00338                 # _read_to_buffer doesn't trigger an immediate close
00339                 # callback.  At the end of this method we'll either
00340                 # estabilsh a real pending callback via
00341                 # _read_from_buffer or run the close callback.
00342                 #
00343                 # We need two try statements here so that
00344                 # pending_callbacks is decremented before the `except`
00345                 # clause below (which calls `close` and does need to
00346                 # trigger the callback)
00347                 self._pending_callbacks += 1
00348                 while True:
00349                     # Read from the socket until we get EWOULDBLOCK or equivalent.
00350                     # SSL sockets do some internal buffering, and if the data is
00351                     # sitting in the SSL object's buffer select() and friends
00352                     # can't see it; the only way to find out if it's there is to
00353                     # try to read it.
00354                     if self._read_to_buffer() == 0:
00355                         break
00356             finally:
00357                 self._pending_callbacks -= 1
00358         except Exception:
00359             logging.warning("error on read", exc_info=True)
00360             self.close()
00361             return
00362         if self._read_from_buffer():
00363             return
00364         else:
00365             self._maybe_run_close_callback()
00366 
00367     def _set_read_callback(self, callback):
00368         assert not self._read_callback, "Already reading"
00369         self._read_callback = stack_context.wrap(callback)
00370 
00371     def _try_inline_read(self):
00372         """Attempt to complete the current read operation from buffered data.
00373 
00374         If the read can be completed without blocking, schedules the
00375         read callback on the next IOLoop iteration; otherwise starts
00376         listening for reads on the socket.
00377         """
00378         # See if we've already got the data from a previous read
00379         if self._read_from_buffer():
00380             return
00381         self._check_closed()
00382         try:
00383             # See comments in _handle_read about incrementing _pending_callbacks
00384             self._pending_callbacks += 1
00385             while True:
00386                 if self._read_to_buffer() == 0:
00387                     break
00388                 self._check_closed()
00389         finally:
00390             self._pending_callbacks -= 1
00391         if self._read_from_buffer():
00392             return
00393         self._add_io_state(self.io_loop.READ)
00394 
00395     def _read_from_socket(self):
00396         """Attempts to read from the socket.
00397 
00398         Returns the data read or None if there is nothing to read.
00399         May be overridden in subclasses.
00400         """
00401         try:
00402             chunk = self.socket.recv(self.read_chunk_size)
00403         except socket.error, e:
00404             if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
00405                 return None
00406             else:
00407                 raise
00408         if not chunk:
00409             self.close()
00410             return None
00411         return chunk
00412 
00413     def _read_to_buffer(self):
00414         """Reads from the socket and appends the result to the read buffer.
00415 
00416         Returns the number of bytes read.  Returns 0 if there is nothing
00417         to read (i.e. the read returns EWOULDBLOCK or equivalent).  On
00418         error closes the socket and raises an exception.
00419         """
00420         try:
00421             chunk = self._read_from_socket()
00422         except socket.error, e:
00423             # ssl.SSLError is a subclass of socket.error
00424             logging.warning("Read error on %d: %s",
00425                             self.socket.fileno(), e)
00426             self.close()
00427             raise
00428         if chunk is None:
00429             return 0
00430         self._read_buffer.append(chunk)
00431         self._read_buffer_size += len(chunk)
00432         if self._read_buffer_size >= self.max_buffer_size:
00433             logging.error("Reached maximum read buffer size")
00434             self.close()
00435             raise IOError("Reached maximum read buffer size")
00436         return len(chunk)
00437 
00438     def _read_from_buffer(self):
00439         """Attempts to complete the currently-pending read from the buffer.
00440 
00441         Returns True if the read was completed.
00442         """
00443         if self._streaming_callback is not None and self._read_buffer_size:
00444             bytes_to_consume = self._read_buffer_size
00445             if self._read_bytes is not None:
00446                 bytes_to_consume = min(self._read_bytes, bytes_to_consume)
00447                 self._read_bytes -= bytes_to_consume
00448             self._run_callback(self._streaming_callback,
00449                                self._consume(bytes_to_consume))
00450         if self._read_bytes is not None and self._read_buffer_size >= self._read_bytes:
00451             num_bytes = self._read_bytes
00452             callback = self._read_callback
00453             self._read_callback = None
00454             self._streaming_callback = None
00455             self._read_bytes = None
00456             self._run_callback(callback, self._consume(num_bytes))
00457             return True
00458         elif self._read_delimiter is not None:
00459             # Multi-byte delimiters (e.g. '\r\n') may straddle two
00460             # chunks in the read buffer, so we can't easily find them
00461             # without collapsing the buffer.  However, since protocols
00462             # using delimited reads (as opposed to reads of a known
00463             # length) tend to be "line" oriented, the delimiter is likely
00464             # to be in the first few chunks.  Merge the buffer gradually
00465             # since large merges are relatively expensive and get undone in
00466             # consume().
00467             if self._read_buffer:
00468                 while True:
00469                     loc = self._read_buffer[0].find(self._read_delimiter)
00470                     if loc != -1:
00471                         callback = self._read_callback
00472                         delimiter_len = len(self._read_delimiter)
00473                         self._read_callback = None
00474                         self._streaming_callback = None
00475                         self._read_delimiter = None
00476                         self._run_callback(callback,
00477                                            self._consume(loc + delimiter_len))
00478                         return True
00479                     if len(self._read_buffer) == 1:
00480                         break
00481                     _double_prefix(self._read_buffer)
00482         elif self._read_regex is not None:
00483             if self._read_buffer:
00484                 while True:
00485                     m = self._read_regex.search(self._read_buffer[0])
00486                     if m is not None:
00487                         callback = self._read_callback
00488                         self._read_callback = None
00489                         self._streaming_callback = None
00490                         self._read_regex = None
00491                         self._run_callback(callback, self._consume(m.end()))
00492                         return True
00493                     if len(self._read_buffer) == 1:
00494                         break
00495                     _double_prefix(self._read_buffer)
00496         return False
00497 
00498     def _handle_connect(self):
00499         err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
00500         if err != 0:
00501             self.error = socket.error(err, os.strerror(err))
00502             # IOLoop implementations may vary: some of them return
00503             # an error state before the socket becomes writable, so
00504             # in that case a connection failure would be handled by the
00505             # error path in _handle_events instead of here.
00506             logging.warning("Connect error on fd %d: %s",
00507                             self.socket.fileno(), errno.errorcode[err])
00508             self.close()
00509             return
00510         if self._connect_callback is not None:
00511             callback = self._connect_callback
00512             self._connect_callback = None
00513             self._run_callback(callback)
00514         self._connecting = False
00515 
00516     def _handle_write(self):
00517         while self._write_buffer:
00518             try:
00519                 if not self._write_buffer_frozen:
00520                     # On windows, socket.send blows up if given a
00521                     # write buffer that's too large, instead of just
00522                     # returning the number of bytes it was able to
00523                     # process.  Therefore we must not call socket.send
00524                     # with more than 128KB at a time.
00525                     _merge_prefix(self._write_buffer, 128 * 1024)
00526                 num_bytes = self.socket.send(self._write_buffer[0])
00527                 if num_bytes == 0:
00528                     # With OpenSSL, if we couldn't write the entire buffer,
00529                     # the very same string object must be used on the
00530                     # next call to send.  Therefore we suppress
00531                     # merging the write buffer after an incomplete send.
00532                     # A cleaner solution would be to set
00533                     # SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER, but this is
00534                     # not yet accessible from python
00535                     # (http://bugs.python.org/issue8240)
00536                     self._write_buffer_frozen = True
00537                     break
00538                 self._write_buffer_frozen = False
00539                 _merge_prefix(self._write_buffer, num_bytes)
00540                 self._write_buffer.popleft()
00541             except socket.error, e:
00542                 if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
00543                     self._write_buffer_frozen = True
00544                     break
00545                 else:
00546                     logging.warning("Write error on %d: %s",
00547                                     self.socket.fileno(), e)
00548                     self.close()
00549                     return
00550         if not self._write_buffer and self._write_callback:
00551             callback = self._write_callback
00552             self._write_callback = None
00553             self._run_callback(callback)
00554 
00555     def _consume(self, loc):
00556         if loc == 0:
00557             return b("")
00558         _merge_prefix(self._read_buffer, loc)
00559         self._read_buffer_size -= loc
00560         return self._read_buffer.popleft()
00561 
00562     def _check_closed(self):
00563         if not self.socket:
00564             raise IOError("Stream is closed")
00565 
00566     def _maybe_add_error_listener(self):
00567         if self._state is None and self._pending_callbacks == 0:
00568             if self.socket is None:
00569                 self._maybe_run_close_callback()
00570             else:
00571                 self._add_io_state(ioloop.IOLoop.READ)
00572 
00573     def _add_io_state(self, state):
00574         """Adds `state` (IOLoop.{READ,WRITE} flags) to our event handler.
00575 
00576         Implementation notes: Reads and writes have a fast path and a
00577         slow path.  The fast path reads synchronously from socket
00578         buffers, while the slow path uses `_add_io_state` to schedule
00579         an IOLoop callback.  Note that in both cases, the callback is
00580         run asynchronously with `_run_callback`.
00581 
00582         To detect closed connections, we must have called
00583         `_add_io_state` at some point, but we want to delay this as
00584         much as possible so we don't have to set an `IOLoop.ERROR`
00585         listener that will be overwritten by the next slow-path
00586         operation.  As long as there are callbacks scheduled for
00587         fast-path ops, those callbacks may do more reads.
00588         If a sequence of fast-path ops do not end in a slow-path op,
00589         (e.g. for an @asynchronous long-poll request), we must add
00590         the error handler.  This is done in `_run_callback` and `write`
00591         (since the write callback is optional so we can have a
00592         fast-path write with no `_run_callback`)
00593         """
00594         if self.socket is None:
00595             # connection has been closed, so there can be no future events
00596             return
00597         if self._state is None:
00598             self._state = ioloop.IOLoop.ERROR | state
00599             with stack_context.NullContext():
00600                 self.io_loop.add_handler(
00601                     self.socket.fileno(), self._handle_events, self._state)
00602         elif not self._state & state:
00603             self._state = self._state | state
00604             self.io_loop.update_handler(self.socket.fileno(), self._state)
00605 
00606 
00607 class SSLIOStream(IOStream):
00608     """A utility class to write to and read from a non-blocking SSL socket.
00609 
00610     If the socket passed to the constructor is already connected,
00611     it should be wrapped with::
00612 
00613         ssl.wrap_socket(sock, do_handshake_on_connect=False, **kwargs)
00614 
00615     before constructing the SSLIOStream.  Unconnected sockets will be
00616     wrapped when IOStream.connect is finished.
00617     """
00618     def __init__(self, *args, **kwargs):
00619         """Creates an SSLIOStream.
00620 
00621         If a dictionary is provided as keyword argument ssl_options,
00622         it will be used as additional keyword arguments to ssl.wrap_socket.
00623         """
00624         self._ssl_options = kwargs.pop('ssl_options', {})
00625         super(SSLIOStream, self).__init__(*args, **kwargs)
00626         self._ssl_accepting = True
00627         self._handshake_reading = False
00628         self._handshake_writing = False
00629 
00630     def reading(self):
00631         return self._handshake_reading or super(SSLIOStream, self).reading()
00632 
00633     def writing(self):
00634         return self._handshake_writing or super(SSLIOStream, self).writing()
00635 
00636     def _do_ssl_handshake(self):
00637         # Based on code from test_ssl.py in the python stdlib
00638         try:
00639             self._handshake_reading = False
00640             self._handshake_writing = False
00641             self.socket.do_handshake()
00642         except ssl.SSLError, err:
00643             if err.args[0] == ssl.SSL_ERROR_WANT_READ:
00644                 self._handshake_reading = True
00645                 return
00646             elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
00647                 self._handshake_writing = True
00648                 return
00649             elif err.args[0] in (ssl.SSL_ERROR_EOF,
00650                                  ssl.SSL_ERROR_ZERO_RETURN):
00651                 return self.close()
00652             elif err.args[0] == ssl.SSL_ERROR_SSL:
00653                 logging.warning("SSL Error on %d: %s", self.socket.fileno(), err)
00654                 return self.close()
00655             raise
00656         except socket.error, err:
00657             if err.args[0] == errno.ECONNABORTED:
00658                 return self.close()
00659         else:
00660             self._ssl_accepting = False
00661             super(SSLIOStream, self)._handle_connect()
00662 
00663     def _handle_read(self):
00664         if self._ssl_accepting:
00665             self._do_ssl_handshake()
00666             return
00667         super(SSLIOStream, self)._handle_read()
00668 
00669     def _handle_write(self):
00670         if self._ssl_accepting:
00671             self._do_ssl_handshake()
00672             return
00673         super(SSLIOStream, self)._handle_write()
00674 
00675     def _handle_connect(self):
00676         self.socket = ssl.wrap_socket(self.socket,
00677                                       do_handshake_on_connect=False,
00678                                       **self._ssl_options)
00679         # Don't call the superclass's _handle_connect (which is responsible
00680         # for telling the application that the connection is complete)
00681         # until we've completed the SSL handshake (so certificates are
00682         # available, etc).
00683 
00684     def _read_from_socket(self):
00685         if self._ssl_accepting:
00686             # If the handshake hasn't finished yet, there can't be anything
00687             # to read (attempting to read may or may not raise an exception
00688             # depending on the SSL version)
00689             return None
00690         try:
00691             # SSLSocket objects have both a read() and recv() method,
00692             # while regular sockets only have recv().
00693             # The recv() method blocks (at least in python 2.6) if it is
00694             # called when there is nothing to read, so we have to use
00695             # read() instead.
00696             chunk = self.socket.read(self.read_chunk_size)
00697         except ssl.SSLError, e:
00698             # SSLError is a subclass of socket.error, so this except
00699             # block must come first.
00700             if e.args[0] == ssl.SSL_ERROR_WANT_READ:
00701                 return None
00702             else:
00703                 raise
00704         except socket.error, e:
00705             if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
00706                 return None
00707             else:
00708                 raise
00709         if not chunk:
00710             self.close()
00711             return None
00712         return chunk
00713 
00714 
00715 def _double_prefix(deque):
00716     """Grow by doubling, but don't split the second chunk just because the
00717     first one is small.
00718     """
00719     new_len = max(len(deque[0]) * 2,
00720                   (len(deque[0]) + len(deque[1])))
00721     _merge_prefix(deque, new_len)
00722 
00723 
00724 def _merge_prefix(deque, size):
00725     """Replace the first entries in a deque of strings with a single
00726     string of up to size bytes.
00727 
00728     >>> d = collections.deque(['abc', 'de', 'fghi', 'j'])
00729     >>> _merge_prefix(d, 5); print d
00730     deque(['abcde', 'fghi', 'j'])
00731 
00732     Strings will be split as necessary to reach the desired size.
00733     >>> _merge_prefix(d, 7); print d
00734     deque(['abcdefg', 'hi', 'j'])
00735 
00736     >>> _merge_prefix(d, 3); print d
00737     deque(['abc', 'defg', 'hi', 'j'])
00738 
00739     >>> _merge_prefix(d, 100); print d
00740     deque(['abcdefghij'])
00741     """
00742     if len(deque) == 1 and len(deque[0]) <= size:
00743         return
00744     prefix = []
00745     remaining = size
00746     while deque and remaining > 0:
00747         chunk = deque.popleft()
00748         if len(chunk) > remaining:
00749             deque.appendleft(chunk[remaining:])
00750             chunk = chunk[:remaining]
00751         prefix.append(chunk)
00752         remaining -= len(chunk)
00753     # This data structure normally just contains byte strings, but
00754     # the unittest gets messy if it doesn't use the default str() type,
00755     # so do the merge based on the type of data that's actually present.
00756     if prefix:
00757         deque.appendleft(type(prefix[0])().join(prefix))
00758     if not deque:
00759         deque.appendleft(b(""))
00760 
00761 
00762 def doctests():
00763     import doctest
00764     return doctest.DocTestSuite()


roswww
Author(s): Jonathan Mace
autogenerated on Thu Jan 2 2014 11:53:30