websocket.py
Go to the documentation of this file.
00001 """Server-side implementation of the WebSocket protocol.
00002 
00003 `WebSockets <http://dev.w3.org/html5/websockets/>`_ allow for bidirectional
00004 communication between the browser and server.
00005 
00006 .. warning::
00007 
00008    The WebSocket protocol was recently finalized as `RFC 6455
00009    <http://tools.ietf.org/html/rfc6455>`_ and is not yet supported in
00010    all browsers.  Refer to http://caniuse.com/websockets for details
00011    on compatibility.  In addition, during development the protocol
00012    went through several incompatible versions, and some browsers only
00013    support older versions.  By default this module only supports the
00014    latest version of the protocol, but optional support for an older
00015    version (known as "draft 76" or "hixie-76") can be enabled by
00016    overriding `WebSocketHandler.allow_draft76` (see that method's
00017    documentation for caveats).
00018 """
00019 
00020 from __future__ import absolute_import, division, with_statement
00021 # Author: Jacob Kristhammar, 2010
00022 
00023 import array
00024 import functools
00025 import hashlib
00026 import logging
00027 import struct
00028 import time
00029 import base64
00030 import tornado.escape
00031 import tornado.web
00032 
00033 from tornado.util import bytes_type, b
00034 
00035 
00036 class WebSocketHandler(tornado.web.RequestHandler):
00037     """Subclass this class to create a basic WebSocket handler.
00038 
00039     Override on_message to handle incoming messages. You can also override
00040     open and on_close to handle opened and closed connections.
00041 
00042     See http://dev.w3.org/html5/websockets/ for details on the
00043     JavaScript interface.  The protocol is specified at
00044     http://tools.ietf.org/html/rfc6455.
00045 
00046     Here is an example Web Socket handler that echos back all received messages
00047     back to the client::
00048 
00049       class EchoWebSocket(websocket.WebSocketHandler):
00050           def open(self):
00051               print "WebSocket opened"
00052 
00053           def on_message(self, message):
00054               self.write_message(u"You said: " + message)
00055 
00056           def on_close(self):
00057               print "WebSocket closed"
00058 
00059     Web Sockets are not standard HTTP connections. The "handshake" is HTTP,
00060     but after the handshake, the protocol is message-based. Consequently,
00061     most of the Tornado HTTP facilities are not available in handlers of this
00062     type. The only communication methods available to you are write_message()
00063     and close(). Likewise, your request handler class should
00064     implement open() method rather than get() or post().
00065 
00066     If you map the handler above to "/websocket" in your application, you can
00067     invoke it in JavaScript with::
00068 
00069       var ws = new WebSocket("ws://localhost:8888/websocket");
00070       ws.onopen = function() {
00071          ws.send("Hello, world");
00072       };
00073       ws.onmessage = function (evt) {
00074          alert(evt.data);
00075       };
00076 
00077     This script pops up an alert box that says "You said: Hello, world".
00078     """
00079     def __init__(self, application, request, **kwargs):
00080         tornado.web.RequestHandler.__init__(self, application, request,
00081                                             **kwargs)
00082         self.stream = request.connection.stream
00083         self.ws_connection = None
00084 
00085     def _execute(self, transforms, *args, **kwargs):
00086         self.open_args = args
00087         self.open_kwargs = kwargs
00088 
00089         # Websocket only supports GET method
00090         if self.request.method != 'GET':
00091             self.stream.write(tornado.escape.utf8(
00092                 "HTTP/1.1 405 Method Not Allowed\r\n\r\n"
00093             ))
00094             self.stream.close()
00095             return
00096 
00097         # Upgrade header should be present and should be equal to WebSocket
00098         if self.request.headers.get("Upgrade", "").lower() != 'websocket':
00099             self.stream.write(tornado.escape.utf8(
00100                 "HTTP/1.1 400 Bad Request\r\n\r\n"
00101                 "Can \"Upgrade\" only to \"WebSocket\"."
00102             ))
00103             self.stream.close()
00104             return
00105 
00106         # Connection header should be upgrade. Some proxy servers/load balancers
00107         # might mess with it.
00108         headers = self.request.headers
00109         connection = map(lambda s: s.strip().lower(), headers.get("Connection", "").split(","))
00110         if 'upgrade' not in connection:
00111             self.stream.write(tornado.escape.utf8(
00112                 "HTTP/1.1 400 Bad Request\r\n\r\n"
00113                 "\"Connection\" must be \"Upgrade\"."
00114             ))
00115             self.stream.close()
00116             return
00117 
00118         # The difference between version 8 and 13 is that in 8 the
00119         # client sends a "Sec-Websocket-Origin" header and in 13 it's
00120         # simply "Origin".
00121         if self.request.headers.get("Sec-WebSocket-Version") in ("7", "8", "13"):
00122             self.ws_connection = WebSocketProtocol13(self)
00123             self.ws_connection.accept_connection()
00124         elif (self.allow_draft76() and
00125               "Sec-WebSocket-Version" not in self.request.headers):
00126             self.ws_connection = WebSocketProtocol76(self)
00127             self.ws_connection.accept_connection()
00128         else:
00129             self.stream.write(tornado.escape.utf8(
00130                 "HTTP/1.1 426 Upgrade Required\r\n"
00131                 "Sec-WebSocket-Version: 8\r\n\r\n"))
00132             self.stream.close()
00133 
00134     def write_message(self, message, binary=False):
00135         """Sends the given message to the client of this Web Socket.
00136 
00137         The message may be either a string or a dict (which will be
00138         encoded as json).  If the ``binary`` argument is false, the
00139         message will be sent as utf8; in binary mode any byte string
00140         is allowed.
00141         """
00142         if isinstance(message, dict):
00143             message = tornado.escape.json_encode(message)
00144         self.ws_connection.write_message(message, binary=binary)
00145 
00146     def select_subprotocol(self, subprotocols):
00147         """Invoked when a new WebSocket requests specific subprotocols.
00148 
00149         ``subprotocols`` is a list of strings identifying the
00150         subprotocols proposed by the client.  This method may be
00151         overridden to return one of those strings to select it, or
00152         ``None`` to not select a subprotocol.  Failure to select a
00153         subprotocol does not automatically abort the connection,
00154         although clients may close the connection if none of their
00155         proposed subprotocols was selected.
00156         """
00157         return None
00158 
00159     def open(self):
00160         """Invoked when a new WebSocket is opened.
00161 
00162         The arguments to `open` are extracted from the `tornado.web.URLSpec`
00163         regular expression, just like the arguments to
00164         `tornado.web.RequestHandler.get`.
00165         """
00166         pass
00167 
00168     def on_message(self, message):
00169         """Handle incoming messages on the WebSocket
00170 
00171         This method must be overridden.
00172         """
00173         raise NotImplementedError
00174 
00175     def on_close(self):
00176         """Invoked when the WebSocket is closed."""
00177         pass
00178 
00179     def close(self):
00180         """Closes this Web Socket.
00181 
00182         Once the close handshake is successful the socket will be closed.
00183         """
00184         self.ws_connection.close()
00185 
00186     def allow_draft76(self):
00187         """Override to enable support for the older "draft76" protocol.
00188 
00189         The draft76 version of the websocket protocol is disabled by
00190         default due to security concerns, but it can be enabled by
00191         overriding this method to return True.
00192 
00193         Connections using the draft76 protocol do not support the
00194         ``binary=True`` flag to `write_message`.
00195 
00196         Support for the draft76 protocol is deprecated and will be
00197         removed in a future version of Tornado.
00198         """
00199         return False
00200 
00201     def get_websocket_scheme(self):
00202         """Return the url scheme used for this request, either "ws" or "wss".
00203 
00204         This is normally decided by HTTPServer, but applications
00205         may wish to override this if they are using an SSL proxy
00206         that does not provide the X-Scheme header as understood
00207         by HTTPServer.
00208 
00209         Note that this is only used by the draft76 protocol.
00210         """
00211         return "wss" if self.request.protocol == "https" else "ws"
00212 
00213     def async_callback(self, callback, *args, **kwargs):
00214         """Wrap callbacks with this if they are used on asynchronous requests.
00215 
00216         Catches exceptions properly and closes this WebSocket if an exception
00217         is uncaught.  (Note that this is usually unnecessary thanks to
00218         `tornado.stack_context`)
00219         """
00220         return self.ws_connection.async_callback(callback, *args, **kwargs)
00221 
00222     def _not_supported(self, *args, **kwargs):
00223         raise Exception("Method not supported for Web Sockets")
00224 
00225     def on_connection_close(self):
00226         if self.ws_connection:
00227             self.ws_connection.on_connection_close()
00228             self.ws_connection = None
00229             self.on_close()
00230 
00231 
00232 for method in ["write", "redirect", "set_header", "send_error", "set_cookie",
00233                "set_status", "flush", "finish"]:
00234     setattr(WebSocketHandler, method, WebSocketHandler._not_supported)
00235 
00236 
00237 class WebSocketProtocol(object):
00238     """Base class for WebSocket protocol versions.
00239     """
00240     def __init__(self, handler):
00241         self.handler = handler
00242         self.request = handler.request
00243         self.stream = handler.stream
00244         self.client_terminated = False
00245         self.server_terminated = False
00246 
00247     def async_callback(self, callback, *args, **kwargs):
00248         """Wrap callbacks with this if they are used on asynchronous requests.
00249 
00250         Catches exceptions properly and closes this WebSocket if an exception
00251         is uncaught.
00252         """
00253         if args or kwargs:
00254             callback = functools.partial(callback, *args, **kwargs)
00255 
00256         def wrapper(*args, **kwargs):
00257             try:
00258                 return callback(*args, **kwargs)
00259             except Exception:
00260                 logging.error("Uncaught exception in %s",
00261                               self.request.path, exc_info=True)
00262                 self._abort()
00263         return wrapper
00264 
00265     def on_connection_close(self):
00266         self._abort()
00267 
00268     def _abort(self):
00269         """Instantly aborts the WebSocket connection by closing the socket"""
00270         self.client_terminated = True
00271         self.server_terminated = True
00272         self.stream.close()  # forcibly tear down the connection
00273         self.close()  # let the subclass cleanup
00274 
00275 
00276 class WebSocketProtocol76(WebSocketProtocol):
00277     """Implementation of the WebSockets protocol, version hixie-76.
00278 
00279     This class provides basic functionality to process WebSockets requests as
00280     specified in
00281     http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-76
00282     """
00283     def __init__(self, handler):
00284         WebSocketProtocol.__init__(self, handler)
00285         self.challenge = None
00286         self._waiting = None
00287 
00288     def accept_connection(self):
00289         try:
00290             self._handle_websocket_headers()
00291         except ValueError:
00292             logging.debug("Malformed WebSocket request received")
00293             self._abort()
00294             return
00295 
00296         scheme = self.handler.get_websocket_scheme()
00297 
00298         # draft76 only allows a single subprotocol
00299         subprotocol_header = ''
00300         subprotocol = self.request.headers.get("Sec-WebSocket-Protocol", None)
00301         if subprotocol:
00302             selected = self.handler.select_subprotocol([subprotocol])
00303             if selected:
00304                 assert selected == subprotocol
00305                 subprotocol_header = "Sec-WebSocket-Protocol: %s\r\n" % selected
00306 
00307         # Write the initial headers before attempting to read the challenge.
00308         # This is necessary when using proxies (such as HAProxy), which
00309         # need to see the Upgrade headers before passing through the
00310         # non-HTTP traffic that follows.
00311         self.stream.write(tornado.escape.utf8(
00312             "HTTP/1.1 101 WebSocket Protocol Handshake\r\n"
00313             "Upgrade: WebSocket\r\n"
00314             "Connection: Upgrade\r\n"
00315             "Server: TornadoServer/%(version)s\r\n"
00316             "Sec-WebSocket-Origin: %(origin)s\r\n"
00317             "Sec-WebSocket-Location: %(scheme)s://%(host)s%(uri)s\r\n"
00318             "%(subprotocol)s"
00319             "\r\n" % (dict(
00320                     version=tornado.version,
00321                     origin=self.request.headers["Origin"],
00322                     scheme=scheme,
00323                     host=self.request.host,
00324                     uri=self.request.uri,
00325                     subprotocol=subprotocol_header))))
00326         self.stream.read_bytes(8, self._handle_challenge)
00327 
00328     def challenge_response(self, challenge):
00329         """Generates the challenge response that's needed in the handshake
00330 
00331         The challenge parameter should be the raw bytes as sent from the
00332         client.
00333         """
00334         key_1 = self.request.headers.get("Sec-Websocket-Key1")
00335         key_2 = self.request.headers.get("Sec-Websocket-Key2")
00336         try:
00337             part_1 = self._calculate_part(key_1)
00338             part_2 = self._calculate_part(key_2)
00339         except ValueError:
00340             raise ValueError("Invalid Keys/Challenge")
00341         return self._generate_challenge_response(part_1, part_2, challenge)
00342 
00343     def _handle_challenge(self, challenge):
00344         try:
00345             challenge_response = self.challenge_response(challenge)
00346         except ValueError:
00347             logging.debug("Malformed key data in WebSocket request")
00348             self._abort()
00349             return
00350         self._write_response(challenge_response)
00351 
00352     def _write_response(self, challenge):
00353         self.stream.write(challenge)
00354         self.async_callback(self.handler.open)(*self.handler.open_args, **self.handler.open_kwargs)
00355         self._receive_message()
00356 
00357     def _handle_websocket_headers(self):
00358         """Verifies all invariant- and required headers
00359 
00360         If a header is missing or have an incorrect value ValueError will be
00361         raised
00362         """
00363         fields = ("Origin", "Host", "Sec-Websocket-Key1",
00364                   "Sec-Websocket-Key2")
00365         if not all(map(lambda f: self.request.headers.get(f), fields)):
00366             raise ValueError("Missing/Invalid WebSocket headers")
00367 
00368     def _calculate_part(self, key):
00369         """Processes the key headers and calculates their key value.
00370 
00371         Raises ValueError when feed invalid key."""
00372         number = int(''.join(c for c in key if c.isdigit()))
00373         spaces = len([c for c in key if c.isspace()])
00374         try:
00375             key_number = number // spaces
00376         except (ValueError, ZeroDivisionError):
00377             raise ValueError
00378         return struct.pack(">I", key_number)
00379 
00380     def _generate_challenge_response(self, part_1, part_2, part_3):
00381         m = hashlib.md5()
00382         m.update(part_1)
00383         m.update(part_2)
00384         m.update(part_3)
00385         return m.digest()
00386 
00387     def _receive_message(self):
00388         self.stream.read_bytes(1, self._on_frame_type)
00389 
00390     def _on_frame_type(self, byte):
00391         frame_type = ord(byte)
00392         if frame_type == 0x00:
00393             self.stream.read_until(b("\xff"), self._on_end_delimiter)
00394         elif frame_type == 0xff:
00395             self.stream.read_bytes(1, self._on_length_indicator)
00396         else:
00397             self._abort()
00398 
00399     def _on_end_delimiter(self, frame):
00400         if not self.client_terminated:
00401             self.async_callback(self.handler.on_message)(
00402                     frame[:-1].decode("utf-8", "replace"))
00403         if not self.client_terminated:
00404             self._receive_message()
00405 
00406     def _on_length_indicator(self, byte):
00407         if ord(byte) != 0x00:
00408             self._abort()
00409             return
00410         self.client_terminated = True
00411         self.close()
00412 
00413     def write_message(self, message, binary=False):
00414         """Sends the given message to the client of this Web Socket."""
00415         if binary:
00416             raise ValueError(
00417                 "Binary messages not supported by this version of websockets")
00418         if isinstance(message, unicode):
00419             message = message.encode("utf-8")
00420         assert isinstance(message, bytes_type)
00421         self.stream.write(b("\x00") + message + b("\xff"))
00422 
00423     def close(self):
00424         """Closes the WebSocket connection."""
00425         if not self.server_terminated:
00426             if not self.stream.closed():
00427                 self.stream.write("\xff\x00")
00428             self.server_terminated = True
00429         if self.client_terminated:
00430             if self._waiting is not None:
00431                 self.stream.io_loop.remove_timeout(self._waiting)
00432             self._waiting = None
00433             self.stream.close()
00434         elif self._waiting is None:
00435             self._waiting = self.stream.io_loop.add_timeout(
00436                 time.time() + 5, self._abort)
00437 
00438 
00439 class WebSocketProtocol13(WebSocketProtocol):
00440     """Implementation of the WebSocket protocol from RFC 6455.
00441 
00442     This class supports versions 7 and 8 of the protocol in addition to the
00443     final version 13.
00444     """
00445     def __init__(self, handler):
00446         WebSocketProtocol.__init__(self, handler)
00447         self._final_frame = False
00448         self._frame_opcode = None
00449         self._frame_mask = None
00450         self._frame_length = None
00451         self._fragmented_message_buffer = None
00452         self._fragmented_message_opcode = None
00453         self._waiting = None
00454 
00455     def accept_connection(self):
00456         try:
00457             self._handle_websocket_headers()
00458             self._accept_connection()
00459         except ValueError:
00460             logging.debug("Malformed WebSocket request received")
00461             self._abort()
00462             return
00463 
00464     def _handle_websocket_headers(self):
00465         """Verifies all invariant- and required headers
00466 
00467         If a header is missing or have an incorrect value ValueError will be
00468         raised
00469         """
00470         fields = ("Host", "Sec-Websocket-Key", "Sec-Websocket-Version")
00471         if not all(map(lambda f: self.request.headers.get(f), fields)):
00472             raise ValueError("Missing/Invalid WebSocket headers")
00473 
00474     def _challenge_response(self):
00475         sha1 = hashlib.sha1()
00476         sha1.update(tornado.escape.utf8(
00477                 self.request.headers.get("Sec-Websocket-Key")))
00478         sha1.update(b("258EAFA5-E914-47DA-95CA-C5AB0DC85B11"))  # Magic value
00479         return tornado.escape.native_str(base64.b64encode(sha1.digest()))
00480 
00481     def _accept_connection(self):
00482         subprotocol_header = ''
00483         subprotocols = self.request.headers.get("Sec-WebSocket-Protocol", '')
00484         subprotocols = [s.strip() for s in subprotocols.split(',')]
00485         if subprotocols:
00486             selected = self.handler.select_subprotocol(subprotocols)
00487             if selected:
00488                 assert selected in subprotocols
00489                 subprotocol_header = "Sec-WebSocket-Protocol: %s\r\n" % selected
00490 
00491         self.stream.write(tornado.escape.utf8(
00492             "HTTP/1.1 101 Switching Protocols\r\n"
00493             "Upgrade: websocket\r\n"
00494             "Connection: Upgrade\r\n"
00495             "Sec-WebSocket-Accept: %s\r\n"
00496             "%s"
00497             "\r\n" % (self._challenge_response(), subprotocol_header)))
00498 
00499         self.async_callback(self.handler.open)(*self.handler.open_args, **self.handler.open_kwargs)
00500         self._receive_frame()
00501 
00502     def _write_frame(self, fin, opcode, data):
00503         if fin:
00504             finbit = 0x80
00505         else:
00506             finbit = 0
00507         frame = struct.pack("B", finbit | opcode)
00508         l = len(data)
00509         if l < 126:
00510             frame += struct.pack("B", l)
00511         elif l <= 0xFFFF:
00512             frame += struct.pack("!BH", 126, l)
00513         else:
00514             frame += struct.pack("!BQ", 127, l)
00515         frame += data
00516         self.stream.write(frame)
00517 
00518     def write_message(self, message, binary=False):
00519         """Sends the given message to the client of this Web Socket."""
00520         if binary:
00521             opcode = 0x2
00522         else:
00523             opcode = 0x1
00524         message = tornado.escape.utf8(message)
00525         assert isinstance(message, bytes_type)
00526         self._write_frame(True, opcode, message)
00527 
00528     def _receive_frame(self):
00529         self.stream.read_bytes(2, self._on_frame_start)
00530 
00531     def _on_frame_start(self, data):
00532         header, payloadlen = struct.unpack("BB", data)
00533         self._final_frame = header & 0x80
00534         reserved_bits = header & 0x70
00535         self._frame_opcode = header & 0xf
00536         self._frame_opcode_is_control = self._frame_opcode & 0x8
00537         if reserved_bits:
00538             # client is using as-yet-undefined extensions; abort
00539             self._abort()
00540             return
00541         if not (payloadlen & 0x80):
00542             # Unmasked frame -> abort connection
00543             self._abort()
00544             return
00545         payloadlen = payloadlen & 0x7f
00546         if self._frame_opcode_is_control and payloadlen >= 126:
00547             # control frames must have payload < 126
00548             self._abort()
00549             return
00550         if payloadlen < 126:
00551             self._frame_length = payloadlen
00552             self.stream.read_bytes(4, self._on_masking_key)
00553         elif payloadlen == 126:
00554             self.stream.read_bytes(2, self._on_frame_length_16)
00555         elif payloadlen == 127:
00556             self.stream.read_bytes(8, self._on_frame_length_64)
00557 
00558     def _on_frame_length_16(self, data):
00559         self._frame_length = struct.unpack("!H", data)[0]
00560         self.stream.read_bytes(4, self._on_masking_key)
00561 
00562     def _on_frame_length_64(self, data):
00563         self._frame_length = struct.unpack("!Q", data)[0]
00564         self.stream.read_bytes(4, self._on_masking_key)
00565 
00566     def _on_masking_key(self, data):
00567         self._frame_mask = array.array("B", data)
00568         self.stream.read_bytes(self._frame_length, self._on_frame_data)
00569 
00570     def _on_frame_data(self, data):
00571         unmasked = array.array("B", data)
00572         for i in xrange(len(data)):
00573             unmasked[i] = unmasked[i] ^ self._frame_mask[i % 4]
00574 
00575         if self._frame_opcode_is_control:
00576             # control frames may be interleaved with a series of fragmented
00577             # data frames, so control frames must not interact with
00578             # self._fragmented_*
00579             if not self._final_frame:
00580                 # control frames must not be fragmented
00581                 self._abort()
00582                 return
00583             opcode = self._frame_opcode
00584         elif self._frame_opcode == 0:  # continuation frame
00585             if self._fragmented_message_buffer is None:
00586                 # nothing to continue
00587                 self._abort()
00588                 return
00589             self._fragmented_message_buffer += unmasked
00590             if self._final_frame:
00591                 opcode = self._fragmented_message_opcode
00592                 unmasked = self._fragmented_message_buffer
00593                 self._fragmented_message_buffer = None
00594         else:  # start of new data message
00595             if self._fragmented_message_buffer is not None:
00596                 # can't start new message until the old one is finished
00597                 self._abort()
00598                 return
00599             if self._final_frame:
00600                 opcode = self._frame_opcode
00601             else:
00602                 self._fragmented_message_opcode = self._frame_opcode
00603                 self._fragmented_message_buffer = unmasked
00604 
00605         if self._final_frame:
00606             self._handle_message(opcode, unmasked.tostring())
00607 
00608         if not self.client_terminated:
00609             self._receive_frame()
00610 
00611     def _handle_message(self, opcode, data):
00612         if self.client_terminated:
00613             return
00614 
00615         if opcode == 0x1:
00616             # UTF-8 data
00617             try:
00618                 decoded = data.decode("utf-8")
00619             except UnicodeDecodeError:
00620                 self._abort()
00621                 return
00622             self.async_callback(self.handler.on_message)(decoded)
00623         elif opcode == 0x2:
00624             # Binary data
00625             self.async_callback(self.handler.on_message)(data)
00626         elif opcode == 0x8:
00627             # Close
00628             self.client_terminated = True
00629             self.close()
00630         elif opcode == 0x9:
00631             # Ping
00632             self._write_frame(True, 0xA, data)
00633         elif opcode == 0xA:
00634             # Pong
00635             pass
00636         else:
00637             self._abort()
00638 
00639     def close(self):
00640         """Closes the WebSocket connection."""
00641         if not self.server_terminated:
00642             if not self.stream.closed():
00643                 self._write_frame(True, 0x8, b(""))
00644             self.server_terminated = True
00645         if self.client_terminated:
00646             if self._waiting is not None:
00647                 self.stream.io_loop.remove_timeout(self._waiting)
00648                 self._waiting = None
00649             self.stream.close()
00650         elif self._waiting is None:
00651             # Give the client a few seconds to complete a clean shutdown,
00652             # otherwise just close the connection.
00653             self._waiting = self.stream.io_loop.add_timeout(
00654                 time.time() + 5, self._abort)


rosbridge_server
Author(s): Jonathan Mace
autogenerated on Thu Jan 2 2014 11:53:55