"""Utility classes to write to and read from non-blocking files and sockets.

Contents:

* `BaseIOStream`: Generic interface for reading and writing.
* `IOStream`: Implementation of BaseIOStream using non-blocking sockets.
* `SSLIOStream`: SSL-aware version of IOStream.
* `PipeIOStream`: Pipe-based IOStream implementation.
"""

from __future__ import absolute_import, division, print_function, with_statement

import collections
import errno
import numbers
import os
import socket
import sys
import re

from tornado.concurrent import TracebackFuture
from tornado import ioloop
from tornado.log import gen_log, app_log
from tornado.netutil import ssl_wrap_socket, ssl_match_hostname, SSLCertificateError
from tornado import stack_context
from tornado.util import bytes_type, errno_from_exception

try:
    from tornado.platform.posix import _set_nonblocking
except ImportError:
    _set_nonblocking = None

try:
    import ssl
except ImportError:
    # ssl is not available on Google App Engine
    ssl = None

# These errnos indicate that a non-blocking operation must be retried
# at a later time.  On most platforms they're the same value, but on
# some they differ.
_ERRNO_WOULDBLOCK = (errno.EWOULDBLOCK, errno.EAGAIN)

if hasattr(errno, "WSAEWOULDBLOCK"):
    _ERRNO_WOULDBLOCK += (errno.WSAEWOULDBLOCK,)

# These errnos indicate that a connection has been abruptly terminated.
# They should be caught and handled less noisily than other errors.
_ERRNO_CONNRESET = (errno.ECONNRESET, errno.ECONNABORTED, errno.EPIPE,
                    errno.ETIMEDOUT)

if hasattr(errno, "WSAECONNRESET"):
    _ERRNO_CONNRESET += (errno.WSAECONNRESET, errno.WSAECONNABORTED,
                         errno.WSAETIMEDOUT)

# These errnos indicate that a non-blocking connect is still in progress.
_ERRNO_INPROGRESS = (errno.EINPROGRESS,)

if hasattr(errno, "WSAEINPROGRESS"):
    _ERRNO_INPROGRESS += (errno.WSAEINPROGRESS,)


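# Illustrative sketch (not used elsewhere in this module): the tuples above
# are meant to be consulted by comparing an exception's errno against them,
# as _read_to_buffer and _handle_write do below.  The function name is
# hypothetical.
def _example_classify_socket_error(exc):
    """Classify ``exc`` as retryable, an ordinary close, or a real error."""
    eno = errno_from_exception(exc)
    if eno in _ERRNO_WOULDBLOCK:
        return "retry"    # non-blocking operation; try again later
    if eno in _ERRNO_CONNRESET:
        return "closed"   # treat as a normal connection close, log quietly
    return "error"        # unexpected; worth logging

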
class StreamClosedError(IOError):
    """Exception raised by `IOStream` methods when the stream is closed.

    Note that the close callback is scheduled to run *after* other
    callbacks on the stream (to allow for buffered data to be processed),
    so you may see this error before you see the close callback.
    """
    pass


class UnsatisfiableReadError(Exception):
    """Exception raised when a read cannot be satisfied.

    Raised by ``read_until`` and ``read_until_regex`` with a ``max_bytes``
    argument.
    """
    pass


class StreamBufferFullError(Exception):
    """Exception raised by `IOStream` methods when the buffer is full.
    """


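# Illustrative sketch (not used elsewhere in this module): how the exceptions
# above surface through the `.Future` interface.  ``stream`` is assumed to be
# a connected `IOStream`; the function and callback names are hypothetical.
def _example_bounded_read(stream):
    # With no ``callback`` argument, read_until returns a Future.
    future = stream.read_until(b"\r\n", max_bytes=1024)

    def on_done(fut):
        try:
            line = fut.result()
        except UnsatisfiableReadError:
            return None  # max_bytes exceeded; the stream has been closed
        except StreamClosedError:
            return None  # stream closed before a full line arrived
        return line

    future.add_done_callback(on_done)
    return future

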
class BaseIOStream(object):
    """A utility class to write to and read from a non-blocking file or socket.

    We support a non-blocking ``write()`` and a family of ``read_*()`` methods.
    All of the methods take an optional ``callback`` argument and return a
    `.Future` only if no callback is given.  When the operation completes,
    the callback will be run or the `.Future` will resolve with the data
    read (or ``None`` for ``write()``).  All outstanding ``Futures`` will
    resolve with a `StreamClosedError` when the stream is closed; users
    of the callback interface will be notified via
    `.BaseIOStream.set_close_callback` instead.

    When a stream is closed due to an error, the IOStream's ``error``
    attribute contains the exception object.

    Subclasses must implement `fileno`, `close_fd`, `write_to_fd`,
    `read_from_fd`, and optionally `get_fd_error`.
    """
    def __init__(self, io_loop=None, max_buffer_size=None,
                 read_chunk_size=None, max_write_buffer_size=None):
        """`BaseIOStream` constructor.

        :arg io_loop: The `.IOLoop` to use; defaults to `.IOLoop.current`.
        :arg max_buffer_size: Maximum amount of incoming data to buffer;
            defaults to 100MB.
        :arg read_chunk_size: Amount of data to read at one time from the
            underlying transport; defaults to 64KB.
        :arg max_write_buffer_size: Amount of outgoing data to buffer;
            defaults to unlimited.

        .. versionchanged:: 4.0
           Add the ``max_write_buffer_size`` parameter.  Changed default
           ``read_chunk_size`` to 64KB.
        """
        self.io_loop = io_loop or ioloop.IOLoop.current()
        self.max_buffer_size = max_buffer_size or 104857600
        # A chunk size that is too close to max_buffer_size can cause
        # spurious failures, so keep it well below that limit.
        self.read_chunk_size = min(read_chunk_size or 65536,
                                   self.max_buffer_size // 2)
        self.max_write_buffer_size = max_write_buffer_size
        self.error = None
        self._read_buffer = collections.deque()
        self._write_buffer = collections.deque()
        self._read_buffer_size = 0
        self._write_buffer_size = 0
        self._write_buffer_frozen = False
        self._read_delimiter = None
        self._read_regex = None
        self._read_max_bytes = None
        self._read_bytes = None
        self._read_partial = False
        self._read_until_close = False
        self._read_callback = None
        self._read_future = None
        self._streaming_callback = None
        self._write_callback = None
        self._write_future = None
        self._close_callback = None
        self._connect_callback = None
        self._connect_future = None
        self._connecting = False
        self._state = None
        self._pending_callbacks = 0
        self._closed = False

    def fileno(self):
        """Returns the file descriptor for this stream."""
        raise NotImplementedError()

    def close_fd(self):
        """Closes the file underlying this stream.

        ``close_fd`` is called by `BaseIOStream` and should not be called
        elsewhere; other users should call `close` instead.
        """
        raise NotImplementedError()

    def write_to_fd(self, data):
        """Attempts to write ``data`` to the underlying file.

        Returns the number of bytes written.
        """
        raise NotImplementedError()

    def read_from_fd(self):
        """Attempts to read from the underlying file.

        Returns ``None`` if there was nothing to read (the socket
        returned `~errno.EWOULDBLOCK` or equivalent), otherwise
        returns the data.  When possible, should return no more than
        ``self.read_chunk_size`` bytes at a time.
        """
        raise NotImplementedError()

    def get_fd_error(self):
        """Returns information about any error on the underlying file.

        This method is called after the `.IOLoop` has signaled an error on the
        file descriptor, and should return an Exception (such as `socket.error`
        with additional information), or None if no such information is
        available.
        """
        return None

    def read_until_regex(self, regex, callback=None, max_bytes=None):
        """Asynchronously read until we have matched the given regex.

        The result includes the data that matches the regex and anything
        that came before it.  If a callback is given, it will be run
        with the data as an argument; if not, this method returns a
        `.Future`.

        If ``max_bytes`` is not None, the connection will be closed
        if more than ``max_bytes`` bytes have been read and the regex is
        not satisfied.

        .. versionchanged:: 4.0
            Added the ``max_bytes`` argument.  The ``callback`` argument is
            now optional and a `.Future` will be returned if it is omitted.
        """
        future = self._set_read_callback(callback)
        self._read_regex = re.compile(regex)
        self._read_max_bytes = max_bytes
        try:
            self._try_inline_read()
        except UnsatisfiableReadError as e:
            # The buffered data already exceeds max_bytes without a match.
            gen_log.info("Unsatisfiable read, closing connection: %s" % e)
            self.close(exc_info=True)
            return future
        return future

    def read_until(self, delimiter, callback=None, max_bytes=None):
        """Asynchronously read until we have found the given delimiter.

        The result includes all the data read including the delimiter.
        If a callback is given, it will be run with the data as an argument;
        if not, this method returns a `.Future`.

        If ``max_bytes`` is not None, the connection will be closed
        if more than ``max_bytes`` bytes have been read and the delimiter
        is not found.

        .. versionchanged:: 4.0
            Added the ``max_bytes`` argument.  The ``callback`` argument is
            now optional and a `.Future` will be returned if it is omitted.
        """
        future = self._set_read_callback(callback)
        self._read_delimiter = delimiter
        self._read_max_bytes = max_bytes
        try:
            self._try_inline_read()
        except UnsatisfiableReadError as e:
            # The buffered data already exceeds max_bytes without the delimiter.
            gen_log.info("Unsatisfiable read, closing connection: %s" % e)
            self.close(exc_info=True)
            return future
        return future

    def read_bytes(self, num_bytes, callback=None, streaming_callback=None,
                   partial=False):
        """Asynchronously read a number of bytes.

        If a ``streaming_callback`` is given, it will be called with chunks
        of data as they become available, and the final result will be empty.
        Otherwise, the result is all the data that was read.
        If a callback is given, it will be run with the data as an argument;
        if not, this method returns a `.Future`.

        If ``partial`` is true, the callback is run as soon as we have
        any bytes to return (but never more than ``num_bytes``).

        .. versionchanged:: 4.0
            Added the ``partial`` argument.  The callback argument is now
            optional and a `.Future` will be returned if it is omitted.
        """
        future = self._set_read_callback(callback)
        assert isinstance(num_bytes, numbers.Integral)
        self._read_bytes = num_bytes
        self._read_partial = partial
        self._streaming_callback = stack_context.wrap(streaming_callback)
        self._try_inline_read()
        return future

    def read_until_close(self, callback=None, streaming_callback=None):
        """Asynchronously reads all data from the socket until it is closed.

        If a ``streaming_callback`` is given, it will be called with chunks
        of data as they become available, and the final result will be empty.
        Otherwise, the result is all the data that was read.
        If a callback is given, it will be run with the data as an argument;
        if not, this method returns a `.Future`.

        .. versionchanged:: 4.0
            The callback argument is now optional and a `.Future` will
            be returned if it is omitted.
        """
        future = self._set_read_callback(callback)
        self._streaming_callback = stack_context.wrap(streaming_callback)
        if self.closed():
            if self._streaming_callback is not None:
                self._run_read_callback(self._read_buffer_size, True)
            self._run_read_callback(self._read_buffer_size, False)
            return future
        self._read_until_close = True
        self._try_inline_read()
        return future

    def write(self, data, callback=None):
        """Asynchronously write the given data to this stream.

        If ``callback`` is given, we call it when all of the buffered write
        data has been successfully written to the stream.  If there was
        previously buffered write data and an old write callback, that
        callback is simply overwritten with this new callback.

        If no ``callback`` is given, this method returns a `.Future` that
        resolves (with a result of ``None``) when the write has been
        completed.  If `write` is called again before that `.Future` has
        resolved, the previous future will be orphaned and will never resolve.

        .. versionchanged:: 4.0
            Now returns a `.Future` if no callback is given.
        """
        assert isinstance(data, bytes_type)
        self._check_closed()
        # We use bool(_write_buffer) as a proxy for write_buffer_size>0,
        # so never put empty strings in the buffer.
        if data:
            if (self.max_write_buffer_size is not None and
                    self._write_buffer_size + len(data) > self.max_write_buffer_size):
                raise StreamBufferFullError("Reached maximum write buffer size")
            # Break up large contiguous strings before inserting them in the
            # write buffer, so that we don't have to recopy the entire thing
            # as we slice off pieces to send to the socket.
            WRITE_BUFFER_CHUNK_SIZE = 128 * 1024
            for i in range(0, len(data), WRITE_BUFFER_CHUNK_SIZE):
                self._write_buffer.append(data[i:i + WRITE_BUFFER_CHUNK_SIZE])
            self._write_buffer_size += len(data)
        if callback is not None:
            self._write_callback = stack_context.wrap(callback)
            future = None
        else:
            future = self._write_future = TracebackFuture()
        if not self._connecting:
            self._handle_write()
            if self._write_buffer:
                self._add_io_state(self.io_loop.WRITE)
            self._maybe_add_error_listener()
        return future

    def set_close_callback(self, callback):
        """Call the given callback when the stream is closed.

        This is not necessary for applications that use the `.Future`
        interface; all outstanding ``Futures`` will resolve with a
        `StreamClosedError` when the stream is closed.
        """
        self._close_callback = stack_context.wrap(callback)
        self._maybe_add_error_listener()

    def close(self, exc_info=False):
        """Close this stream.

        If ``exc_info`` is true, set the ``error`` attribute to the current
        exception from `sys.exc_info` (or if ``exc_info`` is a tuple,
        use that instead of `sys.exc_info`).
        """
        if not self.closed():
            if exc_info:
                if not isinstance(exc_info, tuple):
                    exc_info = sys.exc_info()
                if any(exc_info):
                    self.error = exc_info[1]
            if self._read_until_close:
                if (self._streaming_callback is not None and
                        self._read_buffer_size):
                    self._run_read_callback(self._read_buffer_size, True)
                self._read_until_close = False
                self._run_read_callback(self._read_buffer_size, False)
            if self._state is not None:
                self.io_loop.remove_handler(self.fileno())
                self._state = None
            self.close_fd()
            self._closed = True
        self._maybe_run_close_callback()

    def _maybe_run_close_callback(self):
        # If there are pending callbacks, don't run the close callback
        # until they're done (see _maybe_add_error_listener).
        if self.closed() and self._pending_callbacks == 0:
            futures = []
            if self._read_future is not None:
                futures.append(self._read_future)
                self._read_future = None
            if self._write_future is not None:
                futures.append(self._write_future)
                self._write_future = None
            if self._connect_future is not None:
                futures.append(self._connect_future)
                self._connect_future = None
            for future in futures:
                if (isinstance(self.error, (socket.error, IOError)) and
                        errno_from_exception(self.error) in _ERRNO_CONNRESET):
                    # Treat connection resets as closed connections so
                    # clients only have to catch one kind of exception
                    # to handle disconnects.
                    future.set_exception(StreamClosedError())
                else:
                    future.set_exception(self.error or StreamClosedError())
            if self._close_callback is not None:
                cb = self._close_callback
                self._close_callback = None
                self._run_callback(cb)
            # Delete any unfinished callbacks to break up reference cycles.
            self._read_callback = self._write_callback = None
            # Clear the write buffer so it can be garbage collected even
            # if the IOStream object itself is kept alive by a reference
            # cycle.
            self._write_buffer = None

    def reading(self):
        """Returns true if we are currently reading from the stream."""
        return self._read_callback is not None or self._read_future is not None

    def writing(self):
        """Returns true if we are currently writing to the stream."""
        return bool(self._write_buffer)

    def closed(self):
        """Returns true if the stream has been closed."""
        return self._closed

    def set_nodelay(self, value):
        """Sets the no-delay flag for this stream.

        By default, data written to TCP streams may be held for a time
        to make the most efficient use of bandwidth (according to
        Nagle's algorithm).  The no-delay flag requests that data be
        written as soon as possible, even if doing so would consume
        additional bandwidth.

        This flag is currently defined only for TCP-based ``IOStreams``.

        .. versionadded:: 3.1
        """
        pass

    def _handle_events(self, fd, events):
        if self.closed():
            gen_log.warning("Got events for closed stream %s", fd)
            return
        try:
            if self._connecting:
                # Most IOLoops will report a write failed connect
                # with the WRITE event, but SelectIOLoop reports a
                # READ as well so we must check for connecting before
                # either.
                self._handle_connect()
            if self.closed():
                return
            if events & self.io_loop.READ:
                self._handle_read()
            if self.closed():
                return
            if events & self.io_loop.WRITE:
                self._handle_write()
            if self.closed():
                return
            if events & self.io_loop.ERROR:
                self.error = self.get_fd_error()
                # We may have queued up a user callback in _handle_read or
                # _handle_write, so don't close the IOStream until those
                # callbacks have had a chance to run.
                self.io_loop.add_callback(self.close)
                return
            state = self.io_loop.ERROR
            if self.reading():
                state |= self.io_loop.READ
            if self.writing():
                state |= self.io_loop.WRITE
            if state == self.io_loop.ERROR and self._read_buffer_size == 0:
                # If the connection is idle, listen for reads too so
                # we can tell if the connection is closed.  If there is
                # data in the read buffer we won't run the close callback
                # yet anyway, so we don't need to listen in this case.
                state |= self.io_loop.READ
            if state != self._state:
                assert self._state is not None, \
                    "shouldn't happen: _handle_events without self._state"
                self._state = state
                self.io_loop.update_handler(self.fileno(), self._state)
        except UnsatisfiableReadError as e:
            gen_log.info("Unsatisfiable read, closing connection: %s" % e)
            self.close(exc_info=True)
        except Exception:
            gen_log.error("Uncaught exception, closing connection.",
                          exc_info=True)
            self.close(exc_info=True)
            raise

    def _run_callback(self, callback, *args):
        def wrapper():
            self._pending_callbacks -= 1
            try:
                return callback(*args)
            except Exception:
                app_log.error("Uncaught exception, closing connection.",
                              exc_info=True)
                # Close the socket on an uncaught exception from a user
                # callback (it would eventually get closed when the socket
                # object is gc'd, but we don't want to rely on gc happening
                # before we run out of file descriptors).
                self.close(exc_info=True)
                # Re-raise the exception so that IOLoop.handle_callback_exception
                # can see it and log the error.
                raise
            finally:
                self._maybe_add_error_listener()
        # We schedule callbacks to be run on the next IOLoop iteration
        # rather than running them directly for several reasons:
        # * Prevents unbounded stack growth when a callback calls an
        #   asynchronous callback immediately before its return.
        # * Provides a predictable execution context for e.g.
        #   non-reentrant mutexes.
        # * Ensures that the try/except in wrapper() is run outside
        #   of the application's StackContexts.
        with stack_context.NullContext():
            # stack_context was already captured in callback, so we don't
            # need to capture it again for IOStream's wrapper.  This is
            # especially important if the callback was pre-wrapped before
            # entry to IOStream, as we could otherwise capture and leak
            # the wrong context here.
            self._pending_callbacks += 1
            self.io_loop.add_callback(wrapper)

    def _read_to_buffer_loop(self):
        # This method is called from _handle_read and _try_inline_read.
        try:
            if self._read_bytes is not None:
                target_bytes = self._read_bytes
            elif self._read_max_bytes is not None:
                target_bytes = self._read_max_bytes
            elif self.reading():
                # For read_until without max_bytes, or
                # read_until_close, read as much as we can before
                # scanning for the delimiter.
                target_bytes = None
            else:
                target_bytes = 0
            next_find_pos = 0
            # Pretend to have a pending callback so that an EOF in
            # _read_to_buffer doesn't trigger an immediate close
            # callback.  At the end of this method we'll either
            # establish a real pending callback via
            # _read_from_buffer or run the close callback.
            #
            # The finally block below ensures the counter is decremented
            # before our callers' except clauses run (they call close()
            # and do need to trigger the close callback).
            self._pending_callbacks += 1
            while not self.closed():
                # Read from the socket until we get EWOULDBLOCK or equivalent.
                # SSL sockets do some internal buffering, and if the data is
                # sitting in the SSL object's buffer select() and friends
                # can't see it; the only way to find out if it's there is to
                # try to read it.
                if self._read_to_buffer() == 0:
                    break

                self._run_streaming_callback()

                # If we've read all the bytes we can use, break out of
                # this loop.  We can't just call _read_from_buffer here
                # because of subtle interactions with the
                # pending_callback and error_listener mechanisms.
                #
                # If we've reached target_bytes, we know we're done.
                if (target_bytes is not None and
                        self._read_buffer_size >= target_bytes):
                    break

                # Otherwise, we need to call the more expensive find_read_pos.
                # It's inefficient to do this on every read, so instead
                # do it on the first read and whenever the read buffer
                # size has doubled.
                if self._read_buffer_size >= next_find_pos:
                    pos = self._find_read_pos()
                    if pos is not None:
                        return pos
                    next_find_pos = self._read_buffer_size * 2
            return self._find_read_pos()
        finally:
            self._pending_callbacks -= 1

    def _handle_read(self):
        try:
            pos = self._read_to_buffer_loop()
        except UnsatisfiableReadError:
            raise
        except Exception:
            gen_log.warning("error on read", exc_info=True)
            self.close(exc_info=True)
            return
        if pos is not None:
            self._read_from_buffer(pos)
            return
        else:
            self._maybe_run_close_callback()

    def _set_read_callback(self, callback):
        assert self._read_callback is None, "Already reading"
        assert self._read_future is None, "Already reading"
        if callback is not None:
            self._read_callback = stack_context.wrap(callback)
        else:
            self._read_future = TracebackFuture()
        return self._read_future

    def _run_read_callback(self, size, streaming):
        if streaming:
            callback = self._streaming_callback
        else:
            callback = self._read_callback
            self._read_callback = self._streaming_callback = None
            if self._read_future is not None:
                assert callback is None
                future = self._read_future
                self._read_future = None
                future.set_result(self._consume(size))
        if callback is not None:
            assert self._read_future is None
            self._run_callback(callback, self._consume(size))
        else:
            # If we scheduled a callback, the error listener is added
            # afterwards; once the future is resolved we don't need one.
            self._maybe_add_error_listener()

    def _try_inline_read(self):
        """Attempt to complete the current read operation from buffered data.

        If the read can be completed without blocking, schedules the
        read callback on the next IOLoop iteration; otherwise starts
        listening for reads on the socket.
        """
        # See if we've already got the data from a previous read.
        self._run_streaming_callback()
        pos = self._find_read_pos()
        if pos is not None:
            self._read_from_buffer(pos)
            return
        self._check_closed()
        try:
            pos = self._read_to_buffer_loop()
        except Exception:
            # If there was an error in _read_to_buffer, we called close()
            # already, but couldn't run the close callback because of
            # _pending_callbacks.  Before we escape from this function,
            # run the close callback if applicable.
            self._maybe_run_close_callback()
            raise
        if pos is not None:
            self._read_from_buffer(pos)
            return
        # We couldn't satisfy the read inline, so either run the close
        # callback or start listening for new data on the socket.
        if self.closed():
            self._maybe_run_close_callback()
        else:
            self._add_io_state(ioloop.IOLoop.READ)

    def _read_to_buffer(self):
        """Reads from the socket and appends the result to the read buffer.

        Returns the number of bytes read.  Returns 0 if there is nothing
        to read (i.e. the read returns EWOULDBLOCK or equivalent).  On
        error closes the socket and raises an exception.
        """
        try:
            chunk = self.read_from_fd()
        except (socket.error, IOError, OSError) as e:
            # ssl.SSLError is a subclass of socket.error
            if e.args[0] in _ERRNO_CONNRESET:
                # Treat ECONNRESET as a connection close rather than
                # an error to minimize log spam (the exception will
                # be available on self.error for apps that care).
                self.close(exc_info=True)
                return
            self.close(exc_info=True)
            raise
        if chunk is None:
            return 0
        self._read_buffer.append(chunk)
        self._read_buffer_size += len(chunk)
        if self._read_buffer_size > self.max_buffer_size:
            gen_log.error("Reached maximum read buffer size")
            self.close()
            raise StreamBufferFullError("Reached maximum read buffer size")
        return len(chunk)

    def _run_streaming_callback(self):
        if self._streaming_callback is not None and self._read_buffer_size:
            bytes_to_consume = self._read_buffer_size
            if self._read_bytes is not None:
                bytes_to_consume = min(self._read_bytes, bytes_to_consume)
                self._read_bytes -= bytes_to_consume
            self._run_read_callback(bytes_to_consume, True)

    def _read_from_buffer(self, pos):
        """Attempts to complete the currently-pending read from the buffer.

        The argument is either a position in the read buffer or None,
        as returned by _find_read_pos.
        """
        self._read_bytes = self._read_delimiter = self._read_regex = None
        self._read_partial = False
        self._run_read_callback(pos, False)

    def _find_read_pos(self):
        """Attempts to find a position in the read buffer that satisfies
        the currently-pending read.

        Returns a position in the buffer if the current read can be satisfied,
        or None if it cannot.
        """
        if (self._read_bytes is not None and
            (self._read_buffer_size >= self._read_bytes or
             (self._read_partial and self._read_buffer_size > 0))):
            num_bytes = min(self._read_bytes, self._read_buffer_size)
            return num_bytes
        elif self._read_delimiter is not None:
            # Multi-byte delimiters (e.g. '\r\n') may straddle two
            # chunks in the read buffer, so we can't easily find them
            # without collapsing the buffer.  However, since protocols
            # using delimited reads (as opposed to reads of a known
            # length) tend to be "line" oriented, the delimiter is likely
            # to be in the first few chunks.  Merge the buffer gradually
            # since large merges are relatively expensive and get undone
            # in _consume().
            if self._read_buffer:
                while True:
                    loc = self._read_buffer[0].find(self._read_delimiter)
                    if loc != -1:
                        delimiter_len = len(self._read_delimiter)
                        self._check_max_bytes(self._read_delimiter,
                                              loc + delimiter_len)
                        return loc + delimiter_len
                    if len(self._read_buffer) == 1:
                        break
                    _double_prefix(self._read_buffer)
                self._check_max_bytes(self._read_delimiter,
                                      len(self._read_buffer[0]))
        elif self._read_regex is not None:
            if self._read_buffer:
                while True:
                    m = self._read_regex.search(self._read_buffer[0])
                    if m is not None:
                        self._check_max_bytes(self._read_regex, m.end())
                        return m.end()
                    if len(self._read_buffer) == 1:
                        break
                    _double_prefix(self._read_buffer)
                self._check_max_bytes(self._read_regex,
                                      len(self._read_buffer[0]))
        return None

    def _check_max_bytes(self, delimiter, size):
        if (self._read_max_bytes is not None and
                size > self._read_max_bytes):
            raise UnsatisfiableReadError(
                "delimiter %r not found within %d bytes" % (
                    delimiter, self._read_max_bytes))

    def _handle_write(self):
        while self._write_buffer:
            try:
                if not self._write_buffer_frozen:
                    # On windows, socket.send blows up if given a
                    # write buffer that's too large, instead of just
                    # returning the number of bytes it was able to
                    # process.  Therefore we must not call socket.send
                    # with more than 128KB at a time.
                    _merge_prefix(self._write_buffer, 128 * 1024)
                num_bytes = self.write_to_fd(self._write_buffer[0])
                if num_bytes == 0:
                    # With OpenSSL, if we couldn't write the entire buffer,
                    # the very same string object must be used on the
                    # next call to send.  Therefore we suppress
                    # merging the write buffer after an incomplete send.
                    # A cleaner solution would be to set
                    # SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER, but this is
                    # not yet accessible from python
                    # (http://bugs.python.org/issue8240)
                    self._write_buffer_frozen = True
                    break
                self._write_buffer_frozen = False
                _merge_prefix(self._write_buffer, num_bytes)
                self._write_buffer.popleft()
                self._write_buffer_size -= num_bytes
            except (socket.error, IOError, OSError) as e:
                if e.args[0] in _ERRNO_WOULDBLOCK:
                    self._write_buffer_frozen = True
                    break
                else:
                    if e.args[0] not in _ERRNO_CONNRESET:
                        # Broken pipe errors are usually caused by connection
                        # reset, and it's better to not log EPIPE errors to
                        # minimize log spam.
                        gen_log.warning("Write error on %s: %s",
                                        self.fileno(), e)
                    self.close(exc_info=True)
                    return
        if not self._write_buffer:
            if self._write_callback:
                callback = self._write_callback
                self._write_callback = None
                self._run_callback(callback)
            if self._write_future:
                future = self._write_future
                self._write_future = None
                future.set_result(None)

    def _consume(self, loc):
        if loc == 0:
            return b""
        _merge_prefix(self._read_buffer, loc)
        self._read_buffer_size -= loc
        return self._read_buffer.popleft()

    def _check_closed(self):
        if self.closed():
            raise StreamClosedError("Stream is closed")

    def _maybe_add_error_listener(self):
        # This method is part of an optimization: to detect a connection
        # that is closed when we're not actively reading or writing, we
        # must listen for read events.  However, it is inefficient to do
        # this when the connection is first established because we are
        # going to read or write immediately anyway.  Instead, we insert
        # checks at various times to see if the connection is idle and
        # add the read listener then.
        if self._pending_callbacks != 0:
            return
        if self._state is None or self._state == ioloop.IOLoop.ERROR:
            if self.closed():
                self._maybe_run_close_callback()
            elif (self._read_buffer_size == 0 and
                  self._close_callback is not None):
                self._add_io_state(ioloop.IOLoop.READ)

    def _add_io_state(self, state):
        """Adds `state` (IOLoop.{READ,WRITE} flags) to our event handler.

        Implementation notes: Reads and writes have a fast path and a
        slow path.  The fast path reads synchronously from socket
        buffers, while the slow path uses `_add_io_state` to schedule
        an IOLoop callback.  Note that in both cases, the callback is
        run asynchronously with `_run_callback`.

        To detect closed connections, we must have called
        `_add_io_state` at some point, but we want to delay this as
        much as possible so we don't have to set an `IOLoop.ERROR`
        listener that will be overwritten by the next slow-path
        operation.  As long as there are callbacks scheduled for
        fast-path ops, those callbacks may do more reads.
        If a sequence of fast-path ops does not end in a slow-path op
        (e.g. for an @asynchronous long-poll request), we must add
        the error handler.  This is done in `_run_callback` and `write`
        (since the write callback is optional, we can have a
        fast-path write with no `_run_callback`).
        """
        if self.closed():
            # The connection has been closed, so there can be no future events.
            return
        if self._state is None:
            self._state = ioloop.IOLoop.ERROR | state
            with stack_context.NullContext():
                self.io_loop.add_handler(
                    self.fileno(), self._handle_events, self._state)
        elif not self._state & state:
            self._state = self._state | state
            self.io_loop.update_handler(self.fileno(), self._state)


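# Illustrative sketch (not used elsewhere in this module): the read/write
# methods above consumed from a coroutine via their `.Future` interface
# instead of callbacks.  ``stream`` is assumed to be an already-connected
# stream; the function names are hypothetical.
def _example_echo_line(stream):
    from tornado import gen

    @gen.coroutine
    def echo_line():
        # Each method returns a Future when no callback is given, so a
        # coroutine can simply yield it.
        line = yield stream.read_until(b"\n")
        yield stream.write(line)
        raise gen.Return(line)

    return echo_line()

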
class IOStream(BaseIOStream):
    r"""Socket-based `IOStream` implementation.

    This class supports the read and write methods from `BaseIOStream`
    plus a `connect` method.

    The ``socket`` parameter may either be connected or unconnected.
    For server operations the socket is the result of calling
    `socket.accept <socket.socket.accept>`.  For client operations the
    socket is created with `socket.socket`, and may either be
    connected before passing it to the `IOStream` or connected with
    `IOStream.connect`.

    A very simple (and broken) HTTP client using this class::

        import tornado.ioloop
        import tornado.iostream
        import socket

        def send_request():
            stream.write(b"GET / HTTP/1.0\r\nHost: friendfeed.com\r\n\r\n")
            stream.read_until(b"\r\n\r\n", on_headers)

        def on_headers(data):
            headers = {}
            for line in data.split(b"\r\n"):
                parts = line.split(b":")
                if len(parts) == 2:
                    headers[parts[0].strip()] = parts[1].strip()
            stream.read_bytes(int(headers[b"Content-Length"]), on_body)

        def on_body(data):
            print(data)
            stream.close()
            tornado.ioloop.IOLoop.instance().stop()

        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        stream = tornado.iostream.IOStream(s)
        stream.connect(("friendfeed.com", 80), send_request)
        tornado.ioloop.IOLoop.instance().start()
    """
    def __init__(self, socket, *args, **kwargs):
        self.socket = socket
        self.socket.setblocking(False)
        super(IOStream, self).__init__(*args, **kwargs)

    def fileno(self):
        return self.socket

    def close_fd(self):
        self.socket.close()
        self.socket = None

    def get_fd_error(self):
        errno = self.socket.getsockopt(socket.SOL_SOCKET,
                                       socket.SO_ERROR)
        return socket.error(errno, os.strerror(errno))

    def read_from_fd(self):
        try:
            chunk = self.socket.recv(self.read_chunk_size)
        except socket.error as e:
            if e.args[0] in _ERRNO_WOULDBLOCK:
                return None
            else:
                raise
        if not chunk:
            self.close()
            return None
        return chunk

    def write_to_fd(self, data):
        return self.socket.send(data)

    def connect(self, address, callback=None, server_hostname=None):
        """Connects the socket to a remote address without blocking.

        May only be called if the socket passed to the constructor was
        not previously connected.  The address parameter is in the
        same format as for `socket.connect <socket.socket.connect>` for
        the type of socket passed to the IOStream constructor,
        e.g. an ``(ip, port)`` tuple.  Hostnames are accepted here,
        but will be resolved synchronously and block the IOLoop.
        If you have a hostname instead of an IP address, the `.TCPClient`
        class is recommended instead of calling this method directly.
        `.TCPClient` will do asynchronous DNS resolution and handle
        both IPv4 and IPv6.

        If ``callback`` is specified, it will be called with no
        arguments when the connection is completed; if not this method
        returns a `.Future` (whose result after a successful
        connection will be the stream itself).

        If specified, the ``server_hostname`` parameter will be used
        in SSL connections for certificate validation (if requested in
        the ``ssl_options``) and SNI (if supported; requires
        Python 3.2+).

        Note that it is safe to call `IOStream.write
        <BaseIOStream.write>` while the connection is pending, in
        which case the data will be written as soon as the connection
        is ready.  Calling `IOStream` read methods before the socket is
        connected works on some platforms but is non-portable.

        .. versionchanged:: 4.0
            If no callback is given, returns a `.Future`.

        """
        self._connecting = True
        if callback is not None:
            self._connect_callback = stack_context.wrap(callback)
            future = None
        else:
            future = self._connect_future = TracebackFuture()
        try:
            self.socket.connect(address)
        except socket.error as e:
            # In non-blocking mode we expect connect() to raise an
            # exception with EINPROGRESS or EWOULDBLOCK.
            #
            # On freebsd, other errors such as ECONNREFUSED may be
            # returned immediately when attempting to connect to
            # localhost, so handle them the same way as an error
            # reported later in _handle_connect.
            if (errno_from_exception(e) not in _ERRNO_INPROGRESS and
                    errno_from_exception(e) not in _ERRNO_WOULDBLOCK):
                gen_log.warning("Connect error on fd %s: %s",
                                self.socket.fileno(), e)
                self.close(exc_info=True)
                return future
        self._add_io_state(self.io_loop.WRITE)
        return future

    def start_tls(self, server_side, ssl_options=None, server_hostname=None):
        """Convert this `IOStream` to an `SSLIOStream`.

        This enables protocols that begin in clear-text mode and
        switch to SSL after some initial negotiation (such as the
        ``STARTTLS`` extension to SMTP and IMAP).

        This method cannot be used if there are outstanding reads
        or writes on the stream, or if there is any data in the
        IOStream's buffer (data in the operating system's socket
        buffer is allowed).  This means it must generally be used
        immediately after reading or writing the last clear-text
        data.  It can also be used immediately after connecting,
        before any reads or writes.

        The ``ssl_options`` argument may be either a dictionary
        of options or an `ssl.SSLContext`.  If a ``server_hostname``
        is given, it will be used for certificate verification
        (as configured in the ``ssl_options``).

        This method returns a `.Future` whose result is the new
        `SSLIOStream`.  After this method has been called,
        any other operation on the original stream is undefined.

        If a close callback is defined on this stream, it will be
        transferred to the new stream.

        .. versionadded:: 4.0
        """
        if (self._read_callback or self._read_future or
                self._write_callback or self._write_future or
                self._connect_callback or self._connect_future or
                self._pending_callbacks or self._closed or
                self._read_buffer or self._write_buffer):
            raise ValueError("IOStream is not idle; cannot convert to SSL")
        if ssl_options is None:
            ssl_options = {}

        socket = self.socket
        self.io_loop.remove_handler(socket)
        self.socket = None
        socket = ssl_wrap_socket(socket, ssl_options, server_side=server_side,
                                 do_handshake_on_connect=False)
        orig_close_callback = self._close_callback
        self._close_callback = None

        future = TracebackFuture()
        ssl_stream = SSLIOStream(socket, ssl_options=ssl_options,
                                 io_loop=self.io_loop)
        # If the new stream is closed before the handshake finishes, fail
        # the returned Future and then run any close callback that was set
        # on the original stream.
        def close_callback():
            if not future.done():
                future.set_exception(ssl_stream.error or StreamClosedError())
            if orig_close_callback is not None:
                orig_close_callback()
        ssl_stream.set_close_callback(close_callback)
        ssl_stream._ssl_connect_callback = lambda: future.set_result(ssl_stream)
        ssl_stream.max_buffer_size = self.max_buffer_size
        ssl_stream.read_chunk_size = self.read_chunk_size
        return future

    def _handle_connect(self):
        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if err != 0:
            self.error = socket.error(err, os.strerror(err))
            # IOLoop implementations may vary: some of them return
            # an error state before the socket becomes writable, so
            # in that case a connection failure would be handled by the
            # error path in _handle_events instead of here.
            if self._connect_future is None:
                gen_log.warning("Connect error on fd %s: %s",
                                self.socket.fileno(), errno.errorcode[err])
            self.close()
            return
        if self._connect_callback is not None:
            callback = self._connect_callback
            self._connect_callback = None
            self._run_callback(callback)
        if self._connect_future is not None:
            future = self._connect_future
            self._connect_future = None
            future.set_result(self)
        self._connecting = False

    def set_nodelay(self, value):
        if (self.socket is not None and
                self.socket.family in (socket.AF_INET, socket.AF_INET6)):
            try:
                self.socket.setsockopt(socket.IPPROTO_TCP,
                                       socket.TCP_NODELAY, 1 if value else 0)
            except socket.error as e:
                # Sometimes setsockopt will fail if the socket is closed
                # at the wrong time.  This can happen with HTTPServer
                # resetting the value to false between requests.
                if e.errno not in (errno.EINVAL, errno.ECONNRESET):
                    raise


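# Illustrative sketch (not used elsewhere in this module): the HTTP example
# from the class docstring above, rewritten around the `.Future` returned by
# `IOStream.connect`.  The host name is only an example; for hostname lookups
# `.TCPClient` is preferred, as noted in `connect`.
def _example_fetch_headers(io_loop=None):
    from tornado import gen

    @gen.coroutine
    def fetch():
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        stream = IOStream(s, io_loop=io_loop)
        yield stream.connect(("friendfeed.com", 80))
        yield stream.write(b"GET / HTTP/1.0\r\nHost: friendfeed.com\r\n\r\n")
        headers = yield stream.read_until(b"\r\n\r\n")
        stream.close()
        raise gen.Return(headers)

    return fetch()

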
class SSLIOStream(IOStream):
    """A utility class to write to and read from a non-blocking SSL socket.

    If the socket passed to the constructor is already connected,
    it should be wrapped with::

        ssl.wrap_socket(sock, do_handshake_on_connect=False, **kwargs)

    before constructing the `SSLIOStream`.  Unconnected sockets will be
    wrapped when `IOStream.connect` is finished.
    """
    def __init__(self, *args, **kwargs):
        """The ``ssl_options`` keyword argument may either be a dictionary
        of keyword arguments for `ssl.wrap_socket`, or an `ssl.SSLContext`
        object.
        """
        self._ssl_options = kwargs.pop('ssl_options', {})
        super(SSLIOStream, self).__init__(*args, **kwargs)
        self._ssl_accepting = True
        self._handshake_reading = False
        self._handshake_writing = False
        self._ssl_connect_callback = None
        self._server_hostname = None

        # If the socket is already connected, attempt to start the handshake.
        try:
            self.socket.getpeername()
        except socket.error:
            pass
        else:
            # Indirectly start the handshake, which will run on the next
            # IOLoop iteration and then the real IO state will be set in
            # _handle_events.
            self._add_io_state(self.io_loop.WRITE)

    def reading(self):
        return self._handshake_reading or super(SSLIOStream, self).reading()

    def writing(self):
        return self._handshake_writing or super(SSLIOStream, self).writing()

    def _do_ssl_handshake(self):
        # Based on code from test_ssl.py in the python stdlib
        try:
            self._handshake_reading = False
            self._handshake_writing = False
            self.socket.do_handshake()
        except ssl.SSLError as err:
            if err.args[0] == ssl.SSL_ERROR_WANT_READ:
                self._handshake_reading = True
                return
            elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
                self._handshake_writing = True
                return
            elif err.args[0] in (ssl.SSL_ERROR_EOF,
                                 ssl.SSL_ERROR_ZERO_RETURN):
                return self.close(exc_info=True)
            elif err.args[0] == ssl.SSL_ERROR_SSL:
                try:
                    peer = self.socket.getpeername()
                except Exception:
                    peer = '(not connected)'
                gen_log.warning("SSL Error on %s %s: %s",
                                self.socket.fileno(), peer, err)
                return self.close(exc_info=True)
            raise
        except socket.error as err:
            if err.args[0] in _ERRNO_CONNRESET:
                return self.close(exc_info=True)
        except AttributeError:
            # On Linux, if the connection was reset before the call to
            # wrap_socket, do_handshake will fail with an
            # AttributeError.
            return self.close(exc_info=True)
        else:
            self._ssl_accepting = False
            if not self._verify_cert(self.socket.getpeercert()):
                self.close()
                return
            if self._ssl_connect_callback is not None:
                callback = self._ssl_connect_callback
                self._ssl_connect_callback = None
                self._run_callback(callback)

    def _verify_cert(self, peercert):
        """Returns True if peercert is valid according to the configured
        validation mode and hostname.

        The ssl handshake already tested the certificate for a valid
        CA signature; the only thing that remains is to check
        the hostname.
        """
        if isinstance(self._ssl_options, dict):
            verify_mode = self._ssl_options.get('cert_reqs', ssl.CERT_NONE)
        elif isinstance(self._ssl_options, ssl.SSLContext):
            verify_mode = self._ssl_options.verify_mode
        assert verify_mode in (ssl.CERT_NONE, ssl.CERT_REQUIRED, ssl.CERT_OPTIONAL)
        if verify_mode == ssl.CERT_NONE or self._server_hostname is None:
            return True
        cert = self.socket.getpeercert()
        if cert is None and verify_mode == ssl.CERT_REQUIRED:
            gen_log.warning("No SSL certificate given")
            return False
        try:
            ssl_match_hostname(peercert, self._server_hostname)
        except SSLCertificateError:
            gen_log.warning("Invalid SSL certificate", exc_info=True)
            return False
        else:
            return True

    def _handle_read(self):
        if self._ssl_accepting:
            self._do_ssl_handshake()
            return
        super(SSLIOStream, self)._handle_read()

    def _handle_write(self):
        if self._ssl_accepting:
            self._do_ssl_handshake()
            return
        super(SSLIOStream, self)._handle_write()

    def connect(self, address, callback=None, server_hostname=None):
        # Save the user's callback and run it after the ssl handshake
        # has completed.
        self._ssl_connect_callback = stack_context.wrap(callback)
        self._server_hostname = server_hostname
        # Pass callback=None to the superclass so that it always creates
        # a Future; the user's callback is invoked from _do_ssl_handshake
        # once the handshake finishes.
        return super(SSLIOStream, self).connect(address, callback=None)

    def _handle_connect(self):
        # Call the superclass method to check for errors.
        super(SSLIOStream, self)._handle_connect()
        if self.closed():
            return
        # When the connection is complete, wrap the socket for SSL
        # traffic.  Note that we do this by overriding _handle_connect
        # instead of by passing a callback to super().connect because
        # user callbacks are enqueued asynchronously on the IOLoop,
        # but since _handle_events calls _handle_connect immediately
        # followed by _handle_write we need this to be synchronous.
        #
        # The IOLoop will get confused if we swap out self.socket while the
        # handler is registered, so we must unregister it and register
        # the new handler after the state swap is complete.
        self.io_loop.remove_handler(self.socket)
        old_state = self._state
        self._state = None
        self.socket = ssl_wrap_socket(self.socket, self._ssl_options,
                                      server_hostname=self._server_hostname,
                                      do_handshake_on_connect=False)
        self._add_io_state(old_state)

    def read_from_fd(self):
        if self._ssl_accepting:
            # If the handshake hasn't finished yet, there can't be anything
            # to read (attempting to read may or may not raise an exception
            # depending on the SSL version).
            return None
        try:
            # SSLSocket objects have both a read() and recv() method,
            # while regular sockets only have recv().
            # The recv() method blocks (at least in python 2.6) if it is
            # called when there is nothing to read, so we have to use
            # read() instead.
            chunk = self.socket.read(self.read_chunk_size)
        except ssl.SSLError as e:
            # SSLError is a subclass of socket.error, so this except
            # block must come first.
            if e.args[0] == ssl.SSL_ERROR_WANT_READ:
                return None
            else:
                raise
        except socket.error as e:
            if e.args[0] in _ERRNO_WOULDBLOCK:
                return None
            else:
                raise
        if not chunk:
            self.close()
            return None
        return chunk


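# Illustrative sketch (not used elsewhere in this module): connecting an
# `SSLIOStream` with certificate and hostname verification.  The CA bundle
# path and function name are hypothetical.
def _example_ssl_connect(host, port, io_loop=None):
    from tornado import gen

    @gen.coroutine
    def connect():
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        stream = SSLIOStream(
            s, io_loop=io_loop,
            ssl_options={"cert_reqs": ssl.CERT_REQUIRED,
                         "ca_certs": "/path/to/ca-bundle.crt"})  # hypothetical path
        # Passing server_hostname enables the hostname check in _verify_cert
        # (and SNI where supported).
        yield stream.connect((host, port), server_hostname=host)
        raise gen.Return(stream)

    return connect()

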
class PipeIOStream(BaseIOStream):
    """Pipe-based `IOStream` implementation.

    The constructor takes an integer file descriptor (such as one returned
    by `os.pipe`) rather than an open file object.  Pipes are generally
    one-way, so a `PipeIOStream` can be used for reading or writing but not
    both.
    """
    def __init__(self, fd, *args, **kwargs):
        self.fd = fd
        _set_nonblocking(fd)
        super(PipeIOStream, self).__init__(*args, **kwargs)

    def fileno(self):
        return self.fd

    def close_fd(self):
        os.close(self.fd)

    def write_to_fd(self, data):
        return os.write(self.fd, data)

    def read_from_fd(self):
        try:
            chunk = os.read(self.fd, self.read_chunk_size)
        except (IOError, OSError) as e:
            if errno_from_exception(e) in _ERRNO_WOULDBLOCK:
                return None
            elif errno_from_exception(e) == errno.EBADF:
                # If the writing half of a pipe is closed, select will
                # report it as readable but reads will fail with EBADF.
                self.close(exc_info=True)
                return None
            else:
                raise
        if not chunk:
            self.close()
            return None
        return chunk


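# Illustrative sketch (not used elsewhere in this module): wrapping the two
# halves of an `os.pipe` in `PipeIOStream` objects (POSIX only, since it
# relies on _set_nonblocking).  The callback name is hypothetical.
def _example_pipe_streams(io_loop=None):
    r, w = os.pipe()
    read_stream = PipeIOStream(r, io_loop=io_loop)
    write_stream = PipeIOStream(w, io_loop=io_loop)

    def on_line(data):
        read_stream.close()

    # Pipes are one-way: write on one stream, read on the other.
    write_stream.write(b"hello\n")
    read_stream.read_until(b"\n", callback=on_line)
    return read_stream, write_stream

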
def _double_prefix(deque):
    """Grow by doubling, but don't split the second chunk just because the
    first one is small.
    """
    new_len = max(len(deque[0]) * 2,
                  (len(deque[0]) + len(deque[1])))
    _merge_prefix(deque, new_len)


def _merge_prefix(deque, size):
    """Replace the first entries in a deque of strings with a single
    string of up to size bytes.

    >>> d = collections.deque(['abc', 'de', 'fghi', 'j'])
    >>> _merge_prefix(d, 5); print(d)
    deque(['abcde', 'fghi', 'j'])

    Strings will be split as necessary to reach the desired size.
    >>> _merge_prefix(d, 7); print(d)
    deque(['abcdefg', 'hi', 'j'])

    >>> _merge_prefix(d, 3); print(d)
    deque(['abc', 'defg', 'hi', 'j'])

    >>> _merge_prefix(d, 100); print(d)
    deque(['abcdefghij'])
    """
    if len(deque) == 1 and len(deque[0]) <= size:
        return
    prefix = []
    remaining = size
    while deque and remaining > 0:
        chunk = deque.popleft()
        if len(chunk) > remaining:
            deque.appendleft(chunk[remaining:])
            chunk = chunk[:remaining]
        prefix.append(chunk)
        remaining -= len(chunk)
    # This data structure normally just contains byte strings, but
    # the unittest gets messy if it doesn't use the default str() type,
    # so do the merge based on the type of data that's actually present.
    if prefix:
        deque.appendleft(type(prefix[0])().join(prefix))
    if not deque:
        deque.appendleft(b"")


def doctests():
    import doctest
    return doctest.DocTestSuite()
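

# Illustrative sketch (not used elsewhere in this module): upgrading a
# clear-text connection with `IOStream.start_tls`, in the style of the
# STARTTLS extensions mentioned in its docstring.  The command and response
# bytes are protocol-specific placeholders.
def _example_starttls(stream, ssl_options, hostname):
    from tornado import gen

    @gen.coroutine
    def upgrade():
        yield stream.write(b"STARTTLS\r\n")
        yield stream.read_until(b"\r\n")  # server acknowledgement
        # start_tls returns a Future whose result is the new SSLIOStream;
        # the original stream must not be used afterwards.
        ssl_stream = yield stream.start_tls(False, ssl_options, hostname)
        raise gen.Return(ssl_stream)

    return upgrade()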