00001 """Server-side implementation of the WebSocket protocol.
00002
00003 `WebSockets <http://dev.w3.org/html5/websockets/>`_ allow for bidirectional
00004 communication between the browser and server.
00005
00006 .. warning::
00007
00008 The WebSocket protocol was recently finalized as `RFC 6455
00009 <http://tools.ietf.org/html/rfc6455>`_ and is not yet supported in
00010 all browsers. Refer to http://caniuse.com/websockets for details
00011 on compatibility. In addition, during development the protocol
00012 went through several incompatible versions, and some browsers only
00013 support older versions. By default this module only supports the
00014 latest version of the protocol, but optional support for an older
00015 version (known as "draft 76" or "hixie-76") can be enabled by
00016 overriding `WebSocketHandler.allow_draft76` (see that method's
00017 documentation for caveats).
00018 """
00019
00020 from __future__ import absolute_import, division, with_statement
00021
00022
00023 import array
00024 import functools
00025 import hashlib
00026 import logging
00027 import struct
00028 import time
00029 import base64
00030 import tornado.escape
00031 import tornado.web
00032
00033 from tornado.util import bytes_type, b
00034
00035
00036 class WebSocketHandler(tornado.web.RequestHandler):
00037 """Subclass this class to create a basic WebSocket handler.
00038
00039 Override on_message to handle incoming messages. You can also override
00040 open and on_close to handle opened and closed connections.
00041
00042 See http://dev.w3.org/html5/websockets/ for details on the
00043 JavaScript interface. The protocol is specified at
00044 http://tools.ietf.org/html/rfc6455.
00045
00046 Here is an example Web Socket handler that echos back all received messages
00047 back to the client::
00048
00049 class EchoWebSocket(websocket.WebSocketHandler):
00050 def open(self):
00051 print "WebSocket opened"
00052
00053 def on_message(self, message):
00054 self.write_message(u"You said: " + message)
00055
00056 def on_close(self):
00057 print "WebSocket closed"
00058
00059 Web Sockets are not standard HTTP connections. The "handshake" is HTTP,
00060 but after the handshake, the protocol is message-based. Consequently,
00061 most of the Tornado HTTP facilities are not available in handlers of this
00062 type. The only communication methods available to you are write_message()
00063 and close(). Likewise, your request handler class should
00064 implement open() method rather than get() or post().
00065
00066 If you map the handler above to "/websocket" in your application, you can
00067 invoke it in JavaScript with::
00068
00069 var ws = new WebSocket("ws://localhost:8888/websocket");
00070 ws.onopen = function() {
00071 ws.send("Hello, world");
00072 };
00073 ws.onmessage = function (evt) {
00074 alert(evt.data);
00075 };
00076
00077 This script pops up an alert box that says "You said: Hello, world".
00078 """
00079 def __init__(self, application, request, **kwargs):
00080 tornado.web.RequestHandler.__init__(self, application, request,
00081 **kwargs)
00082 self.stream = request.connection.stream
00083 self.ws_connection = None
00084
00085 def _execute(self, transforms, *args, **kwargs):
00086 self.open_args = args
00087 self.open_kwargs = kwargs
00088
00089
00090 if self.request.method != 'GET':
00091 self.stream.write(tornado.escape.utf8(
00092 "HTTP/1.1 405 Method Not Allowed\r\n\r\n"
00093 ))
00094 self.stream.close()
00095 return
00096
00097
00098 if self.request.headers.get("Upgrade", "").lower() != 'websocket':
00099 self.stream.write(tornado.escape.utf8(
00100 "HTTP/1.1 400 Bad Request\r\n\r\n"
00101 "Can \"Upgrade\" only to \"WebSocket\"."
00102 ))
00103 self.stream.close()
00104 return
00105
00106
00107
00108 headers = self.request.headers
00109 connection = map(lambda s: s.strip().lower(), headers.get("Connection", "").split(","))
00110 if 'upgrade' not in connection:
00111 self.stream.write(tornado.escape.utf8(
00112 "HTTP/1.1 400 Bad Request\r\n\r\n"
00113 "\"Connection\" must be \"Upgrade\"."
00114 ))
00115 self.stream.close()
00116 return
00117
00118
00119
00120
00121 if self.request.headers.get("Sec-WebSocket-Version") in ("7", "8", "13"):
00122 self.ws_connection = WebSocketProtocol13(self)
00123 self.ws_connection.accept_connection()
00124 elif (self.allow_draft76() and
00125 "Sec-WebSocket-Version" not in self.request.headers):
00126 self.ws_connection = WebSocketProtocol76(self)
00127 self.ws_connection.accept_connection()
00128 else:
00129 self.stream.write(tornado.escape.utf8(
00130 "HTTP/1.1 426 Upgrade Required\r\n"
00131 "Sec-WebSocket-Version: 8\r\n\r\n"))
00132 self.stream.close()
00133
00134 def write_message(self, message, binary=False):
00135 """Sends the given message to the client of this Web Socket.
00136
00137 The message may be either a string or a dict (which will be
00138 encoded as json). If the ``binary`` argument is false, the
00139 message will be sent as utf8; in binary mode any byte string
00140 is allowed.
00141 """
00142 if isinstance(message, dict):
00143 message = tornado.escape.json_encode(message)
00144 self.ws_connection.write_message(message, binary=binary)
00145
00146 def select_subprotocol(self, subprotocols):
00147 """Invoked when a new WebSocket requests specific subprotocols.
00148
00149 ``subprotocols`` is a list of strings identifying the
00150 subprotocols proposed by the client. This method may be
00151 overridden to return one of those strings to select it, or
00152 ``None`` to not select a subprotocol. Failure to select a
00153 subprotocol does not automatically abort the connection,
00154 although clients may close the connection if none of their
00155 proposed subprotocols was selected.
00156 """
00157 return None
00158
00159 def open(self):
00160 """Invoked when a new WebSocket is opened.
00161
00162 The arguments to `open` are extracted from the `tornado.web.URLSpec`
00163 regular expression, just like the arguments to
00164 `tornado.web.RequestHandler.get`.
00165 """
00166 pass
00167
00168 def on_message(self, message):
00169 """Handle incoming messages on the WebSocket
00170
00171 This method must be overridden.
00172 """
00173 raise NotImplementedError
00174
00175 def on_close(self):
00176 """Invoked when the WebSocket is closed."""
00177 pass
00178
00179 def close(self):
00180 """Closes this Web Socket.
00181
00182 Once the close handshake is successful the socket will be closed.
00183 """
00184 self.ws_connection.close()
00185
00186 def allow_draft76(self):
00187 """Override to enable support for the older "draft76" protocol.
00188
00189 The draft76 version of the websocket protocol is disabled by
00190 default due to security concerns, but it can be enabled by
00191 overriding this method to return True.
00192
00193 Connections using the draft76 protocol do not support the
00194 ``binary=True`` flag to `write_message`.
00195
00196 Support for the draft76 protocol is deprecated and will be
00197 removed in a future version of Tornado.
00198 """
00199 return False
00200
00201 def get_websocket_scheme(self):
00202 """Return the url scheme used for this request, either "ws" or "wss".
00203
00204 This is normally decided by HTTPServer, but applications
00205 may wish to override this if they are using an SSL proxy
00206 that does not provide the X-Scheme header as understood
00207 by HTTPServer.
00208
00209 Note that this is only used by the draft76 protocol.
00210 """
00211 return "wss" if self.request.protocol == "https" else "ws"
00212
00213 def async_callback(self, callback, *args, **kwargs):
00214 """Wrap callbacks with this if they are used on asynchronous requests.
00215
00216 Catches exceptions properly and closes this WebSocket if an exception
00217 is uncaught. (Note that this is usually unnecessary thanks to
00218 `tornado.stack_context`)
00219 """
00220 return self.ws_connection.async_callback(callback, *args, **kwargs)
00221
00222 def _not_supported(self, *args, **kwargs):
00223 raise Exception("Method not supported for Web Sockets")
00224
00225 def on_connection_close(self):
00226 if self.ws_connection:
00227 self.ws_connection.on_connection_close()
00228 self.ws_connection = None
00229 self.on_close()
00230
00231
00232 for method in ["write", "redirect", "set_header", "send_error", "set_cookie",
00233 "set_status", "flush", "finish"]:
00234 setattr(WebSocketHandler, method, WebSocketHandler._not_supported)
00235
00236
00237 class WebSocketProtocol(object):
00238 """Base class for WebSocket protocol versions.
00239 """
00240 def __init__(self, handler):
00241 self.handler = handler
00242 self.request = handler.request
00243 self.stream = handler.stream
00244 self.client_terminated = False
00245 self.server_terminated = False
00246
00247 def async_callback(self, callback, *args, **kwargs):
00248 """Wrap callbacks with this if they are used on asynchronous requests.
00249
00250 Catches exceptions properly and closes this WebSocket if an exception
00251 is uncaught.
00252 """
00253 if args or kwargs:
00254 callback = functools.partial(callback, *args, **kwargs)
00255
00256 def wrapper(*args, **kwargs):
00257 try:
00258 return callback(*args, **kwargs)
00259 except Exception:
00260 logging.error("Uncaught exception in %s",
00261 self.request.path, exc_info=True)
00262 self._abort()
00263 return wrapper
00264
00265 def on_connection_close(self):
00266 self._abort()
00267
00268 def _abort(self):
00269 """Instantly aborts the WebSocket connection by closing the socket"""
00270 self.client_terminated = True
00271 self.server_terminated = True
00272 self.stream.close()
00273 self.close()
00274
00275
00276 class WebSocketProtocol76(WebSocketProtocol):
00277 """Implementation of the WebSockets protocol, version hixie-76.
00278
00279 This class provides basic functionality to process WebSockets requests as
00280 specified in
00281 http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-76
00282 """
00283 def __init__(self, handler):
00284 WebSocketProtocol.__init__(self, handler)
00285 self.challenge = None
00286 self._waiting = None
00287
00288 def accept_connection(self):
00289 try:
00290 self._handle_websocket_headers()
00291 except ValueError:
00292 logging.debug("Malformed WebSocket request received")
00293 self._abort()
00294 return
00295
00296 scheme = self.handler.get_websocket_scheme()
00297
00298
00299 subprotocol_header = ''
00300 subprotocol = self.request.headers.get("Sec-WebSocket-Protocol", None)
00301 if subprotocol:
00302 selected = self.handler.select_subprotocol([subprotocol])
00303 if selected:
00304 assert selected == subprotocol
00305 subprotocol_header = "Sec-WebSocket-Protocol: %s\r\n" % selected
00306
00307
00308
00309
00310
00311 self.stream.write(tornado.escape.utf8(
00312 "HTTP/1.1 101 WebSocket Protocol Handshake\r\n"
00313 "Upgrade: WebSocket\r\n"
00314 "Connection: Upgrade\r\n"
00315 "Server: TornadoServer/%(version)s\r\n"
00316 "Sec-WebSocket-Origin: %(origin)s\r\n"
00317 "Sec-WebSocket-Location: %(scheme)s://%(host)s%(uri)s\r\n"
00318 "%(subprotocol)s"
00319 "\r\n" % (dict(
00320 version=tornado.version,
00321 origin=self.request.headers["Origin"],
00322 scheme=scheme,
00323 host=self.request.host,
00324 uri=self.request.uri,
00325 subprotocol=subprotocol_header))))
00326 self.stream.read_bytes(8, self._handle_challenge)
00327
00328 def challenge_response(self, challenge):
00329 """Generates the challenge response that's needed in the handshake
00330
00331 The challenge parameter should be the raw bytes as sent from the
00332 client.
00333 """
00334 key_1 = self.request.headers.get("Sec-Websocket-Key1")
00335 key_2 = self.request.headers.get("Sec-Websocket-Key2")
00336 try:
00337 part_1 = self._calculate_part(key_1)
00338 part_2 = self._calculate_part(key_2)
00339 except ValueError:
00340 raise ValueError("Invalid Keys/Challenge")
00341 return self._generate_challenge_response(part_1, part_2, challenge)
00342
00343 def _handle_challenge(self, challenge):
00344 try:
00345 challenge_response = self.challenge_response(challenge)
00346 except ValueError:
00347 logging.debug("Malformed key data in WebSocket request")
00348 self._abort()
00349 return
00350 self._write_response(challenge_response)
00351
00352 def _write_response(self, challenge):
00353 self.stream.write(challenge)
00354 self.async_callback(self.handler.open)(*self.handler.open_args, **self.handler.open_kwargs)
00355 self._receive_message()
00356
00357 def _handle_websocket_headers(self):
00358 """Verifies all invariant- and required headers
00359
00360 If a header is missing or have an incorrect value ValueError will be
00361 raised
00362 """
00363 fields = ("Origin", "Host", "Sec-Websocket-Key1",
00364 "Sec-Websocket-Key2")
00365 if not all(map(lambda f: self.request.headers.get(f), fields)):
00366 raise ValueError("Missing/Invalid WebSocket headers")
00367
00368 def _calculate_part(self, key):
00369 """Processes the key headers and calculates their key value.
00370
00371 Raises ValueError when feed invalid key."""
00372 number = int(''.join(c for c in key if c.isdigit()))
00373 spaces = len([c for c in key if c.isspace()])
00374 try:
00375 key_number = number // spaces
00376 except (ValueError, ZeroDivisionError):
00377 raise ValueError
00378 return struct.pack(">I", key_number)
00379
00380 def _generate_challenge_response(self, part_1, part_2, part_3):
00381 m = hashlib.md5()
00382 m.update(part_1)
00383 m.update(part_2)
00384 m.update(part_3)
00385 return m.digest()
00386
00387 def _receive_message(self):
00388 self.stream.read_bytes(1, self._on_frame_type)
00389
00390 def _on_frame_type(self, byte):
00391 frame_type = ord(byte)
00392 if frame_type == 0x00:
00393 self.stream.read_until(b("\xff"), self._on_end_delimiter)
00394 elif frame_type == 0xff:
00395 self.stream.read_bytes(1, self._on_length_indicator)
00396 else:
00397 self._abort()
00398
00399 def _on_end_delimiter(self, frame):
00400 if not self.client_terminated:
00401 self.async_callback(self.handler.on_message)(
00402 frame[:-1].decode("utf-8", "replace"))
00403 if not self.client_terminated:
00404 self._receive_message()
00405
00406 def _on_length_indicator(self, byte):
00407 if ord(byte) != 0x00:
00408 self._abort()
00409 return
00410 self.client_terminated = True
00411 self.close()
00412
00413 def write_message(self, message, binary=False):
00414 """Sends the given message to the client of this Web Socket."""
00415 if binary:
00416 raise ValueError(
00417 "Binary messages not supported by this version of websockets")
00418 if isinstance(message, unicode):
00419 message = message.encode("utf-8")
00420 assert isinstance(message, bytes_type)
00421 self.stream.write(b("\x00") + message + b("\xff"))
00422
00423 def close(self):
00424 """Closes the WebSocket connection."""
00425 if not self.server_terminated:
00426 if not self.stream.closed():
00427 self.stream.write("\xff\x00")
00428 self.server_terminated = True
00429 if self.client_terminated:
00430 if self._waiting is not None:
00431 self.stream.io_loop.remove_timeout(self._waiting)
00432 self._waiting = None
00433 self.stream.close()
00434 elif self._waiting is None:
00435 self._waiting = self.stream.io_loop.add_timeout(
00436 time.time() + 5, self._abort)
00437
00438
00439 class WebSocketProtocol13(WebSocketProtocol):
00440 """Implementation of the WebSocket protocol from RFC 6455.
00441
00442 This class supports versions 7 and 8 of the protocol in addition to the
00443 final version 13.
00444 """
00445 def __init__(self, handler):
00446 WebSocketProtocol.__init__(self, handler)
00447 self._final_frame = False
00448 self._frame_opcode = None
00449 self._frame_mask = None
00450 self._frame_length = None
00451 self._fragmented_message_buffer = None
00452 self._fragmented_message_opcode = None
00453 self._waiting = None
00454
00455 def accept_connection(self):
00456 try:
00457 self._handle_websocket_headers()
00458 self._accept_connection()
00459 except ValueError:
00460 logging.debug("Malformed WebSocket request received")
00461 self._abort()
00462 return
00463
00464 def _handle_websocket_headers(self):
00465 """Verifies all invariant- and required headers
00466
00467 If a header is missing or have an incorrect value ValueError will be
00468 raised
00469 """
00470 fields = ("Host", "Sec-Websocket-Key", "Sec-Websocket-Version")
00471 if not all(map(lambda f: self.request.headers.get(f), fields)):
00472 raise ValueError("Missing/Invalid WebSocket headers")
00473
00474 def _challenge_response(self):
00475 sha1 = hashlib.sha1()
00476 sha1.update(tornado.escape.utf8(
00477 self.request.headers.get("Sec-Websocket-Key")))
00478 sha1.update(b("258EAFA5-E914-47DA-95CA-C5AB0DC85B11"))
00479 return tornado.escape.native_str(base64.b64encode(sha1.digest()))
00480
00481 def _accept_connection(self):
00482 subprotocol_header = ''
00483 subprotocols = self.request.headers.get("Sec-WebSocket-Protocol", '')
00484 subprotocols = [s.strip() for s in subprotocols.split(',')]
00485 if subprotocols:
00486 selected = self.handler.select_subprotocol(subprotocols)
00487 if selected:
00488 assert selected in subprotocols
00489 subprotocol_header = "Sec-WebSocket-Protocol: %s\r\n" % selected
00490
00491 self.stream.write(tornado.escape.utf8(
00492 "HTTP/1.1 101 Switching Protocols\r\n"
00493 "Upgrade: websocket\r\n"
00494 "Connection: Upgrade\r\n"
00495 "Sec-WebSocket-Accept: %s\r\n"
00496 "%s"
00497 "\r\n" % (self._challenge_response(), subprotocol_header)))
00498
00499 self.async_callback(self.handler.open)(*self.handler.open_args, **self.handler.open_kwargs)
00500 self._receive_frame()
00501
00502 def _write_frame(self, fin, opcode, data):
00503 if fin:
00504 finbit = 0x80
00505 else:
00506 finbit = 0
00507 frame = struct.pack("B", finbit | opcode)
00508 l = len(data)
00509 if l < 126:
00510 frame += struct.pack("B", l)
00511 elif l <= 0xFFFF:
00512 frame += struct.pack("!BH", 126, l)
00513 else:
00514 frame += struct.pack("!BQ", 127, l)
00515 frame += data
00516 self.stream.write(frame)
00517
00518 def write_message(self, message, binary=False):
00519 """Sends the given message to the client of this Web Socket."""
00520 if binary:
00521 opcode = 0x2
00522 else:
00523 opcode = 0x1
00524 message = tornado.escape.utf8(message)
00525 assert isinstance(message, bytes_type)
00526 self._write_frame(True, opcode, message)
00527
00528 def _receive_frame(self):
00529 self.stream.read_bytes(2, self._on_frame_start)
00530
00531 def _on_frame_start(self, data):
00532 header, payloadlen = struct.unpack("BB", data)
00533 self._final_frame = header & 0x80
00534 reserved_bits = header & 0x70
00535 self._frame_opcode = header & 0xf
00536 self._frame_opcode_is_control = self._frame_opcode & 0x8
00537 if reserved_bits:
00538
00539 self._abort()
00540 return
00541 if not (payloadlen & 0x80):
00542
00543 self._abort()
00544 return
00545 payloadlen = payloadlen & 0x7f
00546 if self._frame_opcode_is_control and payloadlen >= 126:
00547
00548 self._abort()
00549 return
00550 if payloadlen < 126:
00551 self._frame_length = payloadlen
00552 self.stream.read_bytes(4, self._on_masking_key)
00553 elif payloadlen == 126:
00554 self.stream.read_bytes(2, self._on_frame_length_16)
00555 elif payloadlen == 127:
00556 self.stream.read_bytes(8, self._on_frame_length_64)
00557
00558 def _on_frame_length_16(self, data):
00559 self._frame_length = struct.unpack("!H", data)[0]
00560 self.stream.read_bytes(4, self._on_masking_key)
00561
00562 def _on_frame_length_64(self, data):
00563 self._frame_length = struct.unpack("!Q", data)[0]
00564 self.stream.read_bytes(4, self._on_masking_key)
00565
00566 def _on_masking_key(self, data):
00567 self._frame_mask = array.array("B", data)
00568 self.stream.read_bytes(self._frame_length, self._on_frame_data)
00569
00570 def _on_frame_data(self, data):
00571 unmasked = array.array("B", data)
00572 for i in xrange(len(data)):
00573 unmasked[i] = unmasked[i] ^ self._frame_mask[i % 4]
00574
00575 if self._frame_opcode_is_control:
00576
00577
00578
00579 if not self._final_frame:
00580
00581 self._abort()
00582 return
00583 opcode = self._frame_opcode
00584 elif self._frame_opcode == 0:
00585 if self._fragmented_message_buffer is None:
00586
00587 self._abort()
00588 return
00589 self._fragmented_message_buffer += unmasked
00590 if self._final_frame:
00591 opcode = self._fragmented_message_opcode
00592 unmasked = self._fragmented_message_buffer
00593 self._fragmented_message_buffer = None
00594 else:
00595 if self._fragmented_message_buffer is not None:
00596
00597 self._abort()
00598 return
00599 if self._final_frame:
00600 opcode = self._frame_opcode
00601 else:
00602 self._fragmented_message_opcode = self._frame_opcode
00603 self._fragmented_message_buffer = unmasked
00604
00605 if self._final_frame:
00606 self._handle_message(opcode, unmasked.tostring())
00607
00608 if not self.client_terminated:
00609 self._receive_frame()
00610
00611 def _handle_message(self, opcode, data):
00612 if self.client_terminated:
00613 return
00614
00615 if opcode == 0x1:
00616
00617 try:
00618 decoded = data.decode("utf-8")
00619 except UnicodeDecodeError:
00620 self._abort()
00621 return
00622 self.async_callback(self.handler.on_message)(decoded)
00623 elif opcode == 0x2:
00624
00625 self.async_callback(self.handler.on_message)(data)
00626 elif opcode == 0x8:
00627
00628 self.client_terminated = True
00629 self.close()
00630 elif opcode == 0x9:
00631
00632 self._write_frame(True, 0xA, data)
00633 elif opcode == 0xA:
00634
00635 pass
00636 else:
00637 self._abort()
00638
00639 def close(self):
00640 """Closes the WebSocket connection."""
00641 if not self.server_terminated:
00642 if not self.stream.closed():
00643 self._write_frame(True, 0x8, b(""))
00644 self.server_terminated = True
00645 if self.client_terminated:
00646 if self._waiting is not None:
00647 self.stream.io_loop.remove_timeout(self._waiting)
00648 self._waiting = None
00649 self.stream.close()
00650 elif self._waiting is None:
00651
00652
00653 self._waiting = self.stream.io_loop.add_timeout(
00654 time.time() + 5, self._abort)