autobahn_websocket.py
Go to the documentation of this file.
1 # Software License Agreement (BSD License)
2 #
3 # Copyright (c) 2012, Willow Garage, Inc.
4 # All rights reserved.
5 #
6 # Redistribution and use in source and binary forms, with or without
7 # modification, are permitted provided that the following conditions
8 # are met:
9 #
10 # * Redistributions of source code must retain the above copyright
11 # notice, this list of conditions and the following disclaimer.
12 # * Redistributions in binary form must reproduce the above
13 # copyright notice, this list of conditions and the following
14 # disclaimer in the documentation and/or other materials provided
15 # with the distribution.
16 # * Neither the name of Willow Garage, Inc. nor the names of its
17 # contributors may be used to endorse or promote products derived
18 # from this software without specific prior written permission.
19 #
20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
24 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
25 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
26 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
28 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
30 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31 # POSSIBILITY OF SUCH DAMAGE.
32 
33 import rospy
34 import uuid
35 
36 from rosauth.srv import Authentication
37 
38 import sys
39 import threading
40 import traceback
41 from functools import wraps
42 from collections import deque
43 
44 from autobahn.twisted.websocket import WebSocketServerProtocol
45 from twisted.internet import interfaces, reactor
46 from zope.interface import implementer
47 
48 from rosbridge_library.rosbridge_protocol import RosbridgeProtocol
49 from rosbridge_library.util import json, bson
50 
51 
53  """Log the most recent exception to ROS."""
54  exc = traceback.format_exception(*sys.exc_info())
55  rospy.logerr(''.join(exc))
56 
57 
59  """Decorator for logging exceptions to ROS."""
60  @wraps(f)
61  def wrapper(*args, **kwargs):
62  try:
63  return f(*args, **kwargs)
64  except:
66  raise
67  return wrapper
68 
69 
70 class IncomingQueue(threading.Thread):
71  """Decouples incoming messages from the Autobahn thread.
72 
73  This mitigates cases where outgoing messages are blocked by incoming,
74  and vice versa.
75  """
76  def __init__(self, protocol):
77  threading.Thread.__init__(self)
78  self.daemon = True
79  self.queue = deque()
80  self.protocol = protocol
81 
82  self.cond = threading.Condition()
83  self._finished = False
84 
85  def finish(self):
86  """Clear the queue and do not accept further messages."""
87  with self.cond:
88  self._finished = True
89  while len(self.queue) > 0:
90  self.queue.popleft()
91  self.cond.notify()
92 
93  def push(self, msg):
94  with self.cond:
95  self.queue.append(msg)
96  self.cond.notify()
97 
98  def run(self):
99  while True:
100  with self.cond:
101  if len(self.queue) == 0 and not self._finished:
102  self.cond.wait()
103 
104  if self._finished:
105  break
106 
107  msg = self.queue.popleft()
108 
109  self.protocol.incoming(msg)
110 
111  self.protocol.finish()
112 
113 
114 @implementer(interfaces.IPushProducer)
116  """Allows the Autobahn transport to pause outgoing messages from rosbridge.
117 
118  The purpose of this valve is to connect backpressure from the WebSocket client
119  back to the rosbridge protocol, which depends on backpressure for queueing.
120  Without this flow control, rosbridge will happily keep writing messages to
121  the WebSocket until the system runs out of memory.
122 
123  This valve is closed and opened automatically by the Twisted TCP server.
124  In practice, Twisted should only close the valve when its userspace write buffer
125  is full and it should only open the valve when that buffer is empty.
126 
127  When the valve is closed, the rosbridge protocol instance's outgoing writes
128  must block until the valve is opened.
129  """
130  def __init__(self, proto):
131  self._proto = proto
132  self._valve = threading.Event()
133  self._finished = False
134 
135  @log_exceptions
136  def relay(self, message, compression="none"):
137  self._valve.wait()
138  if self._finished:
139  return
140  reactor.callFromThread(self._proto.outgoing, message, compression=compression)
141 
142  def pauseProducing(self):
143  if not self._finished:
144  self._valve.clear()
145 
146  def resumeProducing(self):
147  self._valve.set()
148 
149  def stopProducing(self):
150  self._finished = True
151  self._valve.set()
152 
153 
154 class RosbridgeWebSocket(WebSocketServerProtocol):
155  client_id_seed = 0
156  clients_connected = 0
157  authenticate = False
158 
159  # The following are passed on to RosbridgeProtocol
160  # defragmentation.py:
161  fragment_timeout = 600 # seconds
162  # protocol.py:
163  delay_between_messages = 0 # seconds
164  max_message_size = None # bytes
165  unregister_timeout = 10.0 # seconds
166  bson_only_mode = False
167 
168  def onOpen(self):
169  cls = self.__class__
170  parameters = {
171  "fragment_timeout": cls.fragment_timeout,
172  "delay_between_messages": cls.delay_between_messages,
173  "max_message_size": cls.max_message_size,
174  "unregister_timeout": cls.unregister_timeout,
175  "bson_only_mode": cls.bson_only_mode
176  }
177  try:
178  self.protocol = RosbridgeProtocol(cls.client_id_seed, parameters=parameters)
180  self.incoming_queue.start()
181  producer = OutgoingValve(self)
182  self.transport.registerProducer(producer, True)
183  producer.resumeProducing()
184  self.protocol.outgoing = producer.relay
185  self.authenticated = False
186  cls.client_id_seed += 1
187  cls.clients_connected += 1
188  self.client_id = uuid.uuid4()
189  self.peer = self.transport.getPeer().host
190  if cls.client_manager:
191  cls.client_manager.add_client(self.client_id, self.peer)
192 
193  except Exception as exc:
194  rospy.logerr("Unable to accept incoming connection. Reason: %s", str(exc))
195  rospy.loginfo("Client connected. %d clients total.", cls.clients_connected)
196  if cls.authenticate:
197  rospy.loginfo("Awaiting proper authentication...")
198 
199  def onMessage(self, message, binary):
200  cls = self.__class__
201  if not binary:
202  message = message.decode('utf-8')
203  # check if we need to authenticate
204  if cls.authenticate and not self.authenticated:
205  try:
206  if cls.bson_only_mode:
207  msg = bson.BSON(message).decode()
208  else:
209  msg = json.loads(message)
210 
211  if msg['op'] == 'auth':
212  # check the authorization information
213  auth_srv = rospy.ServiceProxy('authenticate', Authentication)
214  resp = auth_srv(msg['mac'], msg['client'], msg['dest'],
215  msg['rand'], rospy.Time(msg['t']), msg['level'],
216  rospy.Time(msg['end']))
217  self.authenticated = resp.authenticated
218  if self.authenticated:
219  rospy.loginfo("Client %d has authenticated.", self.protocol.client_id)
220  return
221  # if we are here, no valid authentication was given
222  rospy.logwarn("Client %d did not authenticate. Closing connection.",
223  self.protocol.client_id)
224  self.sendClose()
225  except:
226  # proper error will be handled in the protocol class
227  self.incoming_queue.push(message)
228  else:
229  # no authentication required
230  self.incoming_queue.push(message)
231 
232  def outgoing(self, message, compression="none"):
233  if type(message) == bson.BSON:
234  binary = True
235  message = bytes(message)
236  elif type(message) == bytearray:
237  binary = True
238  message = bytes(message)
239  elif compression in ["cbor", "cbor-raw"]:
240  binary = True
241  else:
242  binary = False
243  message = message.encode('utf-8')
244 
245  self.sendMessage(message, binary)
246 
247  def onClose(self, was_clean, code, reason):
248  if not hasattr(self, 'protocol'):
249  return # Closed before connection was opened.
250  cls = self.__class__
251  cls.clients_connected -= 1
252 
253  if cls.client_manager:
254  cls.client_manager.remove_client(self.client_id, self.peer)
255  rospy.loginfo("Client disconnected. %d clients total.", cls.clients_connected)
256 
257  self.incoming_queue.finish()
rosbridge_server.autobahn_websocket.log_exceptions
def log_exceptions(f)
Definition: autobahn_websocket.py:58
rosbridge_server.autobahn_websocket.RosbridgeWebSocket.max_message_size
max_message_size
Definition: autobahn_websocket.py:164
rosbridge_server.autobahn_websocket.IncomingQueue
Definition: autobahn_websocket.py:70
rosbridge_server.autobahn_websocket.OutgoingValve._proto
_proto
Definition: autobahn_websocket.py:131
rosbridge_server.autobahn_websocket.RosbridgeWebSocket.bson_only_mode
bool bson_only_mode
Definition: autobahn_websocket.py:166
rosbridge_server.autobahn_websocket.OutgoingValve.resumeProducing
def resumeProducing(self)
Definition: autobahn_websocket.py:146
rosbridge_server.autobahn_websocket.RosbridgeWebSocket.fragment_timeout
int fragment_timeout
Definition: autobahn_websocket.py:161
rosbridge_server.autobahn_websocket.RosbridgeWebSocket.onOpen
def onOpen(self)
Definition: autobahn_websocket.py:168
rosbridge_server.autobahn_websocket.RosbridgeWebSocket.outgoing
def outgoing(self, message, compression="none")
Definition: autobahn_websocket.py:232
rosbridge_server.autobahn_websocket.IncomingQueue.__init__
def __init__(self, protocol)
Definition: autobahn_websocket.py:76
rosbridge_server.autobahn_websocket.OutgoingValve._valve
_valve
Definition: autobahn_websocket.py:132
rosbridge_server.autobahn_websocket.IncomingQueue._finished
_finished
Definition: autobahn_websocket.py:83
rosbridge_server.autobahn_websocket.RosbridgeWebSocket.client_id_seed
int client_id_seed
Definition: autobahn_websocket.py:155
rosbridge_server.autobahn_websocket.RosbridgeWebSocket.incoming_queue
incoming_queue
Definition: autobahn_websocket.py:179
rosbridge_server.autobahn_websocket.IncomingQueue.push
def push(self, msg)
Definition: autobahn_websocket.py:93
rosbridge_library::util
rosbridge_server.autobahn_websocket.IncomingQueue.finish
def finish(self)
Definition: autobahn_websocket.py:85
rosbridge_server.autobahn_websocket.OutgoingValve.relay
def relay(self, message, compression="none")
Definition: autobahn_websocket.py:136
rosbridge_server.autobahn_websocket.OutgoingValve
Definition: autobahn_websocket.py:115
rosbridge_server.autobahn_websocket.OutgoingValve.pauseProducing
def pauseProducing(self)
Definition: autobahn_websocket.py:142
rosbridge_server.autobahn_websocket._log_exception
def _log_exception()
Definition: autobahn_websocket.py:52
rosbridge_server.autobahn_websocket.IncomingQueue.run
def run(self)
Definition: autobahn_websocket.py:98
rosbridge_server.autobahn_websocket.IncomingQueue.daemon
daemon
Definition: autobahn_websocket.py:78
rosbridge_server.autobahn_websocket.RosbridgeWebSocket.unregister_timeout
float unregister_timeout
Definition: autobahn_websocket.py:165
rosbridge_server.autobahn_websocket.RosbridgeWebSocket.authenticate
bool authenticate
Definition: autobahn_websocket.py:157
rosbridge_server.autobahn_websocket.RosbridgeWebSocket
Definition: autobahn_websocket.py:154
rosbridge_server.autobahn_websocket.IncomingQueue.cond
cond
Definition: autobahn_websocket.py:82
rosbridge_server.autobahn_websocket.OutgoingValve._finished
_finished
Definition: autobahn_websocket.py:133
rosbridge_server.autobahn_websocket.RosbridgeWebSocket.client_id
client_id
Definition: autobahn_websocket.py:188
rosbridge_server.autobahn_websocket.IncomingQueue.protocol
protocol
Definition: autobahn_websocket.py:80
rosbridge_library::rosbridge_protocol
rosbridge_server.autobahn_websocket.RosbridgeWebSocket.delay_between_messages
int delay_between_messages
Definition: autobahn_websocket.py:163
rosbridge_server.autobahn_websocket.RosbridgeWebSocket.clients_connected
int clients_connected
Definition: autobahn_websocket.py:156
rosbridge_server.autobahn_websocket.OutgoingValve.__init__
def __init__(self, proto)
Definition: autobahn_websocket.py:130
rosbridge_server.autobahn_websocket.RosbridgeWebSocket.onClose
def onClose(self, was_clean, code, reason)
Definition: autobahn_websocket.py:247
rosbridge_server.autobahn_websocket.RosbridgeWebSocket.onMessage
def onMessage(self, message, binary)
Definition: autobahn_websocket.py:199
rosbridge_server.autobahn_websocket.RosbridgeWebSocket.peer
peer
Definition: autobahn_websocket.py:189
rosbridge_server.autobahn_websocket.OutgoingValve.stopProducing
def stopProducing(self)
Definition: autobahn_websocket.py:149
rosbridge_server.autobahn_websocket.RosbridgeWebSocket.authenticated
authenticated
Definition: autobahn_websocket.py:185
rosbridge_library::rosbridge_protocol::RosbridgeProtocol
rosbridge_server.autobahn_websocket.IncomingQueue.queue
queue
Definition: autobahn_websocket.py:79
rosbridge_server.autobahn_websocket.RosbridgeWebSocket.protocol
protocol
Definition: autobahn_websocket.py:178


rosbridge_server
Author(s): Jonathan Mace
autogenerated on Tue Oct 3 2023 02:12:49