36 from rosauth.srv
import Authentication
41 from functools
import wraps
42 from collections
import deque
44 from autobahn.twisted.websocket
import WebSocketServerProtocol
46 SendException = Exception
51 from autobahn.exception
import Disconnected
52 SendException = Disconnected
58 SendException = NeverException
60 from twisted.internet
import interfaces, reactor
61 from zope.interface
import implementer
68 """Log the most recent exception to ROS."""
69 exc = traceback.format_exception(*sys.exc_info())
70 rospy.logerr(
''.join(exc))
74 """Decorator for logging exceptions to ROS."""
76 def wrapper(*args, **kwargs):
78 return f(*args, **kwargs)
86 """Decouples incoming messages from the Autobahn thread.
88 This mitigates cases where outgoing messages are blocked by incoming,
92 threading.Thread.__init__(self)
97 self.
cond = threading.Condition()
101 """Clear the queue and do not accept further messages."""
104 while len(self.
queue) > 0:
110 self.
queue.append(msg)
122 msg = self.
queue.popleft()
129 @implementer(interfaces.IPushProducer)
131 """Allows the Autobahn transport to pause outgoing messages from rosbridge.
133 The purpose of this valve is to connect backpressure from the WebSocket client
134 back to the rosbridge protocol, which depends on backpressure for queueing.
135 Without this flow control, rosbridge will happily keep writing messages to
136 the WebSocket until the system runs out of memory.
138 This valve is closed and opened automatically by the Twisted TCP server.
139 In practice, Twisted should only close the valve when its userspace write buffer
140 is full and it should only open the valve when that buffer is empty.
142 When the valve is closed, the rosbridge protocol instance's outgoing writes
143 must block until the valve is opened.
151 def relay(self, message, compression="none"):
155 reactor.callFromThread(self.
_proto.outgoing, message, compression=compression)
171 clients_connected = 0
176 fragment_timeout = 600
178 delay_between_messages = 0
179 max_message_size =
None
180 unregister_timeout = 10.0
181 bson_only_mode =
False
197 self.transport.registerProducer(producer,
True)
198 producer.resumeProducing()
199 self.
protocol.outgoing = producer.relay
204 self.
peer = self.transport.getPeer().host
205 if cls.client_manager:
208 except Exception
as exc:
209 rospy.logerr(
"Unable to accept incoming connection. Reason: %s", str(exc))
212 rospy.loginfo(
"Awaiting proper authentication...")
217 message = message.decode(
'utf-8')
222 msg = bson.BSON(message).decode()
224 msg = json.loads(message)
226 if msg[
'op'] ==
'auth':
228 auth_srv = rospy.ServiceProxy(
'authenticate', Authentication)
229 resp = auth_srv(msg[
'mac'], msg[
'client'], msg[
'dest'],
230 msg[
'rand'], rospy.Time(msg[
't']), msg[
'level'],
231 rospy.Time(msg[
'end']))
234 rospy.loginfo(
"Client %d has authenticated.", self.
protocol.client_id)
237 rospy.logwarn(
"Client %d did not authenticate. Closing connection.",
248 if type(message) == bson.BSON:
250 message = bytes(message)
251 elif type(message) == bytearray:
253 message = bytes(message)
254 elif compression
in [
"cbor",
"cbor-raw"]:
258 message = message.encode(
'utf-8')
261 self.sendMessage(message, binary)
262 except SendException
as e:
263 rospy.loginfo(f
"Tried to send message (beginning with '{message[:30]}') to disconnected channel. The connection should be cleaned up soon.")
266 if not hasattr(self,
'protocol'):
271 if cls.client_manager: