autobahn_websocket.py
Go to the documentation of this file.
1 # Software License Agreement (BSD License)
2 #
3 # Copyright (c) 2012, Willow Garage, Inc.
4 # All rights reserved.
5 #
6 # Redistribution and use in source and binary forms, with or without
7 # modification, are permitted provided that the following conditions
8 # are met:
9 #
10 # * Redistributions of source code must retain the above copyright
11 # notice, this list of conditions and the following disclaimer.
12 # * Redistributions in binary form must reproduce the above
13 # copyright notice, this list of conditions and the following
14 # disclaimer in the documentation and/or other materials provided
15 # with the distribution.
16 # * Neither the name of Willow Garage, Inc. nor the names of its
17 # contributors may be used to endorse or promote products derived
18 # from this software without specific prior written permission.
19 #
20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
24 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
25 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
26 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
28 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
30 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31 # POSSIBILITY OF SUCH DAMAGE.
32 
33 import rospy
34 import uuid
35 
36 from rosauth.srv import Authentication
37 
38 import sys
39 import threading
40 import traceback
41 from functools import wraps
42 
43 from autobahn.twisted.websocket import WebSocketServerProtocol
44 from twisted.internet import interfaces, reactor
45 from zope.interface import implementer
46 
47 from rosbridge_library.rosbridge_protocol import RosbridgeProtocol
48 from rosbridge_library.util import json, bson
49 
50 
52  """Log the most recent exception to ROS."""
53  exc = traceback.format_exception(*sys.exc_info())
54  rospy.logerr(''.join(exc))
55 
56 
58  """Decorator for logging exceptions to ROS."""
59  @wraps(f)
60  def wrapper(*args, **kwargs):
61  try:
62  return f(*args, **kwargs)
63  except:
65  raise
66  return wrapper
67 
68 
69 @implementer(interfaces.IPushProducer)
71  """Allows the Autobahn transport to pause outgoing messages from rosbridge.
72 
73  The purpose of this valve is to connect backpressure from the WebSocket client
74  back to the rosbridge protocol, which depends on backpressure for queueing.
75  Without this flow control, rosbridge will happily keep writing messages to
76  the WebSocket until the system runs out of memory.
77 
78  This valve is closed and opened automatically by the Twisted TCP server.
79  In practice, Twisted should only close the valve when its userspace write buffer
80  is full and it should only open the valve when that buffer is empty.
81 
82  When the valve is closed, the rosbridge protocol instance's outgoing writes
83  must block until the valve is opened.
84  """
85  def __init__(self, proto):
86  self._proto = proto
87  self._valve = threading.Event()
88  self._finished = False
89 
90  @log_exceptions
91  def relay(self, message):
92  self._valve.wait()
93  if self._finished:
94  return
95  reactor.callFromThread(self._proto.outgoing, message)
96 
97  def pauseProducing(self):
98  if not self._finished:
99  self._valve.clear()
100 
101  def resumeProducing(self):
102  self._valve.set()
103 
104  def stopProducing(self):
105  self._finished = True
106  self._valve.set()
107 
108 
109 class RosbridgeWebSocket(WebSocketServerProtocol):
110  client_id_seed = 0
111  clients_connected = 0
112  authenticate = False
113 
114  # The following are passed on to RosbridgeProtocol
115  # defragmentation.py:
116  fragment_timeout = 600 # seconds
117  # protocol.py:
118  delay_between_messages = 0 # seconds
119  max_message_size = None # bytes
120  unregister_timeout = 10.0 # seconds
121  bson_only_mode = False
122 
123  def onOpen(self):
124  cls = self.__class__
125  parameters = {
126  "fragment_timeout": cls.fragment_timeout,
127  "delay_between_messages": cls.delay_between_messages,
128  "max_message_size": cls.max_message_size,
129  "unregister_timeout": cls.unregister_timeout,
130  "bson_only_mode": cls.bson_only_mode
131  }
132  try:
133  self.protocol = RosbridgeProtocol(cls.client_id_seed, parameters=parameters)
134  producer = OutgoingValve(self)
135  self.transport.registerProducer(producer, True)
136  producer.resumeProducing()
137  self.protocol.outgoing = producer.relay
138  self.authenticated = False
139  cls.client_id_seed += 1
140  cls.clients_connected += 1
141  self.client_id = uuid.uuid4()
142  self.peer = self.transport.getPeer().host
143  if cls.client_manager:
144  cls.client_manager.add_client(self.client_id, self.peer)
145 
146  except Exception as exc:
147  rospy.logerr("Unable to accept incoming connection. Reason: %s", str(exc))
148  rospy.loginfo("Client connected. %d clients total.", cls.clients_connected)
149  if cls.authenticate:
150  rospy.loginfo("Awaiting proper authentication...")
151 
152  def onMessage(self, message, binary):
153  cls = self.__class__
154  if not binary:
155  message = message.decode('utf-8')
156  # check if we need to authenticate
157  if cls.authenticate and not self.authenticated:
158  try:
159  if cls.bson_only_mode:
160  msg = bson.BSON(message).decode()
161  else:
162  msg = json.loads(message)
163 
164  if msg['op'] == 'auth':
165  # check the authorization information
166  auth_srv = rospy.ServiceProxy('authenticate', Authentication)
167  resp = auth_srv(msg['mac'], msg['client'], msg['dest'],
168  msg['rand'], rospy.Time(msg['t']), msg['level'],
169  rospy.Time(msg['end']))
170  self.authenticated = resp.authenticated
171  if self.authenticated:
172  rospy.loginfo("Client %d has authenticated.", self.protocol.client_id)
173  return
174  # if we are here, no valid authentication was given
175  rospy.logwarn("Client %d did not authenticate. Closing connection.",
176  self.protocol.client_id)
177  self.sendClose()
178  except:
179  # proper error will be handled in the protocol class
180  self.protocol.incoming(message)
181  else:
182  # no authentication required
183  self.protocol.incoming(message)
184 
185  def outgoing(self, message):
186  if type(message) == bson.BSON:
187  binary = True
188  message = bytes(message)
189  elif type(message) == bytearray:
190  binary = True
191  message = bytes(message)
192  else:
193  binary = False
194  message = message.encode('utf-8')
195 
196  self.sendMessage(message, binary)
197 
198  def onClose(self, was_clean, code, reason):
199  if not hasattr(self, 'protocol'):
200  return # Closed before connection was opened.
201  cls = self.__class__
202  cls.clients_connected -= 1
203  self.protocol.finish()
204  if cls.client_manager:
205  cls.client_manager.remove_client(self.client_id, self.peer)
206  rospy.loginfo("Client disconnected. %d clients total.", cls.clients_connected)


rosbridge_server
Author(s): Jonathan Mace
autogenerated on Wed Jun 3 2020 03:55:18