iostream.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #
00003 # Copyright 2009 Facebook
00004 #
00005 # Licensed under the Apache License, Version 2.0 (the "License"); you may
00006 # not use this file except in compliance with the License. You may obtain
00007 # a copy of the License at
00008 #
00009 #     http://www.apache.org/licenses/LICENSE-2.0
00010 #
00011 # Unless required by applicable law or agreed to in writing, software
00012 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
00013 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
00014 # License for the specific language governing permissions and limitations
00015 # under the License.
00016 
00017 """Utility classes to write to and read from non-blocking files and sockets.
00018 
00019 Contents:
00020 
00021 * `BaseIOStream`: Generic interface for reading and writing.
00022 * `IOStream`: Implementation of BaseIOStream using non-blocking sockets.
00023 * `SSLIOStream`: SSL-aware version of IOStream.
00024 * `PipeIOStream`: Pipe-based IOStream implementation.
00025 """
00026 
00027 from __future__ import absolute_import, division, print_function, with_statement
00028 
00029 import collections
00030 import errno
00031 import numbers
00032 import os
00033 import socket
00034 import sys
00035 import re
00036 
00037 from tornado.concurrent import TracebackFuture
00038 from tornado import ioloop
00039 from tornado.log import gen_log, app_log
00040 from tornado.netutil import ssl_wrap_socket, ssl_match_hostname, SSLCertificateError
00041 from tornado import stack_context
00042 from tornado.util import bytes_type, errno_from_exception
00043 
00044 try:
00045     from tornado.platform.posix import _set_nonblocking
00046 except ImportError:
00047     _set_nonblocking = None
00048 
00049 try:
00050     import ssl
00051 except ImportError:
00052     # ssl is not available on Google App Engine
00053     ssl = None
00054 
# Errnos meaning "this non-blocking operation cannot complete right now;
# retry it later".  The two names share one value on most platforms, but
# not on all of them.
_ERRNO_WOULDBLOCK = (errno.EWOULDBLOCK, errno.EAGAIN)

# Errnos meaning "the connection was terminated abruptly".  They are
# caught and handled less noisily than other errors.
_ERRNO_CONNRESET = (errno.ECONNRESET, errno.ECONNABORTED, errno.EPIPE,
                    errno.ETIMEDOUT)

# More non-portable errnos:
_ERRNO_INPROGRESS = (errno.EINPROGRESS,)

# The WSA* variants are not defined by every errno module, so each group
# is only extended when its leading name is present.
if hasattr(errno, "WSAEWOULDBLOCK"):
    _ERRNO_WOULDBLOCK = _ERRNO_WOULDBLOCK + (errno.WSAEWOULDBLOCK,)

if hasattr(errno, "WSAECONNRESET"):
    _ERRNO_CONNRESET = _ERRNO_CONNRESET + (
        errno.WSAECONNRESET, errno.WSAECONNABORTED, errno.WSAETIMEDOUT)

if hasattr(errno, "WSAEINPROGRESS"):
    _ERRNO_INPROGRESS = _ERRNO_INPROGRESS + (errno.WSAEINPROGRESS,)
00076 
00077 #######################################################
class StreamClosedError(IOError):
    """Raised by `IOStream` methods when the stream has been closed.

    The close callback is scheduled to run *after* the stream's other
    callbacks, so that buffered data can still be processed.  As a
    consequence this error may be seen before the close callback runs.
    """
00086 
00087 
class UnsatisfiableReadError(Exception):
    """Signals that a pending read can never be satisfied.

    ``read_until`` and ``read_until_regex`` raise it when they were given
    a ``max_bytes`` argument.
    """
00095 
00096 
class StreamBufferFullError(Exception):
    """Raised by `IOStream` methods when a stream buffer is full."""
00100 
00101 
00102 class BaseIOStream(object):
00103     """A utility class to write to and read from a non-blocking file or socket.
00104 
00105     We support a non-blocking ``write()`` and a family of ``read_*()`` methods.
00106     All of the methods take an optional ``callback`` argument and return a
00107     `.Future` only if no callback is given.  When the operation completes,
00108     the callback will be run or the `.Future` will resolve with the data
00109     read (or ``None`` for ``write()``).  All outstanding ``Futures`` will
00110     resolve with a `StreamClosedError` when the stream is closed; users
00111     of the callback interface will be notified via
00112     `.BaseIOStream.set_close_callback` instead.
00113 
00114     When a stream is closed due to an error, the IOStream's ``error``
00115     attribute contains the exception object.
00116 
00117     Subclasses must implement `fileno`, `close_fd`, `write_to_fd`,
00118     `read_from_fd`, and optionally `get_fd_error`.
00119     """
00120     def __init__(self, io_loop=None, max_buffer_size=None,
00121                  read_chunk_size=None, max_write_buffer_size=None):
00122         """`BaseIOStream` constructor.
00123 
00124         :arg io_loop: The `.IOLoop` to use; defaults to `.IOLoop.current`.
00125         :arg max_buffer_size: Maximum amount of incoming data to buffer;
00126             defaults to 100MB.
00127         :arg read_chunk_size: Amount of data to read at one time from the
00128             underlying transport; defaults to 64KB.
00129         :arg max_write_buffer_size: Amount of outgoing data to buffer;
00130             defaults to unlimited.
00131 
00132         .. versionchanged:: 4.0
00133            Add the ``max_write_buffer_size`` parameter.  Changed default
00134            ``read_chunk_size`` to 64KB.
00135         """
00136         self.io_loop = io_loop or ioloop.IOLoop.current()
00137         self.max_buffer_size = max_buffer_size or 104857600
00138         # A chunk size that is too close to max_buffer_size can cause
00139         # spurious failures.
00140         self.read_chunk_size = min(read_chunk_size or 65536,
00141                                    self.max_buffer_size // 2)
00142         self.max_write_buffer_size = max_write_buffer_size
00143         self.error = None
00144         self._read_buffer = collections.deque()
00145         self._write_buffer = collections.deque()
00146         self._read_buffer_size = 0
00147         self._write_buffer_size = 0
00148         self._write_buffer_frozen = False
00149         self._read_delimiter = None
00150         self._read_regex = None
00151         self._read_max_bytes = None
00152         self._read_bytes = None
00153         self._read_partial = False
00154         self._read_until_close = False
00155         self._read_callback = None
00156         self._read_future = None
00157         self._streaming_callback = None
00158         self._write_callback = None
00159         self._write_future = None
00160         self._close_callback = None
00161         self._connect_callback = None
00162         self._connect_future = None
00163         self._connecting = False
00164         self._state = None
00165         self._pending_callbacks = 0
00166         self._closed = False
00167 
00168     def fileno(self):
00169         """Returns the file descriptor for this stream."""
00170         raise NotImplementedError()
00171 
00172     def close_fd(self):
00173         """Closes the file underlying this stream.
00174 
00175         ``close_fd`` is called by `BaseIOStream` and should not be called
00176         elsewhere; other users should call `close` instead.
00177         """
00178         raise NotImplementedError()
00179 
00180     def write_to_fd(self, data):
00181         """Attempts to write ``data`` to the underlying file.
00182 
00183         Returns the number of bytes written.
00184         """
00185         raise NotImplementedError()
00186 
00187     def read_from_fd(self):
00188         """Attempts to read from the underlying file.
00189 
00190         Returns ``None`` if there was nothing to read (the socket
00191         returned `~errno.EWOULDBLOCK` or equivalent), otherwise
00192         returns the data.  When possible, should return no more than
00193         ``self.read_chunk_size`` bytes at a time.
00194         """
00195         raise NotImplementedError()
00196 
00197     def get_fd_error(self):
00198         """Returns information about any error on the underlying file.
00199 
00200         This method is called after the `.IOLoop` has signaled an error on the
00201         file descriptor, and should return an Exception (such as `socket.error`
00202         with additional information, or None if no such information is
00203         available.
00204         """
00205         return None
00206 
00207     def read_until_regex(self, regex, callback=None, max_bytes=None):
00208         """Asynchronously read until we have matched the given regex.
00209 
00210         The result includes the data that matches the regex and anything
00211         that came before it.  If a callback is given, it will be run
00212         with the data as an argument; if not, this method returns a
00213         `.Future`.
00214 
00215         If ``max_bytes`` is not None, the connection will be closed
00216         if more than ``max_bytes`` bytes have been read and the regex is
00217         not satisfied.
00218 
00219         .. versionchanged:: 4.0
00220             Added the ``max_bytes`` argument.  The ``callback`` argument is
00221             now optional and a `.Future` will be returned if it is omitted.
00222         """
00223         future = self._set_read_callback(callback)
00224         self._read_regex = re.compile(regex)
00225         self._read_max_bytes = max_bytes
00226         try:
00227             self._try_inline_read()
00228         except UnsatisfiableReadError as e:
00229             # Handle this the same way as in _handle_events.
00230             gen_log.info("Unsatisfiable read, closing connection: %s" % e)
00231             self.close(exc_info=True)
00232             return future
00233         return future
00234 
00235     def read_until(self, delimiter, callback=None, max_bytes=None):
00236         """Asynchronously read until we have found the given delimiter.
00237 
00238         The result includes all the data read including the delimiter.
00239         If a callback is given, it will be run with the data as an argument;
00240         if not, this method returns a `.Future`.
00241 
00242         If ``max_bytes`` is not None, the connection will be closed
00243         if more than ``max_bytes`` bytes have been read and the delimiter
00244         is not found.
00245 
00246         .. versionchanged:: 4.0
00247             Added the ``max_bytes`` argument.  The ``callback`` argument is
00248             now optional and a `.Future` will be returned if it is omitted.
00249         """
00250         future = self._set_read_callback(callback)
00251         self._read_delimiter = delimiter
00252         self._read_max_bytes = max_bytes
00253         try:
00254             self._try_inline_read()
00255         except UnsatisfiableReadError as e:
00256             # Handle this the same way as in _handle_events.
00257             gen_log.info("Unsatisfiable read, closing connection: %s" % e)
00258             self.close(exc_info=True)
00259             return future
00260         return future
00261 
00262     def read_bytes(self, num_bytes, callback=None, streaming_callback=None,
00263                    partial=False):
00264         """Asynchronously read a number of bytes.
00265 
00266         If a ``streaming_callback`` is given, it will be called with chunks
00267         of data as they become available, and the final result will be empty.
00268         Otherwise, the result is all the data that was read.
00269         If a callback is given, it will be run with the data as an argument;
00270         if not, this method returns a `.Future`.
00271 
00272         If ``partial`` is true, the callback is run as soon as we have
00273         any bytes to return (but never more than ``num_bytes``)
00274 
00275         .. versionchanged:: 4.0
00276             Added the ``partial`` argument.  The callback argument is now
00277             optional and a `.Future` will be returned if it is omitted.
00278         """
00279         future = self._set_read_callback(callback)
00280         assert isinstance(num_bytes, numbers.Integral)
00281         self._read_bytes = num_bytes
00282         self._read_partial = partial
00283         self._streaming_callback = stack_context.wrap(streaming_callback)
00284         self._try_inline_read()
00285         return future
00286 
00287     def read_until_close(self, callback=None, streaming_callback=None):
00288         """Asynchronously reads all data from the socket until it is closed.
00289 
00290         If a ``streaming_callback`` is given, it will be called with chunks
00291         of data as they become available, and the final result will be empty.
00292         Otherwise, the result is all the data that was read.
00293         If a callback is given, it will be run with the data as an argument;
00294         if not, this method returns a `.Future`.
00295 
00296         .. versionchanged:: 4.0
00297             The callback argument is now optional and a `.Future` will
00298             be returned if it is omitted.
00299         """
00300         future = self._set_read_callback(callback)
00301         self._streaming_callback = stack_context.wrap(streaming_callback)
00302         if self.closed():
00303             if self._streaming_callback is not None:
00304                 self._run_read_callback(self._read_buffer_size, True)
00305             self._run_read_callback(self._read_buffer_size, False)
00306             return future
00307         self._read_until_close = True
00308         self._try_inline_read()
00309         return future
00310 
00311     def write(self, data, callback=None):
00312         """Asynchronously write the given data to this stream.
00313 
00314         If ``callback`` is given, we call it when all of the buffered write
00315         data has been successfully written to the stream. If there was
00316         previously buffered write data and an old write callback, that
00317         callback is simply overwritten with this new callback.
00318 
00319         If no ``callback`` is given, this method returns a `.Future` that
00320         resolves (with a result of ``None``) when the write has been
00321         completed.  If `write` is called again before that `.Future` has
00322         resolved, the previous future will be orphaned and will never resolve.
00323 
00324         .. versionchanged:: 4.0
00325             Now returns a `.Future` if no callback is given.
00326         """
00327         assert isinstance(data, bytes_type)
00328         self._check_closed()
00329         # We use bool(_write_buffer) as a proxy for write_buffer_size>0,
00330         # so never put empty strings in the buffer.
00331         if data:
00332             if (self.max_write_buffer_size is not None and
00333                     self._write_buffer_size + len(data) > self.max_write_buffer_size):
00334                 raise StreamBufferFullError("Reached maximum read buffer size")
00335             # Break up large contiguous strings before inserting them in the
00336             # write buffer, so we don't have to recopy the entire thing
00337             # as we slice off pieces to send to the socket.
00338             WRITE_BUFFER_CHUNK_SIZE = 128 * 1024
00339             for i in range(0, len(data), WRITE_BUFFER_CHUNK_SIZE):
00340                 self._write_buffer.append(data[i:i + WRITE_BUFFER_CHUNK_SIZE])
00341             self._write_buffer_size += len(data)
00342         if callback is not None:
00343             self._write_callback = stack_context.wrap(callback)
00344             future = None
00345         else:
00346             future = self._write_future = TracebackFuture()
00347         if not self._connecting:
00348             self._handle_write()
00349             if self._write_buffer:
00350                 self._add_io_state(self.io_loop.WRITE)
00351             self._maybe_add_error_listener()
00352         return future
00353 
00354     def set_close_callback(self, callback):
00355         """Call the given callback when the stream is closed.
00356 
00357         This is not necessary for applications that use the `.Future`
00358         interface; all outstanding ``Futures`` will resolve with a
00359         `StreamClosedError` when the stream is closed.
00360         """
00361         self._close_callback = stack_context.wrap(callback)
00362         self._maybe_add_error_listener()
00363 
    def close(self, exc_info=False):
        """Close this stream.

        If ``exc_info`` is true, set the ``error`` attribute to the current
        exception from `sys.exc_info` (or if ``exc_info`` is a tuple,
        use that instead of `sys.exc_info`).

        On a stream that is already closed the teardown is skipped and only
        `_maybe_run_close_callback` is invoked again.
        """
        if not self.closed():
            if exc_info:
                if not isinstance(exc_info, tuple):
                    exc_info = sys.exc_info()
                # Only record an error if the tuple actually carries one
                # (every element is None when no exception is active).
                if any(exc_info):
                    self.error = exc_info[1]
            if self._read_until_close:
                # A read_until_close is outstanding: flush buffered data to
                # the streaming callback (if any and if there is data), then
                # run the final read callback.
                if (self._streaming_callback is not None and
                        self._read_buffer_size):
                    self._run_read_callback(self._read_buffer_size, True)
                self._read_until_close = False
                self._run_read_callback(self._read_buffer_size, False)
            if self._state is not None:
                # Unregister the fd from the IOLoop before closing it.
                self.io_loop.remove_handler(self.fileno())
                self._state = None
            self.close_fd()
            self._closed = True
        # Runs the close callback / fails pending futures unless callbacks
        # are still pending.
        self._maybe_run_close_callback()
00389 
00390     def _maybe_run_close_callback(self):
00391         # If there are pending callbacks, don't run the close callback
00392         # until they're done (see _maybe_add_error_handler)
00393         if self.closed() and self._pending_callbacks == 0:
00394             futures = []
00395             if self._read_future is not None:
00396                 futures.append(self._read_future)
00397                 self._read_future = None
00398             if self._write_future is not None:
00399                 futures.append(self._write_future)
00400                 self._write_future = None
00401             if self._connect_future is not None:
00402                 futures.append(self._connect_future)
00403                 self._connect_future = None
00404             for future in futures:
00405                 if (isinstance(self.error, (socket.error, IOError)) and
00406                         errno_from_exception(self.error) in _ERRNO_CONNRESET):
00407                     # Treat connection resets as closed connections so
00408                     # clients only have to catch one kind of exception
00409                     # to avoid logging.
00410                     future.set_exception(StreamClosedError())
00411                 else:
00412                     future.set_exception(self.error or StreamClosedError())
00413             if self._close_callback is not None:
00414                 cb = self._close_callback
00415                 self._close_callback = None
00416                 self._run_callback(cb)
00417             # Delete any unfinished callbacks to break up reference cycles.
00418             self._read_callback = self._write_callback = None
00419             # Clear the buffers so they can be cleared immediately even
00420             # if the IOStream object is kept alive by a reference cycle.
00421             # TODO: Clear the read buffer too; it currently breaks some tests.
00422             self._write_buffer = None
00423 
00424     def reading(self):
00425         """Returns true if we are currently reading from the stream."""
00426         return self._read_callback is not None or self._read_future is not None
00427 
00428     def writing(self):
00429         """Returns true if we are currently writing to the stream."""
00430         return bool(self._write_buffer)
00431 
00432     def closed(self):
00433         """Returns true if the stream has been closed."""
00434         return self._closed
00435 
00436     def set_nodelay(self, value):
00437         """Sets the no-delay flag for this stream.
00438 
00439         By default, data written to TCP streams may be held for a time
00440         to make the most efficient use of bandwidth (according to
00441         Nagle's algorithm).  The no-delay flag requests that data be
00442         written as soon as possible, even if doing so would consume
00443         additional bandwidth.
00444 
00445         This flag is currently defined only for TCP-based ``IOStreams``.
00446 
00447         .. versionadded:: 3.1
00448         """
00449         pass
00450 
    def _handle_events(self, fd, events):
        """Dispatch IOLoop ``events`` for ``fd`` to the connect/read/write
        handlers, then recompute which events the stream is interested in.

        The stream may be closed by any of the handlers, so ``closed()`` is
        re-checked after each step.  An `UnsatisfiableReadError` closes the
        stream quietly; any other exception closes the stream and is
        re-raised.

        NOTE(review): presumably registered as the IOLoop handler for this
        stream's fd by code outside this excerpt -- confirm.
        """
        if self.closed():
            gen_log.warning("Got events for closed stream %s", fd)
            return
        try:
            if self._connecting:
                # Most IOLoops will report a write failed connect
                # with the WRITE event, but SelectIOLoop reports a
                # READ as well so we must check for connecting before
                # either.
                self._handle_connect()
            if self.closed():
                return
            if events & self.io_loop.READ:
                self._handle_read()
            if self.closed():
                return
            if events & self.io_loop.WRITE:
                self._handle_write()
            if self.closed():
                return
            if events & self.io_loop.ERROR:
                self.error = self.get_fd_error()
                # We may have queued up a user callback in _handle_read or
                # _handle_write, so don't close the IOStream until those
                # callbacks have had a chance to run.
                self.io_loop.add_callback(self.close)
                return
            # Work out the new event mask: always ERROR, plus READ/WRITE
            # depending on what is currently outstanding.
            state = self.io_loop.ERROR
            if self.reading():
                state |= self.io_loop.READ
            if self.writing():
                state |= self.io_loop.WRITE
            if state == self.io_loop.ERROR and self._read_buffer_size == 0:
                # If the connection is idle, listen for reads too so
                # we can tell if the connection is closed.  If there is
                # data in the read buffer we won't run the close callback
                # yet anyway, so we don't need to listen in this case.
                state |= self.io_loop.READ
            # Only touch the IOLoop registration when the mask changed.
            if state != self._state:
                assert self._state is not None, \
                    "shouldn't happen: _handle_events without self._state"
                self._state = state
                self.io_loop.update_handler(self.fileno(), self._state)
        except UnsatisfiableReadError as e:
            gen_log.info("Unsatisfiable read, closing connection: %s" % e)
            self.close(exc_info=True)
        except Exception:
            gen_log.error("Uncaught exception, closing connection.",
                          exc_info=True)
            self.close(exc_info=True)
            raise
00503 
    def _run_callback(self, callback, *args):
        """Schedule ``callback(*args)`` via ``io_loop.add_callback``.

        ``_pending_callbacks`` is incremented when the callback is scheduled
        and decremented when it starts running.  If the callback raises, the
        stream is closed (recording the exception) and the exception is
        re-raised.  `_maybe_add_error_listener` is called after the callback
        in every case.
        """
        def wrapper():
            # The callback is no longer pending once it starts running.
            self._pending_callbacks -= 1
            try:
                return callback(*args)
            except Exception:
                app_log.error("Uncaught exception, closing connection.",
                              exc_info=True)
                # Close the socket on an uncaught exception from a user callback
                # (It would eventually get closed when the socket object is
                # gc'd, but we don't want to rely on gc happening before we
                # run out of file descriptors)
                self.close(exc_info=True)
                # Re-raise the exception so that IOLoop.handle_callback_exception
                # can see it and log the error
                raise
            finally:
                self._maybe_add_error_listener()
        # We schedule callbacks to be run on the next IOLoop iteration
        # rather than running them directly for several reasons:
        # * Prevents unbounded stack growth when a callback calls an
        #   IOLoop operation that immediately runs another callback
        # * Provides a predictable execution context for e.g.
        #   non-reentrant mutexes
        # * Ensures that the try/except in wrapper() is run outside
        #   of the application's StackContexts
        with stack_context.NullContext():
            # stack_context was already captured in callback, we don't need to
            # capture it again for IOStream's wrapper.  This is especially
            # important if the callback was pre-wrapped before entry to
            # IOStream (as in HTTPConnection._header_callback), as we could
            # capture and leak the wrong context here.
            self._pending_callbacks += 1
            self.io_loop.add_callback(wrapper)
00538 
    def _read_to_buffer_loop(self):
        """Read into the read buffer until the read can be satisfied, the
        source has nothing more to give right now, or the stream closes.

        Returns the value of `_find_read_pos` for the buffered data; callers
        treat ``None`` as "the current read cannot be completed yet".
        Exceptions from `_read_to_buffer` propagate to the caller.
        """
        # This method is called from _handle_read and _try_inline_read.
        try:
            # target_bytes is the buffer size at which we can stop reading
            # without scanning; None means there is no such known size.
            if self._read_bytes is not None:
                target_bytes = self._read_bytes
            elif self._read_max_bytes is not None:
                target_bytes = self._read_max_bytes
            elif self.reading():
                # For read_until without max_bytes, or
                # read_until_close, read as much as we can before
                # scanning for the delimiter.
                target_bytes = None
            else:
                target_bytes = 0
            next_find_pos = 0
            # Pretend to have a pending callback so that an EOF in
            # _read_to_buffer doesn't trigger an immediate close
            # callback.  At the end of this method we'll either
            # establish a real pending callback via
            # _read_from_buffer or run the close callback.
            #
            # We need two try statements here so that
            # pending_callbacks is decremented before the `except`
            # clause below (which calls `close` and does need to
            # trigger the callback)
            self._pending_callbacks += 1
            while not self.closed():
                # Read from the socket until we get EWOULDBLOCK or equivalent.
                # SSL sockets do some internal buffering, and if the data is
                # sitting in the SSL object's buffer select() and friends
                # can't see it; the only way to find out if it's there is to
                # try to read it.
                if self._read_to_buffer() == 0:
                    break

                self._run_streaming_callback()

                # If we've read all the bytes we can use, break out of
                # this loop.  We can't just call read_from_buffer here
                # because of subtle interactions with the
                # pending_callback and error_listener mechanisms.
                #
                # If we've reached target_bytes, we know we're done.
                if (target_bytes is not None and
                        self._read_buffer_size >= target_bytes):
                    break

                # Otherwise, we need to call the more expensive find_read_pos.
                # It's inefficient to do this on every read, so instead
                # do it on the first read and whenever the read buffer
                # size has doubled.
                if self._read_buffer_size >= next_find_pos:
                    pos = self._find_read_pos()
                    if pos is not None:
                        return pos
                    next_find_pos = self._read_buffer_size * 2
            return self._find_read_pos()
        finally:
            # Undo the placeholder pending callback taken above.
            self._pending_callbacks -= 1
00598 
00599     def _handle_read(self):
00600         try:
00601             pos = self._read_to_buffer_loop()
00602         except UnsatisfiableReadError:
00603             raise
00604         except Exception:
00605             gen_log.warning("error on read", exc_info=True)
00606             self.close(exc_info=True)
00607             return
00608         if pos is not None:
00609             self._read_from_buffer(pos)
00610             return
00611         else:
00612             self._maybe_run_close_callback()
00613 
00614     def _set_read_callback(self, callback):
00615         assert self._read_callback is None, "Already reading"
00616         assert self._read_future is None, "Already reading"
00617         if callback is not None:
00618             self._read_callback = stack_context.wrap(callback)
00619         else:
00620             self._read_future = TracebackFuture()
00621         return self._read_future
00622 
00623     def _run_read_callback(self, size, streaming):
00624         if streaming:
00625             callback = self._streaming_callback
00626         else:
00627             callback = self._read_callback
00628             self._read_callback = self._streaming_callback = None
00629         if self._read_future is not None:
00630             assert callback is None
00631             future = self._read_future
00632             self._read_future = None
00633             future.set_result(self._consume(size))
00634         if callback is not None:
00635             assert self._read_future is None
00636             self._run_callback(callback, self._consume(size))
00637         else:
00638             # If we scheduled a callback, we will add the error listener
00639             # afterwards.  If we didn't, we have to do it now.
00640             self._maybe_add_error_listener()
00641 
    def _try_inline_read(self):
        """Attempt to complete the current read operation from buffered data.

        If the read can be completed without blocking, schedules the
        read callback on the next IOLoop iteration; otherwise starts
        listening for reads on the socket.
        """
        # See if we've already got the data from a previous read
        self._run_streaming_callback()
        pos = self._find_read_pos()
        if pos is not None:
            self._read_from_buffer(pos)
            return
        # The buffer alone was not enough; we need the underlying fd now.
        self._check_closed()
        try:
            pos = self._read_to_buffer_loop()
        except Exception:
            # If there was an error in _read_to_buffer, we called close()
            # already, but couldn't run the close callback because of
            # _pending_callbacks.  Before we escape from this function, run
            # the close callback if applicable.
            self._maybe_run_close_callback()
            raise
        if pos is not None:
            self._read_from_buffer(pos)
            return
        # We couldn't satisfy the read inline, so either close the stream
        # or listen for new data.
        if self.closed():
            self._maybe_run_close_callback()
        else:
            self._add_io_state(ioloop.IOLoop.READ)
00674 
00675     def _read_to_buffer(self):
00676         """Reads from the socket and appends the result to the read buffer.
00677 
00678         Returns the number of bytes read.  Returns 0 if there is nothing
00679         to read (i.e. the read returns EWOULDBLOCK or equivalent).  On
00680         error closes the socket and raises an exception.
00681         """
00682         try:
00683             chunk = self.read_from_fd()
00684         except (socket.error, IOError, OSError) as e:
00685             # ssl.SSLError is a subclass of socket.error
00686             if e.args[0] in _ERRNO_CONNRESET:
00687                 # Treat ECONNRESET as a connection close rather than
00688                 # an error to minimize log spam  (the exception will
00689                 # be available on self.error for apps that care).
00690                 self.close(exc_info=True)
00691                 return
00692             self.close(exc_info=True)
00693             raise
00694         if chunk is None:
00695             return 0
00696         self._read_buffer.append(chunk)
00697         self._read_buffer_size += len(chunk)
00698         if self._read_buffer_size > self.max_buffer_size:
00699             gen_log.error("Reached maximum read buffer size")
00700             self.close()
00701             raise StreamBufferFullError("Reached maximum read buffer size")
00702         return len(chunk)
00703 
00704     def _run_streaming_callback(self):
00705         if self._streaming_callback is not None and self._read_buffer_size:
00706             bytes_to_consume = self._read_buffer_size
00707             if self._read_bytes is not None:
00708                 bytes_to_consume = min(self._read_bytes, bytes_to_consume)
00709                 self._read_bytes -= bytes_to_consume
00710             self._run_read_callback(bytes_to_consume, True)
00711 
00712     def _read_from_buffer(self, pos):
00713         """Attempts to complete the currently-pending read from the buffer.
00714 
00715         The argument is either a position in the read buffer or None,
00716         as returned by _find_read_pos.
00717         """
00718         self._read_bytes = self._read_delimiter = self._read_regex = None
00719         self._read_partial = False
00720         self._run_read_callback(pos, False)
00721 
00722     def _find_read_pos(self):
00723         """Attempts to find a position in the read buffer that satisfies
00724         the currently-pending read.
00725 
00726         Returns a position in the buffer if the current read can be satisfied,
00727         or None if it cannot.
00728         """
00729         if (self._read_bytes is not None and
00730             (self._read_buffer_size >= self._read_bytes or
00731              (self._read_partial and self._read_buffer_size > 0))):
00732             num_bytes = min(self._read_bytes, self._read_buffer_size)
00733             return num_bytes
00734         elif self._read_delimiter is not None:
00735             # Multi-byte delimiters (e.g. '\r\n') may straddle two
00736             # chunks in the read buffer, so we can't easily find them
00737             # without collapsing the buffer.  However, since protocols
00738             # using delimited reads (as opposed to reads of a known
00739             # length) tend to be "line" oriented, the delimiter is likely
00740             # to be in the first few chunks.  Merge the buffer gradually
00741             # since large merges are relatively expensive and get undone in
00742             # _consume().
00743             if self._read_buffer:
00744                 while True:
00745                     loc = self._read_buffer[0].find(self._read_delimiter)
00746                     if loc != -1:
00747                         delimiter_len = len(self._read_delimiter)
00748                         self._check_max_bytes(self._read_delimiter,
00749                                               loc + delimiter_len)
00750                         return loc + delimiter_len
00751                     if len(self._read_buffer) == 1:
00752                         break
00753                     _double_prefix(self._read_buffer)
00754                 self._check_max_bytes(self._read_delimiter,
00755                                       len(self._read_buffer[0]))
00756         elif self._read_regex is not None:
00757             if self._read_buffer:
00758                 while True:
00759                     m = self._read_regex.search(self._read_buffer[0])
00760                     if m is not None:
00761                         self._check_max_bytes(self._read_regex, m.end())
00762                         return m.end()
00763                     if len(self._read_buffer) == 1:
00764                         break
00765                     _double_prefix(self._read_buffer)
00766                 self._check_max_bytes(self._read_regex,
00767                                       len(self._read_buffer[0]))
00768         return None
00769 
00770     def _check_max_bytes(self, delimiter, size):
00771         if (self._read_max_bytes is not None and
00772                 size > self._read_max_bytes):
00773             raise UnsatisfiableReadError(
00774                 "delimiter %r not found within %d bytes" % (
00775                     delimiter, self._read_max_bytes))
00776 
00777     def _handle_write(self):
00778         while self._write_buffer:
00779             try:
00780                 if not self._write_buffer_frozen:
00781                     # On windows, socket.send blows up if given a
00782                     # write buffer that's too large, instead of just
00783                     # returning the number of bytes it was able to
00784                     # process.  Therefore we must not call socket.send
00785                     # with more than 128KB at a time.
00786                     _merge_prefix(self._write_buffer, 128 * 1024)
00787                 num_bytes = self.write_to_fd(self._write_buffer[0])
00788                 if num_bytes == 0:
00789                     # With OpenSSL, if we couldn't write the entire buffer,
00790                     # the very same string object must be used on the
00791                     # next call to send.  Therefore we suppress
00792                     # merging the write buffer after an incomplete send.
00793                     # A cleaner solution would be to set
00794                     # SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER, but this is
00795                     # not yet accessible from python
00796                     # (http://bugs.python.org/issue8240)
00797                     self._write_buffer_frozen = True
00798                     break
00799                 self._write_buffer_frozen = False
00800                 _merge_prefix(self._write_buffer, num_bytes)
00801                 self._write_buffer.popleft()
00802                 self._write_buffer_size -= num_bytes
00803             except (socket.error, IOError, OSError) as e:
00804                 if e.args[0] in _ERRNO_WOULDBLOCK:
00805                     self._write_buffer_frozen = True
00806                     break
00807                 else:
00808                     if e.args[0] not in _ERRNO_CONNRESET:
00809                         # Broken pipe errors are usually caused by connection
00810                         # reset, and its better to not log EPIPE errors to
00811                         # minimize log spam
00812                         gen_log.warning("Write error on %s: %s",
00813                                         self.fileno(), e)
00814                     self.close(exc_info=True)
00815                     return
00816         if not self._write_buffer:
00817             if self._write_callback:
00818                 callback = self._write_callback
00819                 self._write_callback = None
00820                 self._run_callback(callback)
00821             if self._write_future:
00822                 future = self._write_future
00823                 self._write_future = None
00824                 future.set_result(None)
00825 
00826     def _consume(self, loc):
00827         if loc == 0:
00828             return b""
00829         _merge_prefix(self._read_buffer, loc)
00830         self._read_buffer_size -= loc
00831         return self._read_buffer.popleft()
00832 
00833     def _check_closed(self):
00834         if self.closed():
00835             raise StreamClosedError("Stream is closed")
00836 
00837     def _maybe_add_error_listener(self):
00838         # This method is part of an optimization: to detect a connection that
00839         # is closed when we're not actively reading or writing, we must listen
00840         # for read events.  However, it is inefficient to do this when the
00841         # connection is first established because we are going to read or write
00842         # immediately anyway.  Instead, we insert checks at various times to
00843         # see if the connection is idle and add the read listener then.
00844         if self._pending_callbacks != 0:
00845             return
00846         if self._state is None or self._state == ioloop.IOLoop.ERROR:
00847             if self.closed():
00848                 self._maybe_run_close_callback()
00849             elif (self._read_buffer_size == 0 and
00850                   self._close_callback is not None):
00851                 self._add_io_state(ioloop.IOLoop.READ)
00852 
00853     def _add_io_state(self, state):
00854         """Adds `state` (IOLoop.{READ,WRITE} flags) to our event handler.
00855 
00856         Implementation notes: Reads and writes have a fast path and a
00857         slow path.  The fast path reads synchronously from socket
00858         buffers, while the slow path uses `_add_io_state` to schedule
00859         an IOLoop callback.  Note that in both cases, the callback is
00860         run asynchronously with `_run_callback`.
00861 
00862         To detect closed connections, we must have called
00863         `_add_io_state` at some point, but we want to delay this as
00864         much as possible so we don't have to set an `IOLoop.ERROR`
00865         listener that will be overwritten by the next slow-path
00866         operation.  As long as there are callbacks scheduled for
00867         fast-path ops, those callbacks may do more reads.
00868         If a sequence of fast-path ops do not end in a slow-path op,
00869         (e.g. for an @asynchronous long-poll request), we must add
00870         the error handler.  This is done in `_run_callback` and `write`
00871         (since the write callback is optional so we can have a
00872         fast-path write with no `_run_callback`)
00873         """
00874         if self.closed():
00875             # connection has been closed, so there can be no future events
00876             return
00877         if self._state is None:
00878             self._state = ioloop.IOLoop.ERROR | state
00879             with stack_context.NullContext():
00880                 self.io_loop.add_handler(
00881                     self.fileno(), self._handle_events, self._state)
00882         elif not self._state & state:
00883             self._state = self._state | state
00884             self.io_loop.update_handler(self.fileno(), self._state)
00885 
00886 
class IOStream(BaseIOStream):
    r"""Socket-based `IOStream` implementation.

    This class supports the read and write methods from `BaseIOStream`
    plus a `connect` method.

    The ``socket`` parameter may either be connected or unconnected.
    For server operations the socket is the result of calling
    `socket.accept <socket.socket.accept>`.  For client operations the
    socket is created with `socket.socket`, and may either be
    connected before passing it to the `IOStream` or connected with
    `IOStream.connect`.

    A very simple (and broken) HTTP client using this class::

        import tornado.ioloop
        import tornado.iostream
        import socket

        def send_request():
            stream.write(b"GET / HTTP/1.0\r\nHost: friendfeed.com\r\n\r\n")
            stream.read_until(b"\r\n\r\n", on_headers)

        def on_headers(data):
            headers = {}
            for line in data.split(b"\r\n"):
                parts = line.split(b":")
                if len(parts) == 2:
                    headers[parts[0].strip()] = parts[1].strip()
            stream.read_bytes(int(headers[b"Content-Length"]), on_body)

        def on_body(data):
            print(data)
            stream.close()
            tornado.ioloop.IOLoop.instance().stop()

        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        stream = tornado.iostream.IOStream(s)
        stream.connect(("friendfeed.com", 80), send_request)
        tornado.ioloop.IOLoop.instance().start()
    """
    def __init__(self, socket, *args, **kwargs):
        """Wrap ``socket`` and put it into non-blocking mode.

        All remaining arguments are passed through to `BaseIOStream`.
        """
        self.socket = socket
        self.socket.setblocking(False)
        super(IOStream, self).__init__(*args, **kwargs)

    def fileno(self):
        """Return the object handed to the IOLoop: the socket itself."""
        return self.socket

    def close_fd(self):
        """Close the underlying socket and drop our reference to it."""
        self.socket.close()
        self.socket = None

    def get_fd_error(self):
        """Return the socket's pending ``SO_ERROR`` as a `socket.error`."""
        err_code = self.socket.getsockopt(socket.SOL_SOCKET,
                                          socket.SO_ERROR)
        return socket.error(err_code, os.strerror(err_code))

    def read_from_fd(self):
        """Read up to ``read_chunk_size`` bytes from the socket.

        Returns the chunk read, or ``None`` if the read would block.
        An empty read (end of stream) closes the stream and also returns
        ``None``.  Other socket errors propagate to the caller.
        """
        try:
            chunk = self.socket.recv(self.read_chunk_size)
        except socket.error as e:
            if e.args[0] in _ERRNO_WOULDBLOCK:
                return None
            else:
                raise
        if not chunk:
            self.close()
            return None
        return chunk

    def write_to_fd(self, data):
        """Send ``data`` on the socket; returns the number of bytes sent."""
        return self.socket.send(data)

    def connect(self, address, callback=None, server_hostname=None):
        """Connects the socket to a remote address without blocking.

        May only be called if the socket passed to the constructor was
        not previously connected.  The address parameter is in the
        same format as for `socket.connect <socket.socket.connect>` for
        the type of socket passed to the IOStream constructor,
        e.g. an ``(ip, port)`` tuple.  Hostnames are accepted here,
        but will be resolved synchronously and block the IOLoop.
        If you have a hostname instead of an IP address, the `.TCPClient`
        class is recommended instead of calling this method directly.
        `.TCPClient` will do asynchronous DNS resolution and handle
        both IPv4 and IPv6.

        If ``callback`` is specified, it will be called with no
        arguments when the connection is completed; if not this method
        returns a `.Future` (whose result after a successful
        connection will be the stream itself).

        If specified, the ``server_hostname`` parameter will be used
        in SSL connections for certificate validation (if requested in
        the ``ssl_options``) and SNI (if supported; requires
        Python 3.2+).

        Note that it is safe to call `IOStream.write
        <BaseIOStream.write>` while the connection is pending, in
        which case the data will be written as soon as the connection
        is ready.  Calling `IOStream` read methods before the socket is
        connected works on some platforms but is non-portable.

        .. versionchanged:: 4.0
            If no callback is given, returns a `.Future`.

        """
        self._connecting = True
        if callback is not None:
            self._connect_callback = stack_context.wrap(callback)
            future = None
        else:
            future = self._connect_future = TracebackFuture()
        try:
            self.socket.connect(address)
        except socket.error as e:
            # In non-blocking mode we expect connect() to raise an
            # exception with EINPROGRESS or EWOULDBLOCK.
            #
            # On freebsd, other errors such as ECONNREFUSED may be
            # returned immediately when attempting to connect to
            # localhost, so handle them the same way as an error
            # reported later in _handle_connect.
            if (errno_from_exception(e) not in _ERRNO_INPROGRESS and
                    errno_from_exception(e) not in _ERRNO_WOULDBLOCK):
                gen_log.warning("Connect error on fd %s: %s",
                                self.socket.fileno(), e)
                self.close(exc_info=True)
                return future
        # Completion (or failure) of the connect is signalled by the
        # socket becoming writable.
        self._add_io_state(self.io_loop.WRITE)
        return future

    def start_tls(self, server_side, ssl_options=None, server_hostname=None):
        """Convert this `IOStream` to an `SSLIOStream`.

        This enables protocols that begin in clear-text mode and
        switch to SSL after some initial negotiation (such as the
        ``STARTTLS`` extension to SMTP and IMAP).

        This method cannot be used if there are outstanding reads
        or writes on the stream, or if there is any data in the
        IOStream's buffer (data in the operating system's socket
        buffer is allowed).  This means it must generally be used
        immediately after reading or writing the last clear-text
        data.  It can also be used immediately after connecting,
        before any reads or writes.

        The ``ssl_options`` argument may be either a dictionary
        of options or an `ssl.SSLContext`.  If a ``server_hostname``
        is given, it will be used for certificate verification
        (as configured in the ``ssl_options``) and is passed to
        ``ssl_wrap_socket`` when wrapping the socket.

        This method returns a `.Future` whose result is the new
        `SSLIOStream`.  After this method has been called,
        any other operation on the original stream is undefined.

        If a close callback is defined on this stream, it will be
        transferred to the new stream.

        Raises `ValueError` if the stream is not idle.

        .. versionadded:: 4.0
        """
        if (self._read_callback or self._read_future or
                self._write_callback or self._write_future or
                self._connect_callback or self._connect_future or
                self._pending_callbacks or self._closed or
                self._read_buffer or self._write_buffer):
            raise ValueError("IOStream is not idle; cannot convert to SSL")
        if ssl_options is None:
            ssl_options = {}

        # Detach the plain socket from this stream and from the IOLoop
        # before handing it over to the SSL stream.
        plain_socket = self.socket
        self.io_loop.remove_handler(plain_socket)
        self.socket = None
        # server_hostname used to be silently dropped here, contrary to
        # the documented contract; forward it the same way
        # SSLIOStream._handle_connect does.
        tls_socket = ssl_wrap_socket(plain_socket, ssl_options,
                                     server_hostname=server_hostname,
                                     server_side=server_side,
                                     do_handshake_on_connect=False)
        orig_close_callback = self._close_callback
        self._close_callback = None

        future = TracebackFuture()
        ssl_stream = SSLIOStream(tls_socket, ssl_options=ssl_options,
                                 io_loop=self.io_loop)
        # Let the post-handshake hostname check see the requested name
        # (a value of None leaves that check disabled, as before).
        ssl_stream._server_hostname = server_hostname

        # Wrap the original close callback so we can fail our Future as well.
        # If we had an "unwrap" counterpart to this method we would need
        # to restore the original callback after our Future resolves
        # so that repeated wrap/unwrap calls don't build up layers.
        def close_callback():
            if not future.done():
                future.set_exception(ssl_stream.error or StreamClosedError())
            if orig_close_callback is not None:
                orig_close_callback()
        ssl_stream.set_close_callback(close_callback)
        ssl_stream._ssl_connect_callback = lambda: future.set_result(ssl_stream)
        ssl_stream.max_buffer_size = self.max_buffer_size
        ssl_stream.read_chunk_size = self.read_chunk_size
        return future

    def _handle_connect(self):
        """Finish a pending `connect` once the socket reports readiness.

        On failure (non-zero ``SO_ERROR``) the error is stored on
        ``self.error`` and the stream is closed.  On success the connect
        callback is run and/or the connect future is resolved with the
        stream.
        """
        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if err != 0:
            self.error = socket.error(err, os.strerror(err))
            # IOLoop implementations may vary: some of them return
            # an error state before the socket becomes writable, so
            # in that case a connection failure would be handled by the
            # error path in _handle_events instead of here.
            if self._connect_future is None:
                # Fall back to the raw number for codes that have no
                # symbolic name instead of failing with KeyError.
                gen_log.warning("Connect error on fd %s: %s",
                                self.socket.fileno(),
                                errno.errorcode.get(err, err))
            self.close()
            return
        if self._connect_callback is not None:
            callback = self._connect_callback
            self._connect_callback = None
            self._run_callback(callback)
        if self._connect_future is not None:
            future = self._connect_future
            self._connect_future = None
            future.set_result(self)
        self._connecting = False

    def set_nodelay(self, value):
        """Set or clear ``TCP_NODELAY`` on the socket.

        Does nothing if the socket is gone or is not an ``AF_INET`` /
        ``AF_INET6`` socket.
        """
        if (self.socket is not None and
                self.socket.family in (socket.AF_INET, socket.AF_INET6)):
            try:
                self.socket.setsockopt(socket.IPPROTO_TCP,
                                       socket.TCP_NODELAY, 1 if value else 0)
            except socket.error as e:
                # Sometimes setsockopt will fail if the socket is closed
                # at the wrong time.  This can happen with HTTPServer
                # resetting the value to false between requests.
                if e.errno not in (errno.EINVAL, errno.ECONNRESET):
                    raise
01119 
01120 
class SSLIOStream(IOStream):
    """A utility class to write to and read from a non-blocking SSL socket.

    If the socket passed to the constructor is already connected,
    it should be wrapped with::

        ssl.wrap_socket(sock, do_handshake_on_connect=False, **kwargs)

    before constructing the `SSLIOStream`.  Unconnected sockets will be
    wrapped when `IOStream.connect` is finished.
    """
    def __init__(self, *args, **kwargs):
        """The ``ssl_options`` keyword argument may either be a dictionary
        of keywords arguments for `ssl.wrap_socket`, or an `ssl.SSLContext`
        object.
        """
        self._ssl_options = kwargs.pop('ssl_options', {})
        super(SSLIOStream, self).__init__(*args, **kwargs)
        # True until the SSL handshake has completed.
        self._ssl_accepting = True
        # Which direction the in-progress handshake is waiting on.
        self._handshake_reading = False
        self._handshake_writing = False
        self._ssl_connect_callback = None
        self._server_hostname = None

        # If the socket is already connected, attempt to start the handshake.
        try:
            self.socket.getpeername()
        except socket.error:
            pass
        else:
            # Indirectly start the handshake, which will run on the next
            # IOLoop iteration and then the real IO state will be set in
            # _handle_events.
            self._add_io_state(self.io_loop.WRITE)

    def reading(self):
        """True if the handshake is waiting to read or a read is pending."""
        return self._handshake_reading or super(SSLIOStream, self).reading()

    def writing(self):
        """True if the handshake is waiting to write or a write is pending."""
        return self._handshake_writing or super(SSLIOStream, self).writing()

    def _do_ssl_handshake(self):
        """Advance the SSL handshake by one step.

        When the handshake needs more I/O, the corresponding
        ``_handshake_reading`` / ``_handshake_writing`` flag is set and the
        method returns.  Connection resets, EOF and SSL protocol errors
        close the stream; any other error is re-raised.  On completion the
        peer certificate is verified and the SSL connect callback is run.
        """
        # Based on code from test_ssl.py in the python stdlib
        try:
            self._handshake_reading = False
            self._handshake_writing = False
            self.socket.do_handshake()
        except ssl.SSLError as err:
            if err.args[0] == ssl.SSL_ERROR_WANT_READ:
                self._handshake_reading = True
                return
            elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
                self._handshake_writing = True
                return
            elif err.args[0] in (ssl.SSL_ERROR_EOF,
                                 ssl.SSL_ERROR_ZERO_RETURN):
                return self.close(exc_info=True)
            elif err.args[0] == ssl.SSL_ERROR_SSL:
                try:
                    peer = self.socket.getpeername()
                except Exception:
                    peer = '(not connected)'
                gen_log.warning("SSL Error on %s %s: %s",
                                self.socket.fileno(), peer, err)
                return self.close(exc_info=True)
            raise
        except socket.error as err:
            if err.args[0] in _ERRNO_CONNRESET:
                return self.close(exc_info=True)
            # Do not silently swallow other socket errors: that would
            # leave the stream stuck mid-handshake with no error reported.
            # Re-raise, as the SSLError branch above does.
            raise
        except AttributeError:
            # On Linux, if the connection was reset before the call to
            # wrap_socket, do_handshake will fail with an
            # AttributeError.
            return self.close(exc_info=True)
        else:
            self._ssl_accepting = False
            if not self._verify_cert(self.socket.getpeercert()):
                self.close()
                return
            if self._ssl_connect_callback is not None:
                callback = self._ssl_connect_callback
                self._ssl_connect_callback = None
                self._run_callback(callback)

    def _verify_cert(self, peercert):
        """Returns True if peercert is valid according to the configured
        validation mode and hostname.

        The ssl handshake already tested the certificate for a valid
        CA signature; the only thing that remains is to check
        the hostname.
        """
        if isinstance(self._ssl_options, dict):
            verify_mode = self._ssl_options.get('cert_reqs', ssl.CERT_NONE)
        elif isinstance(self._ssl_options, ssl.SSLContext):
            verify_mode = self._ssl_options.verify_mode
        assert verify_mode in (ssl.CERT_NONE, ssl.CERT_REQUIRED, ssl.CERT_OPTIONAL)
        if verify_mode == ssl.CERT_NONE or self._server_hostname is None:
            return True
        # Use the certificate we were handed rather than fetching it from
        # the socket a second time.
        if peercert is None and verify_mode == ssl.CERT_REQUIRED:
            gen_log.warning("No SSL certificate given")
            return False
        try:
            ssl_match_hostname(peercert, self._server_hostname)
        except SSLCertificateError:
            gen_log.warning("Invalid SSL certificate", exc_info=True)
            return False
        else:
            return True

    def _handle_read(self):
        """Continue the handshake if it is unfinished, else read normally."""
        if self._ssl_accepting:
            self._do_ssl_handshake()
            return
        super(SSLIOStream, self)._handle_read()

    def _handle_write(self):
        """Continue the handshake if it is unfinished, else write normally."""
        if self._ssl_accepting:
            self._do_ssl_handshake()
            return
        super(SSLIOStream, self)._handle_write()

    def connect(self, address, callback=None, server_hostname=None):
        """Connect to ``address`` and then perform the SSL handshake.

        ``callback`` (if given) is run after the handshake has completed
        rather than as soon as the TCP connection is established.
        ``server_hostname`` is remembered for wrapping the socket and for
        the hostname check after the handshake.
        """
        # Save the user's callback and run it after the ssl handshake
        # has completed.
        self._ssl_connect_callback = stack_context.wrap(callback)
        self._server_hostname = server_hostname
        # Note: Since we don't pass our callback argument along to
        # super.connect(), this will always return a Future.
        # This is harmless, but a bit less efficient than it could be.
        return super(SSLIOStream, self).connect(address, callback=None)

    def _handle_connect(self):
        """Finish the TCP connect, then wrap the socket for SSL traffic."""
        # Call the superclass method to check for errors.
        super(SSLIOStream, self)._handle_connect()
        if self.closed():
            return
        # When the connection is complete, wrap the socket for SSL
        # traffic.  Note that we do this by overriding _handle_connect
        # instead of by passing a callback to super().connect because
        # user callbacks are enqueued asynchronously on the IOLoop,
        # but since _handle_events calls _handle_connect immediately
        # followed by _handle_write we need this to be synchronous.
        #
        # The IOLoop will get confused if we swap out self.socket while the
        # fd is registered, so remove it now and re-register after
        # wrap_socket().
        self.io_loop.remove_handler(self.socket)
        old_state = self._state
        self._state = None
        self.socket = ssl_wrap_socket(self.socket, self._ssl_options,
                                      server_hostname=self._server_hostname,
                                      do_handshake_on_connect=False)
        self._add_io_state(old_state)

    def read_from_fd(self):
        """Read up to ``read_chunk_size`` decrypted bytes from the socket.

        Returns ``None`` while the handshake is unfinished or when the
        read would block.  An empty read closes the stream and returns
        ``None``.  Other errors propagate to the caller.
        """
        if self._ssl_accepting:
            # If the handshake hasn't finished yet, there can't be anything
            # to read (attempting to read may or may not raise an exception
            # depending on the SSL version)
            return None
        try:
            # SSLSocket objects have both a read() and recv() method,
            # while regular sockets only have recv().
            # The recv() method blocks (at least in python 2.6) if it is
            # called when there is nothing to read, so we have to use
            # read() instead.
            chunk = self.socket.read(self.read_chunk_size)
        except ssl.SSLError as e:
            # SSLError is a subclass of socket.error, so this except
            # block must come first.
            if e.args[0] == ssl.SSL_ERROR_WANT_READ:
                return None
            else:
                raise
        except socket.error as e:
            if e.args[0] in _ERRNO_WOULDBLOCK:
                return None
            else:
                raise
        if not chunk:
            self.close()
            return None
        return chunk
01306 
01307 
class PipeIOStream(BaseIOStream):
    """Pipe-based `IOStream` implementation.

    The constructor takes an integer file descriptor (such as one returned
    by `os.pipe`) rather than an open file object.  Pipes are generally
    one-way, so a `PipeIOStream` can be used for reading or writing but not
    both.
    """
    def __init__(self, fd, *args, **kwargs):
        """Take ownership of ``fd`` and switch it to non-blocking mode.

        Remaining arguments are passed through to `BaseIOStream`.
        """
        self.fd = fd
        _set_nonblocking(fd)
        super(PipeIOStream, self).__init__(*args, **kwargs)

    def fileno(self):
        """Return the integer file descriptor of the pipe."""
        return self.fd

    def close_fd(self):
        """Close the pipe's file descriptor."""
        os.close(self.fd)

    def write_to_fd(self, data):
        """Write ``data`` to the pipe; returns the number of bytes written."""
        return os.write(self.fd, data)

    def read_from_fd(self):
        """Read up to ``read_chunk_size`` bytes from the pipe.

        Returns the data read, or ``None`` when the read would block.
        End of file (an empty read) and ``EBADF`` both close the stream
        and return ``None``; other errors propagate to the caller.
        """
        try:
            data = os.read(self.fd, self.read_chunk_size)
        except (IOError, OSError) as exc:
            code = errno_from_exception(exc)
            if code in _ERRNO_WOULDBLOCK:
                return None
            if code == errno.EBADF:
                # If the writing half of a pipe is closed, select will
                # report it as readable but reads will fail with EBADF.
                self.close(exc_info=True)
                return None
            raise
        if data:
            return data
        # An empty read means the other end has closed the pipe.
        self.close()
        return None
01347 
01348 
def _double_prefix(deque):
    """Merge the front of ``deque`` so that its first chunk grows.

    The target size is double the first chunk's length, but never less
    than the first two chunks combined, so the second chunk is not split
    merely because the first one is small.  Expects at least two entries
    in ``deque``.
    """
    first_len = len(deque[0])
    combined_len = first_len + len(deque[1])
    target = max(first_len * 2, combined_len)
    _merge_prefix(deque, target)
01356 
01357 
01358 def _merge_prefix(deque, size):
01359     """Replace the first entries in a deque of strings with a single
01360     string of up to size bytes.
01361 
01362     >>> d = collections.deque(['abc', 'de', 'fghi', 'j'])
01363     >>> _merge_prefix(d, 5); print(d)
01364     deque(['abcde', 'fghi', 'j'])
01365 
01366     Strings will be split as necessary to reach the desired size.
01367     >>> _merge_prefix(d, 7); print(d)
01368     deque(['abcdefg', 'hi', 'j'])
01369 
01370     >>> _merge_prefix(d, 3); print(d)
01371     deque(['abc', 'defg', 'hi', 'j'])
01372 
01373     >>> _merge_prefix(d, 100); print(d)
01374     deque(['abcdefghij'])
01375     """
01376     if len(deque) == 1 and len(deque[0]) <= size:
01377         return
01378     prefix = []
01379     remaining = size
01380     while deque and remaining > 0:
01381         chunk = deque.popleft()
01382         if len(chunk) > remaining:
01383             deque.appendleft(chunk[remaining:])
01384             chunk = chunk[:remaining]
01385         prefix.append(chunk)
01386         remaining -= len(chunk)
01387     # This data structure normally just contains byte strings, but
01388     # the unittest gets messy if it doesn't use the default str() type,
01389     # so do the merge based on the type of data that's actually present.
01390     if prefix:
01391         deque.appendleft(type(prefix[0])().join(prefix))
01392     if not deque:
01393         deque.appendleft(b"")
01394 
01395 
def doctests():
    """Return a test suite built by `doctest.DocTestSuite` for this module."""
    # Imported lazily so doctest is only loaded when the suite is requested.
    from doctest import DocTestSuite
    return DocTestSuite()


rosbridge_tools
Author(s): Jonathan Mace
autogenerated on Sat Dec 27 2014 11:25:59