00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 """A utility class to write to and read from a non-blocking socket."""
00018
00019 from __future__ import absolute_import, division, with_statement
00020
00021 import collections
00022 import errno
00023 import logging
00024 import os
00025 import socket
00026 import sys
00027 import re
00028
00029 from tornado import ioloop
00030 from tornado import stack_context
00031 from tornado.util import b, bytes_type
00032
00033 try:
00034 import ssl
00035 except ImportError:
00036 ssl = None
00037
00038
00039 class IOStream(object):
00040 r"""A utility class to write to and read from a non-blocking socket.
00041
00042 We support a non-blocking ``write()`` and a family of ``read_*()`` methods.
00043 All of the methods take callbacks (since writing and reading are
00044 non-blocking and asynchronous).
00045
00046 The socket parameter may either be connected or unconnected. For
00047 server operations the socket is the result of calling socket.accept().
00048 For client operations the socket is created with socket.socket(),
00049 and may either be connected before passing it to the IOStream or
00050 connected with IOStream.connect.
00051
00052 When a stream is closed due to an error, the IOStream's `error`
00053 attribute contains the exception object.
00054
00055 A very simple (and broken) HTTP client using this class::
00056
00057 from tornado import ioloop
00058 from tornado import iostream
00059 import socket
00060
00061 def send_request():
00062 stream.write("GET / HTTP/1.0\r\nHost: friendfeed.com\r\n\r\n")
00063 stream.read_until("\r\n\r\n", on_headers)
00064
00065 def on_headers(data):
00066 headers = {}
00067 for line in data.split("\r\n"):
00068 parts = line.split(":")
00069 if len(parts) == 2:
00070 headers[parts[0].strip()] = parts[1].strip()
00071 stream.read_bytes(int(headers["Content-Length"]), on_body)
00072
00073 def on_body(data):
00074 print data
00075 stream.close()
00076 ioloop.IOLoop.instance().stop()
00077
00078 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
00079 stream = iostream.IOStream(s)
00080 stream.connect(("friendfeed.com", 80), send_request)
00081 ioloop.IOLoop.instance().start()
00082
00083 """
00084 def __init__(self, socket, io_loop=None, max_buffer_size=104857600,
00085 read_chunk_size=4096):
00086 self.socket = socket
00087 self.socket.setblocking(False)
00088 self.io_loop = io_loop or ioloop.IOLoop.instance()
00089 self.max_buffer_size = max_buffer_size
00090 self.read_chunk_size = read_chunk_size
00091 self.error = None
00092 self._read_buffer = collections.deque()
00093 self._write_buffer = collections.deque()
00094 self._read_buffer_size = 0
00095 self._write_buffer_frozen = False
00096 self._read_delimiter = None
00097 self._read_regex = None
00098 self._read_bytes = None
00099 self._read_until_close = False
00100 self._read_callback = None
00101 self._streaming_callback = None
00102 self._write_callback = None
00103 self._close_callback = None
00104 self._connect_callback = None
00105 self._connecting = False
00106 self._state = None
00107 self._pending_callbacks = 0
00108
00109 def connect(self, address, callback=None):
00110 """Connects the socket to a remote address without blocking.
00111
00112 May only be called if the socket passed to the constructor was
00113 not previously connected. The address parameter is in the
00114 same format as for socket.connect, i.e. a (host, port) tuple.
00115 If callback is specified, it will be called when the
00116 connection is completed.
00117
00118 Note that it is safe to call IOStream.write while the
00119 connection is pending, in which case the data will be written
00120 as soon as the connection is ready. Calling IOStream read
00121 methods before the socket is connected works on some platforms
00122 but is non-portable.
00123 """
00124 self._connecting = True
00125 try:
00126 self.socket.connect(address)
00127 except socket.error, e:
00128
00129
00130
00131
00132
00133
00134
00135 if e.args[0] not in (errno.EINPROGRESS, errno.EWOULDBLOCK):
00136 logging.warning("Connect error on fd %d: %s",
00137 self.socket.fileno(), e)
00138 self.close()
00139 return
00140 self._connect_callback = stack_context.wrap(callback)
00141 self._add_io_state(self.io_loop.WRITE)
00142
00143 def read_until_regex(self, regex, callback):
00144 """Call callback when we read the given regex pattern."""
00145 self._set_read_callback(callback)
00146 self._read_regex = re.compile(regex)
00147 self._try_inline_read()
00148
00149 def read_until(self, delimiter, callback):
00150 """Call callback when we read the given delimiter."""
00151 self._set_read_callback(callback)
00152 self._read_delimiter = delimiter
00153 self._try_inline_read()
00154
00155 def read_bytes(self, num_bytes, callback, streaming_callback=None):
00156 """Call callback when we read the given number of bytes.
00157
00158 If a ``streaming_callback`` is given, it will be called with chunks
00159 of data as they become available, and the argument to the final
00160 ``callback`` will be empty.
00161 """
00162 self._set_read_callback(callback)
00163 assert isinstance(num_bytes, (int, long))
00164 self._read_bytes = num_bytes
00165 self._streaming_callback = stack_context.wrap(streaming_callback)
00166 self._try_inline_read()
00167
00168 def read_until_close(self, callback, streaming_callback=None):
00169 """Reads all data from the socket until it is closed.
00170
00171 If a ``streaming_callback`` is given, it will be called with chunks
00172 of data as they become available, and the argument to the final
00173 ``callback`` will be empty.
00174
00175 Subject to ``max_buffer_size`` limit from `IOStream` constructor if
00176 a ``streaming_callback`` is not used.
00177 """
00178 self._set_read_callback(callback)
00179 if self.closed():
00180 self._run_callback(callback, self._consume(self._read_buffer_size))
00181 self._read_callback = None
00182 return
00183 self._read_until_close = True
00184 self._streaming_callback = stack_context.wrap(streaming_callback)
00185 self._add_io_state(self.io_loop.READ)
00186
00187 def write(self, data, callback=None):
00188 """Write the given data to this stream.
00189
00190 If callback is given, we call it when all of the buffered write
00191 data has been successfully written to the stream. If there was
00192 previously buffered write data and an old write callback, that
00193 callback is simply overwritten with this new callback.
00194 """
00195 assert isinstance(data, bytes_type)
00196 self._check_closed()
00197
00198
00199 if data:
00200
00201
00202
00203 WRITE_BUFFER_CHUNK_SIZE = 128 * 1024
00204 if len(data) > WRITE_BUFFER_CHUNK_SIZE:
00205 for i in range(0, len(data), WRITE_BUFFER_CHUNK_SIZE):
00206 self._write_buffer.append(data[i:i + WRITE_BUFFER_CHUNK_SIZE])
00207 else:
00208 self._write_buffer.append(data)
00209 self._write_callback = stack_context.wrap(callback)
00210 self._handle_write()
00211 if self._write_buffer:
00212 self._add_io_state(self.io_loop.WRITE)
00213 self._maybe_add_error_listener()
00214
00215 def set_close_callback(self, callback):
00216 """Call the given callback when the stream is closed."""
00217 self._close_callback = stack_context.wrap(callback)
00218
00219 def close(self):
00220 """Close this stream."""
00221 if self.socket is not None:
00222 if any(sys.exc_info()):
00223 self.error = sys.exc_info()[1]
00224 if self._read_until_close:
00225 callback = self._read_callback
00226 self._read_callback = None
00227 self._read_until_close = False
00228 self._run_callback(callback,
00229 self._consume(self._read_buffer_size))
00230 if self._state is not None:
00231 self.io_loop.remove_handler(self.socket.fileno())
00232 self._state = None
00233 self.socket.close()
00234 self.socket = None
00235 self._maybe_run_close_callback()
00236
00237 def _maybe_run_close_callback(self):
00238 if (self.socket is None and self._close_callback and
00239 self._pending_callbacks == 0):
00240
00241
00242 cb = self._close_callback
00243 self._close_callback = None
00244 self._run_callback(cb)
00245
00246 def reading(self):
00247 """Returns true if we are currently reading from the stream."""
00248 return self._read_callback is not None
00249
00250 def writing(self):
00251 """Returns true if we are currently writing to the stream."""
00252 return bool(self._write_buffer)
00253
00254 def closed(self):
00255 """Returns true if the stream has been closed."""
00256 return self.socket is None
00257
00258 def _handle_events(self, fd, events):
00259 if not self.socket:
00260 logging.warning("Got events for closed stream %d", fd)
00261 return
00262 try:
00263 if events & self.io_loop.READ:
00264 self._handle_read()
00265 if not self.socket:
00266 return
00267 if events & self.io_loop.WRITE:
00268 if self._connecting:
00269 self._handle_connect()
00270 self._handle_write()
00271 if not self.socket:
00272 return
00273 if events & self.io_loop.ERROR:
00274 errno = self.socket.getsockopt(socket.SOL_SOCKET,
00275 socket.SO_ERROR)
00276 self.error = socket.error(errno, os.strerror(errno))
00277
00278
00279
00280 self.io_loop.add_callback(self.close)
00281 return
00282 state = self.io_loop.ERROR
00283 if self.reading():
00284 state |= self.io_loop.READ
00285 if self.writing():
00286 state |= self.io_loop.WRITE
00287 if state == self.io_loop.ERROR:
00288 state |= self.io_loop.READ
00289 if state != self._state:
00290 assert self._state is not None, \
00291 "shouldn't happen: _handle_events without self._state"
00292 self._state = state
00293 self.io_loop.update_handler(self.socket.fileno(), self._state)
00294 except Exception:
00295 logging.error("Uncaught exception, closing connection.",
00296 exc_info=True)
00297 self.close()
00298 raise
00299
00300 def _run_callback(self, callback, *args):
00301 def wrapper():
00302 self._pending_callbacks -= 1
00303 try:
00304 callback(*args)
00305 except Exception:
00306 logging.error("Uncaught exception, closing connection.",
00307 exc_info=True)
00308
00309
00310
00311
00312 self.close()
00313
00314
00315 raise
00316 self._maybe_add_error_listener()
00317
00318
00319
00320
00321
00322
00323
00324
00325 with stack_context.NullContext():
00326
00327
00328
00329
00330
00331 self._pending_callbacks += 1
00332 self.io_loop.add_callback(wrapper)
00333
00334 def _handle_read(self):
00335 try:
00336 try:
00337
00338
00339
00340
00341
00342
00343
00344
00345
00346
00347 self._pending_callbacks += 1
00348 while True:
00349
00350
00351
00352
00353
00354 if self._read_to_buffer() == 0:
00355 break
00356 finally:
00357 self._pending_callbacks -= 1
00358 except Exception:
00359 logging.warning("error on read", exc_info=True)
00360 self.close()
00361 return
00362 if self._read_from_buffer():
00363 return
00364 else:
00365 self._maybe_run_close_callback()
00366
00367 def _set_read_callback(self, callback):
00368 assert not self._read_callback, "Already reading"
00369 self._read_callback = stack_context.wrap(callback)
00370
00371 def _try_inline_read(self):
00372 """Attempt to complete the current read operation from buffered data.
00373
00374 If the read can be completed without blocking, schedules the
00375 read callback on the next IOLoop iteration; otherwise starts
00376 listening for reads on the socket.
00377 """
00378
00379 if self._read_from_buffer():
00380 return
00381 self._check_closed()
00382 try:
00383
00384 self._pending_callbacks += 1
00385 while True:
00386 if self._read_to_buffer() == 0:
00387 break
00388 self._check_closed()
00389 finally:
00390 self._pending_callbacks -= 1
00391 if self._read_from_buffer():
00392 return
00393 self._add_io_state(self.io_loop.READ)
00394
00395 def _read_from_socket(self):
00396 """Attempts to read from the socket.
00397
00398 Returns the data read or None if there is nothing to read.
00399 May be overridden in subclasses.
00400 """
00401 try:
00402 chunk = self.socket.recv(self.read_chunk_size)
00403 except socket.error, e:
00404 if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
00405 return None
00406 else:
00407 raise
00408 if not chunk:
00409 self.close()
00410 return None
00411 return chunk
00412
00413 def _read_to_buffer(self):
00414 """Reads from the socket and appends the result to the read buffer.
00415
00416 Returns the number of bytes read. Returns 0 if there is nothing
00417 to read (i.e. the read returns EWOULDBLOCK or equivalent). On
00418 error closes the socket and raises an exception.
00419 """
00420 try:
00421 chunk = self._read_from_socket()
00422 except socket.error, e:
00423
00424 logging.warning("Read error on %d: %s",
00425 self.socket.fileno(), e)
00426 self.close()
00427 raise
00428 if chunk is None:
00429 return 0
00430 self._read_buffer.append(chunk)
00431 self._read_buffer_size += len(chunk)
00432 if self._read_buffer_size >= self.max_buffer_size:
00433 logging.error("Reached maximum read buffer size")
00434 self.close()
00435 raise IOError("Reached maximum read buffer size")
00436 return len(chunk)
00437
00438 def _read_from_buffer(self):
00439 """Attempts to complete the currently-pending read from the buffer.
00440
00441 Returns True if the read was completed.
00442 """
00443 if self._streaming_callback is not None and self._read_buffer_size:
00444 bytes_to_consume = self._read_buffer_size
00445 if self._read_bytes is not None:
00446 bytes_to_consume = min(self._read_bytes, bytes_to_consume)
00447 self._read_bytes -= bytes_to_consume
00448 self._run_callback(self._streaming_callback,
00449 self._consume(bytes_to_consume))
00450 if self._read_bytes is not None and self._read_buffer_size >= self._read_bytes:
00451 num_bytes = self._read_bytes
00452 callback = self._read_callback
00453 self._read_callback = None
00454 self._streaming_callback = None
00455 self._read_bytes = None
00456 self._run_callback(callback, self._consume(num_bytes))
00457 return True
00458 elif self._read_delimiter is not None:
00459
00460
00461
00462
00463
00464
00465
00466
00467 if self._read_buffer:
00468 while True:
00469 loc = self._read_buffer[0].find(self._read_delimiter)
00470 if loc != -1:
00471 callback = self._read_callback
00472 delimiter_len = len(self._read_delimiter)
00473 self._read_callback = None
00474 self._streaming_callback = None
00475 self._read_delimiter = None
00476 self._run_callback(callback,
00477 self._consume(loc + delimiter_len))
00478 return True
00479 if len(self._read_buffer) == 1:
00480 break
00481 _double_prefix(self._read_buffer)
00482 elif self._read_regex is not None:
00483 if self._read_buffer:
00484 while True:
00485 m = self._read_regex.search(self._read_buffer[0])
00486 if m is not None:
00487 callback = self._read_callback
00488 self._read_callback = None
00489 self._streaming_callback = None
00490 self._read_regex = None
00491 self._run_callback(callback, self._consume(m.end()))
00492 return True
00493 if len(self._read_buffer) == 1:
00494 break
00495 _double_prefix(self._read_buffer)
00496 return False
00497
00498 def _handle_connect(self):
00499 err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
00500 if err != 0:
00501 self.error = socket.error(err, os.strerror(err))
00502
00503
00504
00505
00506 logging.warning("Connect error on fd %d: %s",
00507 self.socket.fileno(), errno.errorcode[err])
00508 self.close()
00509 return
00510 if self._connect_callback is not None:
00511 callback = self._connect_callback
00512 self._connect_callback = None
00513 self._run_callback(callback)
00514 self._connecting = False
00515
00516 def _handle_write(self):
00517 while self._write_buffer:
00518 try:
00519 if not self._write_buffer_frozen:
00520
00521
00522
00523
00524
00525 _merge_prefix(self._write_buffer, 128 * 1024)
00526 num_bytes = self.socket.send(self._write_buffer[0])
00527 if num_bytes == 0:
00528
00529
00530
00531
00532
00533
00534
00535
00536 self._write_buffer_frozen = True
00537 break
00538 self._write_buffer_frozen = False
00539 _merge_prefix(self._write_buffer, num_bytes)
00540 self._write_buffer.popleft()
00541 except socket.error, e:
00542 if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
00543 self._write_buffer_frozen = True
00544 break
00545 else:
00546 logging.warning("Write error on %d: %s",
00547 self.socket.fileno(), e)
00548 self.close()
00549 return
00550 if not self._write_buffer and self._write_callback:
00551 callback = self._write_callback
00552 self._write_callback = None
00553 self._run_callback(callback)
00554
00555 def _consume(self, loc):
00556 if loc == 0:
00557 return b("")
00558 _merge_prefix(self._read_buffer, loc)
00559 self._read_buffer_size -= loc
00560 return self._read_buffer.popleft()
00561
00562 def _check_closed(self):
00563 if not self.socket:
00564 raise IOError("Stream is closed")
00565
00566 def _maybe_add_error_listener(self):
00567 if self._state is None and self._pending_callbacks == 0:
00568 if self.socket is None:
00569 self._maybe_run_close_callback()
00570 else:
00571 self._add_io_state(ioloop.IOLoop.READ)
00572
00573 def _add_io_state(self, state):
00574 """Adds `state` (IOLoop.{READ,WRITE} flags) to our event handler.
00575
00576 Implementation notes: Reads and writes have a fast path and a
00577 slow path. The fast path reads synchronously from socket
00578 buffers, while the slow path uses `_add_io_state` to schedule
00579 an IOLoop callback. Note that in both cases, the callback is
00580 run asynchronously with `_run_callback`.
00581
00582 To detect closed connections, we must have called
00583 `_add_io_state` at some point, but we want to delay this as
00584 much as possible so we don't have to set an `IOLoop.ERROR`
00585 listener that will be overwritten by the next slow-path
00586 operation. As long as there are callbacks scheduled for
00587 fast-path ops, those callbacks may do more reads.
00588 If a sequence of fast-path ops do not end in a slow-path op,
00589 (e.g. for an @asynchronous long-poll request), we must add
00590 the error handler. This is done in `_run_callback` and `write`
00591 (since the write callback is optional so we can have a
00592 fast-path write with no `_run_callback`)
00593 """
00594 if self.socket is None:
00595
00596 return
00597 if self._state is None:
00598 self._state = ioloop.IOLoop.ERROR | state
00599 with stack_context.NullContext():
00600 self.io_loop.add_handler(
00601 self.socket.fileno(), self._handle_events, self._state)
00602 elif not self._state & state:
00603 self._state = self._state | state
00604 self.io_loop.update_handler(self.socket.fileno(), self._state)
00605
00606
00607 class SSLIOStream(IOStream):
00608 """A utility class to write to and read from a non-blocking SSL socket.
00609
00610 If the socket passed to the constructor is already connected,
00611 it should be wrapped with::
00612
00613 ssl.wrap_socket(sock, do_handshake_on_connect=False, **kwargs)
00614
00615 before constructing the SSLIOStream. Unconnected sockets will be
00616 wrapped when IOStream.connect is finished.
00617 """
00618 def __init__(self, *args, **kwargs):
00619 """Creates an SSLIOStream.
00620
00621 If a dictionary is provided as keyword argument ssl_options,
00622 it will be used as additional keyword arguments to ssl.wrap_socket.
00623 """
00624 self._ssl_options = kwargs.pop('ssl_options', {})
00625 super(SSLIOStream, self).__init__(*args, **kwargs)
00626 self._ssl_accepting = True
00627 self._handshake_reading = False
00628 self._handshake_writing = False
00629
00630 def reading(self):
00631 return self._handshake_reading or super(SSLIOStream, self).reading()
00632
00633 def writing(self):
00634 return self._handshake_writing or super(SSLIOStream, self).writing()
00635
00636 def _do_ssl_handshake(self):
00637
00638 try:
00639 self._handshake_reading = False
00640 self._handshake_writing = False
00641 self.socket.do_handshake()
00642 except ssl.SSLError, err:
00643 if err.args[0] == ssl.SSL_ERROR_WANT_READ:
00644 self._handshake_reading = True
00645 return
00646 elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
00647 self._handshake_writing = True
00648 return
00649 elif err.args[0] in (ssl.SSL_ERROR_EOF,
00650 ssl.SSL_ERROR_ZERO_RETURN):
00651 return self.close()
00652 elif err.args[0] == ssl.SSL_ERROR_SSL:
00653 logging.warning("SSL Error on %d: %s", self.socket.fileno(), err)
00654 return self.close()
00655 raise
00656 except socket.error, err:
00657 if err.args[0] == errno.ECONNABORTED:
00658 return self.close()
00659 else:
00660 self._ssl_accepting = False
00661 super(SSLIOStream, self)._handle_connect()
00662
00663 def _handle_read(self):
00664 if self._ssl_accepting:
00665 self._do_ssl_handshake()
00666 return
00667 super(SSLIOStream, self)._handle_read()
00668
00669 def _handle_write(self):
00670 if self._ssl_accepting:
00671 self._do_ssl_handshake()
00672 return
00673 super(SSLIOStream, self)._handle_write()
00674
00675 def _handle_connect(self):
00676 self.socket = ssl.wrap_socket(self.socket,
00677 do_handshake_on_connect=False,
00678 **self._ssl_options)
00679
00680
00681
00682
00683
00684 def _read_from_socket(self):
00685 if self._ssl_accepting:
00686
00687
00688
00689 return None
00690 try:
00691
00692
00693
00694
00695
00696 chunk = self.socket.read(self.read_chunk_size)
00697 except ssl.SSLError, e:
00698
00699
00700 if e.args[0] == ssl.SSL_ERROR_WANT_READ:
00701 return None
00702 else:
00703 raise
00704 except socket.error, e:
00705 if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
00706 return None
00707 else:
00708 raise
00709 if not chunk:
00710 self.close()
00711 return None
00712 return chunk
00713
00714
def _double_prefix(deque):
    """Grow the first entry of ``deque`` by merging later chunks into it.

    The head becomes up to max(2 * len(head), len(head) + len(second))
    bytes long, so the second chunk is always absorbed whole even when the
    head is much shorter than it.  ``deque`` must hold at least two entries.
    """
    head_len = len(deque[0])
    target = head_len + len(deque[1])
    if head_len * 2 > target:
        target = head_len * 2
    _merge_prefix(deque, target)
00722
00723
00724 def _merge_prefix(deque, size):
00725 """Replace the first entries in a deque of strings with a single
00726 string of up to size bytes.
00727
00728 >>> d = collections.deque(['abc', 'de', 'fghi', 'j'])
00729 >>> _merge_prefix(d, 5); print d
00730 deque(['abcde', 'fghi', 'j'])
00731
00732 Strings will be split as necessary to reach the desired size.
00733 >>> _merge_prefix(d, 7); print d
00734 deque(['abcdefg', 'hi', 'j'])
00735
00736 >>> _merge_prefix(d, 3); print d
00737 deque(['abc', 'defg', 'hi', 'j'])
00738
00739 >>> _merge_prefix(d, 100); print d
00740 deque(['abcdefghij'])
00741 """
00742 if len(deque) == 1 and len(deque[0]) <= size:
00743 return
00744 prefix = []
00745 remaining = size
00746 while deque and remaining > 0:
00747 chunk = deque.popleft()
00748 if len(chunk) > remaining:
00749 deque.appendleft(chunk[remaining:])
00750 chunk = chunk[:remaining]
00751 prefix.append(chunk)
00752 remaining -= len(chunk)
00753
00754
00755
00756 if prefix:
00757 deque.appendleft(type(prefix[0])().join(prefix))
00758 if not deque:
00759 deque.appendleft(b(""))
00760
00761
def doctests():
    """Return a unittest-compatible suite of this module's doctests.

    DocTestSuite() called without a module argument collects the examples
    from the module that called it, i.e. the module defining this function.
    """
    from doctest import DocTestSuite
    return DocTestSuite()