autobahn_websocket.py
Go to the documentation of this file.
1 # Software License Agreement (BSD License)
2 #
3 # Copyright (c) 2012, Willow Garage, Inc.
4 # All rights reserved.
5 #
6 # Redistribution and use in source and binary forms, with or without
7 # modification, are permitted provided that the following conditions
8 # are met:
9 #
10 # * Redistributions of source code must retain the above copyright
11 # notice, this list of conditions and the following disclaimer.
12 # * Redistributions in binary form must reproduce the above
13 # copyright notice, this list of conditions and the following
14 # disclaimer in the documentation and/or other materials provided
15 # with the distribution.
16 # * Neither the name of Willow Garage, Inc. nor the names of its
17 # contributors may be used to endorse or promote products derived
18 # from this software without specific prior written permission.
19 #
20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
24 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
25 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
26 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
28 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
30 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31 # POSSIBILITY OF SUCH DAMAGE.
32 
33 import rospy
34 import uuid
35 
36 from rosauth.srv import Authentication
37 
38 import sys
39 import threading
40 import traceback
41 from functools import wraps
42 from collections import deque
43 
44 from autobahn.twisted.websocket import WebSocketServerProtocol
45 
# SendException is the exception type caught around ``sendMessage`` calls when
# the peer has already disconnected.
try:
    # Newer autobahn versions (>= 20.1.3) raise an exception when sending to a closed socket
    # PR: https://github.com/crossbario/autobahn-python/pull/1299
    # Changelog: https://autobahn.readthedocs.io/en/latest/changelog.html
    from autobahn.exception import Disconnected
except ImportError:
    class NeverException(Exception):
        """Placeholder exception type that nothing in this module raises.

        Older autobahn versions fail silently when sending to a closed
        socket, so there is no real exception to catch; this class only
        gives ``except SendException`` a valid type to name.
        """

    SendException = NeverException
else:
    SendException = Disconnected
59 
60 from twisted.internet import interfaces, reactor
61 from zope.interface import implementer
62 
63 from rosbridge_library.rosbridge_protocol import RosbridgeProtocol
64 from rosbridge_library.util import json, bson
65 
66 
def _log_exception():
    """Log the most recent exception to ROS.

    Formats the exception currently being handled, including its traceback,
    and emits the resulting text through ``rospy.logerr``. Intended to be
    called from inside an ``except`` block.
    """
    # format_exc() is the stdlib shorthand for joining
    # traceback.format_exception(*sys.exc_info()).
    rospy.logerr(traceback.format_exc())
71 
72 
def log_exceptions(f):
    """Decorator for logging exceptions to ROS.

    The returned wrapper forwards all positional and keyword arguments to
    ``f`` and returns its result. If ``f`` raises, the exception is logged
    via ``_log_exception`` and then re-raised unchanged.

    Args:
        f: The callable to wrap.

    Returns:
        A wrapper carrying ``f``'s metadata (through ``functools.wraps``).
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except BaseException:
            # Observe the failure but never swallow it: log, then propagate.
            _log_exception()
            raise
    return wrapper
83 
84 
class IncomingQueue(threading.Thread):
    """Decouples incoming messages from the Autobahn thread.

    This mitigates cases where outgoing messages are blocked by incoming,
    and vice versa.

    Messages handed to ``push`` are buffered and delivered one at a time to
    ``protocol.incoming`` on this thread. Once ``finish`` has been called the
    backlog is discarded, ``protocol.finish()`` is called once and the thread
    exits.
    """

    def __init__(self, protocol):
        """Create the (not yet started) worker thread.

        Args:
            protocol: Object providing ``incoming(msg)`` and ``finish()``.
        """
        threading.Thread.__init__(self)
        self.daemon = True
        self.queue = deque()
        self.protocol = protocol

        # Guards ``queue`` and ``_finished`` and is used to wake the worker.
        self.cond = threading.Condition()
        self._finished = False

    def finish(self):
        """Clear the queue and do not accept further messages."""
        with self.cond:
            self._finished = True
            self.queue.clear()
            self.cond.notify()

    def push(self, msg):
        """Queue ``msg`` for delivery to the protocol.

        Messages pushed after ``finish`` are dropped: the worker no longer
        consumes the queue at that point, so they could never be delivered.
        """
        with self.cond:
            if self._finished:
                return
            self.queue.append(msg)
            self.cond.notify()

    def run(self):
        """Deliver queued messages until ``finish`` is called."""
        while True:
            with self.cond:
                # Re-check the predicate after every wakeup so that waking
                # with an empty queue can never reach popleft().
                while not self.queue and not self._finished:
                    self.cond.wait()

                if self._finished:
                    break

                msg = self.queue.popleft()

            # Deliver outside the lock so push()/finish() are not blocked
            # while the protocol processes the message.
            self.protocol.incoming(msg)

        self.protocol.finish()
128 
@implementer(interfaces.IPushProducer)
class OutgoingValve:
    """Allows the Autobahn transport to pause outgoing messages from rosbridge.

    The purpose of this valve is to connect backpressure from the WebSocket client
    back to the rosbridge protocol, which depends on backpressure for queueing.
    Without this flow control, rosbridge will happily keep writing messages to
    the WebSocket until the system runs out of memory.

    This valve is closed and opened automatically by the Twisted TCP server.
    In practice, Twisted should only close the valve when its userspace write buffer
    is full and it should only open the valve when that buffer is empty.

    When the valve is closed, the rosbridge protocol instance's outgoing writes
    must block until the valve is opened.
    """

    def __init__(self, proto):
        """Create a valve in front of ``proto.outgoing``.

        The valve starts closed (the event is unset); ``resumeProducing``
        must be called before ``relay`` lets anything through.

        Args:
            proto: Object whose ``outgoing(message, compression=...)`` method
                is scheduled on the reactor thread for every relayed message.
        """
        self._proto = proto
        self._valve = threading.Event()
        self._finished = False

    @log_exceptions
    def relay(self, message, compression="none"):
        """Block until the valve is open, then hand ``message`` to the reactor.

        After ``stopProducing`` the message is silently dropped.
        """
        self._valve.wait()
        if self._finished:
            return
        # The actual send is scheduled onto the reactor thread rather than
        # performed on the calling thread.
        reactor.callFromThread(self._proto.outgoing, message, compression=compression)

    def pauseProducing(self):
        """Close the valve so that ``relay`` blocks (no-op once finished)."""
        if not self._finished:
            self._valve.clear()

    def resumeProducing(self):
        """Open the valve, releasing any callers blocked in ``relay``."""
        self._valve.set()

    def stopProducing(self):
        """Mark the valve finished and release blocked callers.

        The valve is left open so that no ``relay`` caller stays blocked;
        released and later calls return without sending.
        """
        self._finished = True
        self._valve.set()
168 
class RosbridgeWebSocket(WebSocketServerProtocol):
    """WebSocket server protocol that bridges a connection to rosbridge.

    ``onOpen`` creates a ``RosbridgeProtocol`` for the connection, an
    ``IncomingQueue`` that feeds it received messages, and an
    ``OutgoingValve`` registered as the transport's producer so that
    outgoing writes respect backpressure.
    """

    client_id_seed = 0
    clients_connected = 0
    authenticate = False
    # Optional client registry offering add_client()/remove_client().
    # Defaults to None so the ``if cls.client_manager`` checks are safe when
    # nothing has been assigned; presumably set by the server start-up code.
    client_manager = None

    # The following are passed on to RosbridgeProtocol
    # defragmentation.py:
    fragment_timeout = 600                  # seconds
    # protocol.py:
    delay_between_messages = 0              # seconds
    max_message_size = None                 # bytes
    unregister_timeout = 10.0               # seconds
    bson_only_mode = False

    def onOpen(self):
        """Set up the rosbridge protocol, queues and bookkeeping for a new connection."""
        cls = self.__class__
        parameters = {
            "fragment_timeout": cls.fragment_timeout,
            "delay_between_messages": cls.delay_between_messages,
            "max_message_size": cls.max_message_size,
            "unregister_timeout": cls.unregister_timeout,
            "bson_only_mode": cls.bson_only_mode,
        }
        try:
            self.protocol = RosbridgeProtocol(cls.client_id_seed, parameters=parameters)
            self.incoming_queue = IncomingQueue(self.protocol)
            self.incoming_queue.start()
            producer = OutgoingValve(self)
            self.transport.registerProducer(producer, True)
            # The valve starts closed; open it so outgoing messages can flow.
            producer.resumeProducing()
            self.protocol.outgoing = producer.relay
            self.authenticated = False
            cls.client_id_seed += 1
            cls.clients_connected += 1
            self.client_id = uuid.uuid4()
            self.peer = self.transport.getPeer().host
            if cls.client_manager:
                cls.client_manager.add_client(self.client_id, self.peer)

        except Exception as exc:
            rospy.logerr("Unable to accept incoming connection. Reason: %s", str(exc))
            # Do not announce a successful connection after a failed set-up.
            return
        rospy.loginfo("Client connected. %d clients total.", cls.clients_connected)
        if cls.authenticate:
            rospy.loginfo("Awaiting proper authentication...")

    def onMessage(self, message, binary):
        """Handle one received frame.

        Text frames are decoded as UTF-8. When authentication is enabled and
        the client has not authenticated yet, the message is treated as an
        authentication attempt; otherwise it is queued for the protocol.
        """
        cls = self.__class__
        if not binary:
            message = message.decode('utf-8')
        # check if we need to authenticate
        if cls.authenticate and not self.authenticated:
            try:
                if cls.bson_only_mode:
                    msg = bson.BSON(message).decode()
                else:
                    msg = json.loads(message)

                if msg['op'] == 'auth':
                    # check the authorization information
                    auth_srv = rospy.ServiceProxy('authenticate', Authentication)
                    resp = auth_srv(msg['mac'], msg['client'], msg['dest'],
                                    msg['rand'], rospy.Time(msg['t']), msg['level'],
                                    rospy.Time(msg['end']))
                    self.authenticated = resp.authenticated
                    if self.authenticated:
                        rospy.loginfo("Client %d has authenticated.", self.protocol.client_id)
                        return
                # if we are here, no valid authentication was given
                rospy.logwarn("Client %d did not authenticate. Closing connection.",
                              self.protocol.client_id)
                self.sendClose()
            except Exception:
                # proper error will be handled in the protocol class
                # NOTE(review): this forwards a message from a client that has
                # not authenticated; confirm the protocol rejects it.
                self.incoming_queue.push(message)
        else:
            # no authentication required
            self.incoming_queue.push(message)

    def outgoing(self, message, compression="none"):
        """Send ``message`` to the client as a binary or text frame.

        BSON and ``bytearray`` payloads are converted to ``bytes`` and sent as
        binary; payloads with ``cbor``/``cbor-raw`` compression are sent as
        binary unchanged; everything else is UTF-8 encoded and sent as text.
        """
        if isinstance(message, (bson.BSON, bytearray)):
            binary = True
            message = bytes(message)
        elif compression in ("cbor", "cbor-raw"):
            binary = True
        else:
            binary = False
            message = message.encode('utf-8')

        try:
            self.sendMessage(message, binary)
        except SendException:
            rospy.loginfo(f"Tried to send message (beginning with '{message[:30]}') to disconnected channel. The connection should be cleaned up soon.")

    def onClose(self, was_clean, code, reason):
        """Update the client bookkeeping and stop the incoming queue."""
        if not hasattr(self, 'protocol'):
            return  # Closed before connection was opened.
        cls = self.__class__
        cls.clients_connected -= 1

        if cls.client_manager:
            cls.client_manager.remove_client(self.client_id, self.peer)
        rospy.loginfo("Client disconnected. %d clients total.", cls.clients_connected)

        # Stopping the queue thread also makes it call protocol.finish().
        self.incoming_queue.finish()
rosbridge_server.autobahn_websocket.log_exceptions
def log_exceptions(f)
Definition: autobahn_websocket.py:73
rosbridge_server.autobahn_websocket.RosbridgeWebSocket.max_message_size
max_message_size
Definition: autobahn_websocket.py:179
rosbridge_server.autobahn_websocket.IncomingQueue
Definition: autobahn_websocket.py:85
rosbridge_server.autobahn_websocket.OutgoingValve._proto
_proto
Definition: autobahn_websocket.py:146
rosbridge_server.autobahn_websocket.RosbridgeWebSocket.bson_only_mode
bool bson_only_mode
Definition: autobahn_websocket.py:181
rosbridge_server.autobahn_websocket.OutgoingValve.resumeProducing
def resumeProducing(self)
Definition: autobahn_websocket.py:161
rosbridge_server.autobahn_websocket.RosbridgeWebSocket.fragment_timeout
int fragment_timeout
Definition: autobahn_websocket.py:176
rosbridge_server.autobahn_websocket.RosbridgeWebSocket.onOpen
def onOpen(self)
Definition: autobahn_websocket.py:183
rosbridge_server.autobahn_websocket.RosbridgeWebSocket.outgoing
def outgoing(self, message, compression="none")
Definition: autobahn_websocket.py:247
rosbridge_server.autobahn_websocket.IncomingQueue.__init__
def __init__(self, protocol)
Definition: autobahn_websocket.py:91
rosbridge_server.autobahn_websocket.OutgoingValve._valve
_valve
Definition: autobahn_websocket.py:147
rosbridge_server.autobahn_websocket.IncomingQueue._finished
_finished
Definition: autobahn_websocket.py:98
rosbridge_server.autobahn_websocket.RosbridgeWebSocket.client_id_seed
int client_id_seed
Definition: autobahn_websocket.py:170
rosbridge_server.autobahn_websocket.RosbridgeWebSocket.incoming_queue
incoming_queue
Definition: autobahn_websocket.py:194
rosbridge_server.autobahn_websocket.IncomingQueue.push
def push(self, msg)
Definition: autobahn_websocket.py:108
rosbridge_library::util
rosbridge_server.autobahn_websocket.IncomingQueue.finish
def finish(self)
Definition: autobahn_websocket.py:100
rosbridge_server.autobahn_websocket.OutgoingValve.relay
def relay(self, message, compression="none")
Definition: autobahn_websocket.py:151
rosbridge_server.autobahn_websocket.OutgoingValve
Definition: autobahn_websocket.py:130
rosbridge_server.autobahn_websocket.OutgoingValve.pauseProducing
def pauseProducing(self)
Definition: autobahn_websocket.py:157
rosbridge_server.autobahn_websocket._log_exception
def _log_exception()
Definition: autobahn_websocket.py:67
rosbridge_server.autobahn_websocket.IncomingQueue.run
def run(self)
Definition: autobahn_websocket.py:113
rosbridge_server.autobahn_websocket.IncomingQueue.daemon
daemon
Definition: autobahn_websocket.py:93
rosbridge_server.autobahn_websocket.RosbridgeWebSocket.unregister_timeout
float unregister_timeout
Definition: autobahn_websocket.py:180
rosbridge_server.autobahn_websocket.RosbridgeWebSocket.authenticate
bool authenticate
Definition: autobahn_websocket.py:172
rosbridge_server.autobahn_websocket.RosbridgeWebSocket
Definition: autobahn_websocket.py:169
rosbridge_server.autobahn_websocket.NeverException
Definition: autobahn_websocket.py:56
rosbridge_server.autobahn_websocket.IncomingQueue.cond
cond
Definition: autobahn_websocket.py:97
rosbridge_server.autobahn_websocket.OutgoingValve._finished
_finished
Definition: autobahn_websocket.py:148
rosbridge_server.autobahn_websocket.RosbridgeWebSocket.client_id
client_id
Definition: autobahn_websocket.py:203
rosbridge_server.autobahn_websocket.IncomingQueue.protocol
protocol
Definition: autobahn_websocket.py:95
rosbridge_library::rosbridge_protocol
rosbridge_server.autobahn_websocket.RosbridgeWebSocket.delay_between_messages
int delay_between_messages
Definition: autobahn_websocket.py:178
rosbridge_server.autobahn_websocket.RosbridgeWebSocket.clients_connected
int clients_connected
Definition: autobahn_websocket.py:171
rosbridge_server.autobahn_websocket.OutgoingValve.__init__
def __init__(self, proto)
Definition: autobahn_websocket.py:145
rosbridge_server.autobahn_websocket.RosbridgeWebSocket.onClose
def onClose(self, was_clean, code, reason)
Definition: autobahn_websocket.py:265
rosbridge_server.autobahn_websocket.RosbridgeWebSocket.onMessage
def onMessage(self, message, binary)
Definition: autobahn_websocket.py:214
rosbridge_server.autobahn_websocket.RosbridgeWebSocket.peer
peer
Definition: autobahn_websocket.py:204
rosbridge_server.autobahn_websocket.OutgoingValve.stopProducing
def stopProducing(self)
Definition: autobahn_websocket.py:164
rosbridge_server.autobahn_websocket.RosbridgeWebSocket.authenticated
authenticated
Definition: autobahn_websocket.py:200
rosbridge_library::rosbridge_protocol::RosbridgeProtocol
rosbridge_server.autobahn_websocket.IncomingQueue.queue
queue
Definition: autobahn_websocket.py:94
rosbridge_server.autobahn_websocket.RosbridgeWebSocket.protocol
protocol
Definition: autobahn_websocket.py:193


rosbridge_server
Author(s): Jonathan Mace
autogenerated on Wed Feb 12 2025 03:14:56