00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 """Client and server implementations of HTTP/1.x.
00018
00019 .. versionadded:: 4.0
00020 """
00021
00022 from __future__ import absolute_import, division, print_function, with_statement
00023
00024 import re
00025
00026 from tornado.concurrent import Future
00027 from tornado.escape import native_str, utf8
00028 from tornado import gen
00029 from tornado import httputil
00030 from tornado import iostream
00031 from tornado.log import gen_log, app_log
00032 from tornado import stack_context
00033 from tornado.util import GzipDecompressor
00034
00035
00036 class _QuietException(Exception):
00037 def __init__(self):
00038 pass
00039
00040 class _ExceptionLoggingContext(object):
00041 """Used with the ``with`` statement when calling delegate methods to
00042 log any exceptions with the given logger. Any exceptions caught are
00043 converted to _QuietException
00044 """
00045 def __init__(self, logger):
00046 self.logger = logger
00047
00048 def __enter__(self):
00049 pass
00050
00051 def __exit__(self, typ, value, tb):
00052 if value is not None:
00053 self.logger.error("Uncaught exception", exc_info=(typ, value, tb))
00054 raise _QuietException
00055
class HTTP1ConnectionParameters(object):
    """Parameters for `.HTTP1Connection` and `.HTTP1ServerConnection`.
    """
    def __init__(self, no_keep_alive=False, chunk_size=None,
                 max_header_size=None, header_timeout=None, max_body_size=None,
                 body_timeout=None, decompress=False):
        """
        :arg bool no_keep_alive: If true, always close the connection after
            one request.
        :arg int chunk_size: how much data to read into memory at once
            (falsy values such as ``None`` or ``0`` mean 65536)
        :arg int max_header_size: maximum amount of data for HTTP headers
            (falsy values such as ``None`` or ``0`` mean 65536)
        :arg float header_timeout: how long to wait for all headers (seconds)
        :arg int max_body_size: maximum amount of data for body
        :arg float body_timeout: how long to wait while reading body (seconds)
        :arg bool decompress: if true, decode incoming
            ``Content-Encoding: gzip``
        """
        default_buffer_size = 65536

        # Buffer sizes fall back to the default whenever a falsy value is given.
        self.chunk_size = chunk_size if chunk_size else default_buffer_size
        self.max_header_size = (max_header_size if max_header_size
                                else default_buffer_size)

        # The remaining options are stored exactly as passed in.
        self.no_keep_alive = no_keep_alive
        self.header_timeout = header_timeout
        self.max_body_size = max_body_size
        self.body_timeout = body_timeout
        self.decompress = decompress
00080
00081
class HTTP1Connection(httputil.HTTPConnection):
    """Implements the HTTP/1.x protocol.

    This class can be used on its own for clients, or via
    `HTTP1ServerConnection` for servers.
    """
    def __init__(self, stream, is_client, params=None, context=None):
        """
        :arg stream: an `.IOStream`
        :arg bool is_client: client or server
        :arg params: a `.HTTP1ConnectionParameters` instance or ``None``
        :arg context: an opaque application-defined object that can be accessed
            as ``connection.context``.
        """
        self.is_client = is_client
        self.stream = stream
        if params is None:
            params = HTTP1ConnectionParameters()
        self.params = params
        self.context = context
        self.no_keep_alive = params.no_keep_alive
        # Per-message copies of the body limits; set_max_body_size() and
        # set_body_timeout() override these without touching self.params.
        self._max_body_size = (self.params.max_body_size or
                               self.stream.max_buffer_size)
        self._body_timeout = self.params.body_timeout
        # Set to True by finish(): no more outgoing data will be produced
        # (some may still be buffered in the stream).
        self._write_finished = False
        # Set to True by _read_message once the incoming body has been read.
        self._read_finished = False
        # Resolved by _finish_request(), close() or _on_connection_close();
        # _read_message waits on it before reporting completion.
        self._finish_future = Future()
        # If true, the connection is closed once this message exchange is
        # done (after the response is written on the server side, after it
        # is read on the client side).
        self._disconnect_on_finish = False
        self._clear_callbacks()
        # Start lines and request headers are remembered because they
        # affect later processing (e.g. HEAD requests and 304 responses
        # carry no body).
        self._request_start_line = None
        self._response_start_line = None
        self._request_headers = None
        # True if outgoing data is framed with chunked transfer-encoding.
        self._chunking_output = None
        # Number of body bytes still expected to be *written* for an outgoing
        # message with a known length (0 for HEAD/304 responses); None when
        # the length is not fixed.
        self._expected_content_remaining = None
        # Future of the most recent IOStream.write call.
        self._pending_write = None

    def read_response(self, delegate):
        """Read a single HTTP response.

        Typical client-mode usage is to write a request using `write_headers`,
        `write`, and `finish`, and then call ``read_response``.

        :arg delegate: a `.HTTPMessageDelegate`

        Returns a `.Future` that resolves to None after the full response has
        been read.
        """
        if self.params.decompress:
            delegate = _GzipMessageDelegate(delegate, self.params.chunk_size)
        return self._read_message(delegate)

    @gen.coroutine
    def _read_message(self, delegate):
        """Read one HTTP message (start line, headers and body).

        Resolves to True when processing finished normally, or to False when
        the header or body read timed out, the input was malformed, the
        connection was detached, or the stream is gone afterwards.
        """
        need_delegate_close = False
        try:
            header_future = self.stream.read_until_regex(
                b"\r?\n\r?\n",
                max_bytes=self.params.max_header_size)
            if self.params.header_timeout is None:
                header_data = yield header_future
            else:
                try:
                    header_data = yield gen.with_timeout(
                        self.stream.io_loop.time() + self.params.header_timeout,
                        header_future,
                        io_loop=self.stream.io_loop)
                except gen.TimeoutError:
                    self.close()
                    raise gen.Return(False)
            start_line, headers = self._parse_headers(header_data)
            if self.is_client:
                start_line = httputil.parse_response_start_line(start_line)
                self._response_start_line = start_line
            else:
                start_line = httputil.parse_request_start_line(start_line)
                self._request_start_line = start_line
                self._request_headers = headers

            self._disconnect_on_finish = not self._can_keep_alive(
                start_line, headers)
            need_delegate_close = True
            with _ExceptionLoggingContext(app_log):
                delegate_future = delegate.headers_received(start_line, headers)
                if delegate_future is not None:
                    yield delegate_future
            if self.stream is None:
                # The delegate called detach(); it now owns the stream.
                need_delegate_close = False
                raise gen.Return(False)
            skip_body = False
            if self.is_client:
                if (self._request_start_line is not None and
                        self._request_start_line.method == 'HEAD'):
                    skip_body = True
                code = start_line.code
                if code == 304:
                    # 304 responses may include a Content-Length header
                    # but never have a body.
                    skip_body = True
                if code >= 100 and code < 200:
                    # 1xx responses must not indicate the presence of a body.
                    if ('Content-Length' in headers or
                            'Transfer-Encoding' in headers):
                        raise httputil.HTTPInputError(
                            "Response code %d cannot have body" % code)
                    # TODO: the delegate receives headers_received twice
                    # for a 1xx response followed by the final response.
                    yield self._read_message(delegate)
            else:
                if (headers.get("Expect") == "100-continue" and
                        not self._write_finished):
                    self.stream.write(b"HTTP/1.1 100 (Continue)\r\n\r\n")
            if not skip_body:
                body_future = self._read_body(
                    start_line.code if self.is_client else 0, headers, delegate)
                if body_future is not None:
                    if self._body_timeout is None:
                        yield body_future
                    else:
                        try:
                            yield gen.with_timeout(
                                self.stream.io_loop.time() + self._body_timeout,
                                body_future, self.stream.io_loop)
                        except gen.TimeoutError:
                            gen_log.info("Timeout reading body from %s",
                                         self.context)
                            self.stream.close()
                            raise gen.Return(False)
            self._read_finished = True
            if not self._write_finished or self.is_client:
                need_delegate_close = False
                with _ExceptionLoggingContext(app_log):
                    delegate.finish()
            # If the application has not finished its side yet and we are
            # not detached, watch for the stream closing while we wait
            # (no close callback was needed while we were reading).
            if (not self._finish_future.done() and
                    self.stream is not None and
                    not self.stream.closed()):
                self.stream.set_close_callback(self._on_connection_close)
                yield self._finish_future
            if self.is_client and self._disconnect_on_finish:
                self.close()
            if self.stream is None:
                raise gen.Return(False)
        except httputil.HTTPInputError as e:
            gen_log.info("Malformed HTTP message from %s: %s",
                         self.context, e)
            self.close()
            raise gen.Return(False)
        finally:
            if need_delegate_close:
                with _ExceptionLoggingContext(app_log):
                    delegate.on_connection_close()
            self._clear_callbacks()
        raise gen.Return(True)

    def _clear_callbacks(self):
        """Clears the callback attributes.

        This allows the request handler to be garbage collected more
        quickly in CPython by breaking up reference cycles.
        """
        self._write_callback = None
        self._write_future = None
        self._close_callback = None
        if self.stream is not None:
            self.stream.set_close_callback(None)

    def set_close_callback(self, callback):
        """Sets a callback that will be run when the connection is closed.

        .. deprecated:: 4.0
            Use `.HTTPMessageDelegate.on_connection_close` instead.
        """
        self._close_callback = stack_context.wrap(callback)

    def _on_connection_close(self):
        # This is only registered on the stream by _read_message, after the
        # incoming message has been read and while we wait for the
        # application to finish its side of the exchange.
        if self._close_callback is not None:
            callback = self._close_callback
            self._close_callback = None
            callback()
        if not self._finish_future.done():
            self._finish_future.set_result(None)
        self._clear_callbacks()

    def close(self):
        """Close the stream (if still attached) and resolve the finish future."""
        if self.stream is not None:
            self.stream.close()
        self._clear_callbacks()
        if not self._finish_future.done():
            self._finish_future.set_result(None)

    def detach(self):
        """Take control of the underlying stream.

        Returns the underlying `.IOStream` object and stops all further
        HTTP processing. May only be called during
        `.HTTPMessageDelegate.headers_received`. Intended for implementing
        protocols like websockets that tunnel over an HTTP handshake.
        """
        self._clear_callbacks()
        stream = self.stream
        self.stream = None
        return stream

    def set_body_timeout(self, timeout):
        """Sets the body timeout for a single request.

        Overrides the value from `.HTTP1ConnectionParameters`.
        """
        self._body_timeout = timeout

    def set_max_body_size(self, max_body_size):
        """Sets the body size limit for a single request.

        Overrides the value from `.HTTP1ConnectionParameters`.
        """
        self._max_body_size = max_body_size

    def write_headers(self, start_line, headers, chunk=None, callback=None):
        """Implements `.HTTPConnection.write_headers`."""
        if self.is_client:
            self._request_start_line = start_line
            # Client requests with a body need either a Content-Length or a
            # Transfer-Encoding; fall back to chunked for body methods.
            self._chunking_output = (
                start_line.method in ('POST', 'PUT', 'PATCH') and
                'Content-Length' not in headers and
                'Transfer-Encoding' not in headers)
        else:
            self._response_start_line = start_line
            self._chunking_output = (
                # TODO: should this use self._request_start_line.version
                # or start_line.version?
                self._request_start_line.version == 'HTTP/1.1' and
                # 304 responses have no body (not even a zero-length one),
                # so they get neither Content-Length nor Transfer-Encoding.
                start_line.code != 304 and
                # No need to chunk if a Content-Length is specified.
                'Content-Length' not in headers and
                # If the application set Transfer-Encoding itself, leave it.
                'Transfer-Encoding' not in headers)
            # If an HTTP/1.0 client asked for keep-alive, confirm it.
            if (self._request_start_line.version == 'HTTP/1.0' and
                    (self._request_headers.get('Connection', '').lower()
                     == 'keep-alive')):
                headers['Connection'] = 'Keep-Alive'
        if self._chunking_output:
            headers['Transfer-Encoding'] = 'chunked'
        if (not self.is_client and
                (self._request_start_line.method == 'HEAD' or
                 start_line.code == 304)):
            self._expected_content_remaining = 0
        elif 'Content-Length' in headers:
            self._expected_content_remaining = int(headers['Content-Length'])
        else:
            self._expected_content_remaining = None
        lines = [utf8("%s %s %s" % start_line)]
        lines.extend([utf8(n) + b": " + utf8(v) for n, v in headers.get_all()])
        # Refuse to emit header lines that would break message framing.
        for line in lines:
            if b'\n' in line:
                raise ValueError('Newline in header: ' + repr(line))
        future = None
        if self.stream.closed():
            future = self._write_future = Future()
            future.set_exception(iostream.StreamClosedError())
        else:
            if callback is not None:
                self._write_callback = stack_context.wrap(callback)
            else:
                future = self._write_future = Future()
            data = b"\r\n".join(lines) + b"\r\n\r\n"
            if chunk:
                data += self._format_chunk(chunk)
            self._pending_write = self.stream.write(data)
            self._pending_write.add_done_callback(self._on_write_complete)
        return future

    def _format_chunk(self, chunk):
        """Account for ``chunk`` against Content-Length and frame it if chunked."""
        if self._expected_content_remaining is not None:
            self._expected_content_remaining -= len(chunk)
            if self._expected_content_remaining < 0:
                # Close the stream now to stop further framing errors.
                self.stream.close()
                raise httputil.HTTPOutputError(
                    "Tried to write more data than Content-Length")
        if self._chunking_output and chunk:
            # Never emit an empty chunk: in chunked encoding a zero-length
            # chunk marks the end of the body.
            return utf8("%x" % len(chunk)) + b"\r\n" + chunk + b"\r\n"
        else:
            return chunk

    def write(self, chunk, callback=None):
        """Implements `.HTTPConnection.write`.

        For backwards compatibility it is allowed but deprecated to
        skip `write_headers` and instead call `write()` with a
        pre-encoded header block.
        """
        future = None
        if self.stream.closed():
            future = self._write_future = Future()
            self._write_future.set_exception(iostream.StreamClosedError())
        else:
            if callback is not None:
                self._write_callback = stack_context.wrap(callback)
            else:
                future = self._write_future = Future()
            self._pending_write = self.stream.write(self._format_chunk(chunk))
            self._pending_write.add_done_callback(self._on_write_complete)
        return future

    def finish(self):
        """Implements `.HTTPConnection.finish`."""
        if (self._expected_content_remaining is not None and
                self._expected_content_remaining != 0 and
                not self.stream.closed()):
            self.stream.close()
            raise httputil.HTTPOutputError(
                "Tried to write %d bytes less than Content-Length" %
                self._expected_content_remaining)
        if self._chunking_output:
            if not self.stream.closed():
                self._pending_write = self.stream.write(b"0\r\n\r\n")
                self._pending_write.add_done_callback(self._on_write_complete)
        self._write_finished = True
        # If the application finished before the incoming body was fully
        # read, close the connection once our output has been sent.
        if not self._read_finished:
            self._disconnect_on_finish = True
        # No more data is coming, so ask the stream to send what remains
        # immediately instead of coalescing it.
        self.stream.set_nodelay(True)
        if self._pending_write is None:
            self._finish_request(None)
        else:
            self._pending_write.add_done_callback(self._finish_request)

    def _on_write_complete(self, future):
        """Notify the write callback or write future of the latest write."""
        if self._write_callback is not None:
            callback = self._write_callback
            self._write_callback = None
            self.stream.io_loop.add_callback(callback)
        if self._write_future is not None:
            write_future = self._write_future
            self._write_future = None
            write_future.set_result(None)

    def _can_keep_alive(self, start_line, headers):
        """Return True if the connection may be reused after this message.

        ``start_line`` is a request start line on the server side and a
        response start line on the client side; only the former has a
        ``method``, hence the ``getattr``.
        """
        if self.params.no_keep_alive:
            return False
        connection_header = headers.get("Connection")
        if connection_header is not None:
            connection_header = connection_header.lower()
        if start_line.version == "HTTP/1.1":
            return connection_header != "close"
        elif ("Content-Length" in headers
              or getattr(start_line, "method", None) in ("HEAD", "GET")):
            return connection_header == "keep-alive"
        return False

    def _finish_request(self, future):
        """Complete the exchange once the final pending write is done."""
        self._clear_callbacks()
        if not self.is_client and self._disconnect_on_finish:
            self.close()
            return
        # Restore the stream's default (coalescing) behaviour for the
        # next message; finish() enabled nodelay.
        self.stream.set_nodelay(False)
        if not self._finish_future.done():
            self._finish_future.set_result(None)

    def _parse_headers(self, data):
        """Split a raw header block into ``(start_line, HTTPHeaders)``.

        Raises `.HTTPInputError` if the header lines cannot be parsed.
        """
        # RFC 7230 section 3.5: a server SHOULD ignore at least one empty
        # line received before the request-line.
        data = native_str(data.decode('latin1')).lstrip("\r\n")
        # The block was delimited with the regex r"\r?\n\r?\n", so lines
        # may end with CRLF or with a bare LF.
        eol = data.find("\n")
        if eol < 0:
            start_line, header_text = data, ""
        else:
            start_line = data[:eol].rstrip("\r")
            header_text = data[eol + 1:]
        try:
            headers = httputil.HTTPHeaders.parse(header_text)
        except ValueError:
            # Most likely a header line without a ':' separator.
            raise httputil.HTTPInputError("Malformed HTTP headers: %r" %
                                          header_text[:100])
        return start_line, headers

    def _read_body(self, code, headers, delegate):
        """Start reading the message body, choosing the framing from headers.

        Returns a Future for the body read, or None when there is no body
        to read.  Raises `.HTTPInputError` for invalid or conflicting
        framing headers.
        """
        if "Content-Length" in headers:
            if "Transfer-Encoding" in headers:
                # RFC 7230 section 3.3.3: a message with both headers may
                # indicate request smuggling and ought to be an error.
                raise httputil.HTTPInputError(
                    "Message with both Transfer-Encoding and Content-Length")
            if "," in headers["Content-Length"]:
                # Proxies sometimes duplicate Content-Length headers.  If all
                # the values are identical we can use them; if they differ
                # it's an error.
                pieces = re.split(r',\s*', headers["Content-Length"])
                if any(i != pieces[0] for i in pieces):
                    raise httputil.HTTPInputError(
                        "Multiple unequal Content-Lengths: %r" %
                        headers["Content-Length"])
                headers["Content-Length"] = pieces[0]
            # Accept plain decimal digits only: int() alone would also take
            # signs, surrounding whitespace and underscores, and a negative
            # length would silently skip the body.
            if not re.match(r"[0-9]+\Z", headers["Content-Length"]):
                raise httputil.HTTPInputError(
                    "Only integer Content-Length is allowed: %r" %
                    headers["Content-Length"])
            content_length = int(headers["Content-Length"])

            if content_length > self._max_body_size:
                raise httputil.HTTPInputError("Content-Length too long")
        else:
            content_length = None

        if code == 204:
            # 204 responses may not carry a body and have an implicit
            # length of zero (rather than read-until-close).
            if ("Transfer-Encoding" in headers or
                    content_length not in (None, 0)):
                raise httputil.HTTPInputError(
                    "Response with code %d should not have body" % code)
            content_length = 0

        if content_length is not None:
            return self._read_fixed_body(content_length, delegate)
        # Transfer-coding names are case-insensitive (RFC 7230 section 4).
        if headers.get("Transfer-Encoding", "").lower() == "chunked":
            return self._read_chunked_body(delegate)
        if self.is_client:
            return self._read_body_until_close(delegate)
        return None

    @gen.coroutine
    def _read_fixed_body(self, content_length, delegate):
        """Read exactly ``content_length`` bytes, feeding them to the delegate."""
        while content_length > 0:
            body = yield self.stream.read_bytes(
                min(self.params.chunk_size, content_length), partial=True)
            content_length -= len(body)
            # On the server, data arriving after finish() is discarded.
            if not self._write_finished or self.is_client:
                with _ExceptionLoggingContext(app_log):
                    yield gen.maybe_future(delegate.data_received(body))

    @gen.coroutine
    def _read_chunked_body(self, delegate):
        """Read a chunked body, feeding chunk data to the delegate.

        Raises `.HTTPInputError` for malformed chunk framing or when the
        total size exceeds the body size limit.
        """
        total_size = 0
        while True:
            size_line = yield self.stream.read_until(b"\r\n", max_bytes=64)
            # Drop chunk extensions (";name=value", RFC 7230 section 4.1.1)
            # and require a bare hex number: int(..., 16) alone would accept
            # signs, a "0x" prefix and underscores.
            size_field = size_line.split(b";", 1)[0].strip()
            if not re.match(b"[0-9a-fA-F]+$", size_field):
                raise httputil.HTTPInputError(
                    "Invalid chunk size: %r" % size_line)
            chunk_len = int(size_field, 16)
            if chunk_len == 0:
                # Consume the trailer and final CRLF so they are not
                # mistaken for the start of the next message.
                yield self._read_chunked_trailer()
                return
            total_size += chunk_len
            if total_size > self._max_body_size:
                raise httputil.HTTPInputError("chunked body too large")
            bytes_to_read = chunk_len
            while bytes_to_read:
                chunk = yield self.stream.read_bytes(
                    min(bytes_to_read, self.params.chunk_size), partial=True)
                bytes_to_read -= len(chunk)
                if not self._write_finished or self.is_client:
                    with _ExceptionLoggingContext(app_log):
                        yield gen.maybe_future(delegate.data_received(chunk))
            # Each chunk's data is terminated by CRLF.  This is an explicit
            # check rather than an assert so it survives "python -O".
            crlf = yield self.stream.read_bytes(2)
            if crlf != b"\r\n":
                raise httputil.HTTPInputError(
                    "improperly terminated chunked request")

    @gen.coroutine
    def _read_chunked_trailer(self):
        """Consume the (discarded) trailer fields and the final empty line."""
        trailer_size = 0
        while True:
            line = yield self.stream.read_until(
                b"\r\n", max_bytes=self.params.max_header_size)
            if line == b"\r\n":
                return
            trailer_size += len(line)
            if trailer_size > self.params.max_header_size:
                raise httputil.HTTPInputError("chunked trailer too large")

    @gen.coroutine
    def _read_body_until_close(self, delegate):
        """Read everything until the stream closes and pass it to the delegate."""
        body = yield self.stream.read_until_close()
        if not self._write_finished or self.is_client:
            with _ExceptionLoggingContext(app_log):
                delegate.data_received(body)
00574
00575
class _GzipMessageDelegate(httputil.HTTPMessageDelegate):
    """Wraps an `HTTPMessageDelegate` to decode ``Content-Encoding: gzip``.
    """
    def __init__(self, delegate, chunk_size):
        """
        :arg delegate: the `HTTPMessageDelegate` that receives decoded data
        :arg int chunk_size: maximum size of each decompressed piece passed
            to ``delegate.data_received``
        """
        self._delegate = delegate
        self._chunk_size = chunk_size
        self._decompressor = None

    def headers_received(self, start_line, headers):
        # Content-coding names are case-insensitive (RFC 7231 section
        # 3.1.2.1), so "GZIP" or "Gzip" must be decoded as well.
        if headers.get("Content-Encoding", "").lower() == "gzip":
            self._decompressor = GzipDecompressor()
            # Downstream delegates only see decompressed data, so move the
            # original value to X-Consumed-Content-Encoding.
            headers.add("X-Consumed-Content-Encoding",
                        headers["Content-Encoding"])
            del headers["Content-Encoding"]
        return self._delegate.headers_received(start_line, headers)

    @gen.coroutine
    def data_received(self, chunk):
        if self._decompressor is not None:
            compressed_data = chunk
            # Decompress at most chunk_size bytes at a time, feeding any
            # input the decompressor did not consume back into the loop.
            while compressed_data:
                decompressed = self._decompressor.decompress(
                    compressed_data, self._chunk_size)
                if decompressed:
                    yield gen.maybe_future(
                        self._delegate.data_received(decompressed))
                compressed_data = self._decompressor.unconsumed_tail
        else:
            yield gen.maybe_future(self._delegate.data_received(chunk))

    def finish(self):
        if self._decompressor is not None:
            tail = self._decompressor.flush()
            if tail:
                # Any data left in the decompressor is delivered as one
                # last chunk.  finish() is synchronous, so a Future
                # returned by data_received here is not waited on.
                self._delegate.data_received(tail)
        return self._delegate.finish()

    def on_connection_close(self):
        return self._delegate.on_connection_close()
00623
00624
class HTTP1ServerConnection(object):
    """An HTTP/1.x server."""
    def __init__(self, stream, params=None, context=None):
        """
        :arg stream: an `.IOStream`
        :arg params: a `.HTTP1ConnectionParameters` or None
        :arg context: an opaque application-defined object that is accessible
            as ``connection.context``
        """
        self.stream = stream
        if params is None:
            params = HTTP1ConnectionParameters()
        self.params = params
        self.context = context
        # Future of the _server_request_loop coroutine; None until
        # start_serving() is called.
        self._serving_future = None

    @gen.coroutine
    def close(self):
        """Closes the connection.

        Returns a `.Future` that resolves after the serving loop has exited
        (immediately if `start_serving` was never called).
        """
        self.stream.close()
        if self._serving_future is None:
            # No serving loop was started, so there is nothing to wait for;
            # yielding None here would only raise an error that the broad
            # except below would then hide.
            return
        # Wait for the serving loop to exit but ignore its exceptions:
        # start_serving already surfaces them by calling f.result() in an
        # IOLoop done-callback.
        try:
            yield self._serving_future
        except Exception:
            pass

    def start_serving(self, delegate):
        """Starts serving requests on this connection.

        :arg delegate: a `.HTTPServerConnectionDelegate`
        """
        assert isinstance(delegate, httputil.HTTPServerConnectionDelegate)
        self._serving_future = self._server_request_loop(delegate)
        # Register the future with the IOLoop so its exceptions are re-raised
        # (via f.result()) instead of being silently dropped.
        self.stream.io_loop.add_future(self._serving_future,
                                       lambda f: f.result())

    @gen.coroutine
    def _server_request_loop(self, delegate):
        """Read requests one after another until the connection stops."""
        try:
            while True:
                conn = HTTP1Connection(self.stream, False,
                                       self.params, self.context)
                request_delegate = delegate.start_request(self, conn)
                try:
                    ret = yield conn.read_response(request_delegate)
                except (iostream.StreamClosedError,
                        iostream.UnsatisfiableReadError):
                    return
                except _QuietException:
                    # _ExceptionLoggingContext already logged the cause.
                    conn.close()
                    return
                except Exception:
                    gen_log.error("Uncaught exception", exc_info=True)
                    conn.close()
                    return
                if not ret:
                    return
                yield gen.moment
        finally:
            delegate.on_close(self)