36 from rosauth.srv
import Authentication
41 from functools
import wraps
42 from collections
import deque
44 from autobahn.twisted.websocket
import WebSocketServerProtocol
45 from twisted.internet
import interfaces, reactor
46 from zope.interface
import implementer
53 """Log the most recent exception to ROS."""
54 exc = traceback.format_exception(*sys.exc_info())
55 rospy.logerr(
''.join(exc))
59 """Decorator for logging exceptions to ROS."""
61 def wrapper(*args, **kwargs):
63 return f(*args, **kwargs)
71 """Decouples incoming messages from the Autobahn thread.
73 This mitigates cases where outgoing messages are blocked by incoming,
77 threading.Thread.__init__(self)
82 self.
cond = threading.Condition()
86 """Clear the queue and do not accept further messages."""
89 while len(self.
queue) > 0:
95 self.
queue.append(msg)
107 msg = self.
queue.popleft()
114 @implementer(interfaces.IPushProducer)
116 """Allows the Autobahn transport to pause outgoing messages from rosbridge.
118 The purpose of this valve is to connect backpressure from the WebSocket client
119 back to the rosbridge protocol, which depends on backpressure for queueing.
120 Without this flow control, rosbridge will happily keep writing messages to
121 the WebSocket until the system runs out of memory.
123 This valve is closed and opened automatically by the Twisted TCP server.
124 In practice, Twisted should only close the valve when its userspace write buffer
125 is full and it should only open the valve when that buffer is empty.
127 When the valve is closed, the rosbridge protocol instance's outgoing writes
128 must block until the valve is opened.
136 def relay(self, message, compression="none"):
140 reactor.callFromThread(self.
_proto.outgoing, message, compression=compression)
156 clients_connected = 0
161 fragment_timeout = 600
163 delay_between_messages = 0
164 max_message_size =
None
165 unregister_timeout = 10.0
166 bson_only_mode =
False
182 self.transport.registerProducer(producer,
True)
183 producer.resumeProducing()
184 self.
protocol.outgoing = producer.relay
189 self.
peer = self.transport.getPeer().host
190 if cls.client_manager:
193 except Exception
as exc:
194 rospy.logerr(
"Unable to accept incoming connection. Reason: %s", str(exc))
197 rospy.loginfo(
"Awaiting proper authentication...")
202 message = message.decode(
'utf-8')
207 msg = bson.BSON(message).decode()
209 msg = json.loads(message)
211 if msg[
'op'] ==
'auth':
213 auth_srv = rospy.ServiceProxy(
'authenticate', Authentication)
214 resp = auth_srv(msg[
'mac'], msg[
'client'], msg[
'dest'],
215 msg[
'rand'], rospy.Time(msg[
't']), msg[
'level'],
216 rospy.Time(msg[
'end']))
219 rospy.loginfo(
"Client %d has authenticated.", self.
protocol.client_id)
222 rospy.logwarn(
"Client %d did not authenticate. Closing connection.",
233 if type(message) == bson.BSON:
235 message = bytes(message)
236 elif type(message) == bytearray:
238 message = bytes(message)
239 elif compression
in [
"cbor",
"cbor-raw"]:
243 message = message.encode(
'utf-8')
245 self.sendMessage(message, binary)
248 if not hasattr(self,
'protocol'):
253 if cls.client_manager: