34 PYTHON2 = sys.version_info < (3, 0)
37 from threading
import Lock
38 from functools
import partial
39 from rospy
import loginfo
46 from cbor
import dumps
as encode_cbor
51 from ujson
import dumps
as encode_json
54 from simplejson
import dumps
as encode_json
56 from json
import dumps
as encode_json
62 """ Keeps track of the clients multiple calls to subscribe. 64 Chooses the most appropriate settings to send messages """ 67 """ Create a subscription for the specified client on the specified 68 topic, with callback publish 71 client_id -- the ID of the client making this subscription 72 topic -- the name of the topic to subscribe to 73 publish -- the callback function for incoming messages 87 """ Unsubscribes this subscription and cleans up resources """ 93 def subscribe(self, sid=None, msg_type=None, throttle_rate=0,
94 queue_length=0, fragment_size=
None, compression=
"none"):
95 """ Add another client's subscription request 97 If there are multiple calls to subscribe, the values actually used for 98 queue_length, fragment_size, compression and throttle_rate are 99 chosen to encompass all subscriptions' requirements 102 sid -- the subscription id from the client 103 msg_type -- the type of the message to subscribe to 104 throttle_rate -- the minimum time (in ms) allowed between messages 105 being sent. If multiple subscriptions, the lower of these is used 106 queue_length -- the number of messages that can be buffered. If 107 multiple subscriptions, the lower of these is used 108 fragment_size -- None if no fragmentation, or the maximum length of 109 allowed outgoing messages 110 compression -- "none" if no compression, or some other value if 111 compression is to be used (current valid values are 'png') 116 "throttle_rate": throttle_rate,
117 "queue_length": queue_length,
118 "fragment_size": fragment_size,
119 "compression": compression
122 self.
clients[sid] = client_details
130 """ Unsubscribe this particular client's subscription 133 sid -- the individual subscription id. If None, all are unsubscribed 145 """ Return true if there are no subscriptions currently """ 149 """ Internal method to propagate published messages to the registered 154 """ Raw callback called by subscription manager for all incoming 157 Incoming messages are passed to the message handler which may drop, 158 buffer, or propagate the message 162 self.handler.handle_message(msg)
165 """ Determine the 'lowest common denominator' params to satisfy all 166 subscribed clients. """ 175 return [x[fieldname]
for x
in self.clients.values()]
179 frags = [x
for x
in f(
"fragment_size")
if x !=
None]
186 if "png" in f(
"compression"):
188 if "cbor" in f(
"compression"):
198 subscribe_msg_fields = [(
True,
"topic", string_types), (
False,
"type", string_types),
199 (
False,
"throttle_rate", int), (
False,
"fragment_size", int),
200 (
False,
"queue_length", int), (
False,
"compression", string_types)]
201 unsubscribe_msg_fields = [(
True,
"topic", string_types)]
207 Capability.__init__(self, protocol)
210 protocol.register_operation(
"subscribe", self.
subscribe)
211 protocol.register_operation(
"unsubscribe", self.
unsubscribe)
217 sid = msg.get(
"id",
None)
225 if Subscribe.topics_glob
is not None and Subscribe.topics_glob:
226 self.protocol.log(
"debug",
"Topic security glob enabled, checking topic: " + topic)
228 for glob
in Subscribe.topics_glob:
229 if (fnmatch.fnmatch(topic, glob)):
230 self.protocol.log(
"debug",
"Found match with glob " + glob +
", continuing subscription...")
234 self.protocol.log(
"warn",
"No match found for topic, cancelling subscription to: " + topic)
237 self.protocol.log(
"debug",
"No topic security glob, not checking subscription.")
240 client_id = self.protocol.client_id
241 cb = partial(self.
publish, topic)
247 "msg_type": msg.get(
"type",
None),
248 "throttle_rate": msg.get(
"throttle_rate", 0),
249 "fragment_size": msg.get(
"fragment_size",
None),
250 "queue_length": msg.get(
"queue_length", 0),
251 "compression": msg.get(
"compression",
"none")
255 self.protocol.log(
"info",
"Subscribed to %s" % topic)
259 sid = msg.get(
"id",
None)
264 if Subscribe.topics_glob
is not None and Subscribe.topics_glob:
265 self.protocol.log(
"debug",
"Topic security glob enabled, checking topic: " + topic)
267 for glob
in Subscribe.topics_glob:
268 if (fnmatch.fnmatch(topic, glob)):
269 self.protocol.log(
"debug",
"Found match with glob " + glob +
", continuing unsubscription...")
273 self.protocol.log(
"warn",
"No match found for topic, cancelling unsubscription from: " + topic)
276 self.protocol.log(
"debug",
"No topic security glob, not checking unsubscription.")
286 self.protocol.log(
"info",
"Unsubscribed from %s" % topic)
288 def publish(self, topic, message, fragment_size=None, compression="none"):
289 """ Publish a message to the client 292 topic -- the topic to publish the message on 293 message -- a ROS message wrapped by OutgoingMessage 294 fragment_size -- (optional) fragment the serialized message into msgs 295 with payloads not greater than this value 296 compression -- (optional) compress the message. valid values are 301 if Subscribe.topics_glob
and Subscribe.topics_glob:
302 self.protocol.log(
"debug",
"Topic security glob enabled, checking topic: " + topic)
304 for glob
in Subscribe.topics_glob:
305 if (fnmatch.fnmatch(topic, glob)):
306 self.protocol.log(
"debug",
"Found match with glob " + glob +
", continuing topic publish...")
310 self.protocol.log(
"warn",
"No match found for topic, cancelling topic publish to: " + topic)
313 self.protocol.log(
"debug",
"No topic security glob, not checking topic publish.")
316 topic = unicode(topic)
318 outgoing_msg = {
u"op":
u"publish",
u"topic": topic}
319 if compression==
"png":
320 outgoing_msg[
"msg"] = message.get_json_values()
321 outgoing_msg_dumped = encode_json(outgoing_msg)
322 outgoing_msg = {
"op":
"png",
"data": encode_png(outgoing_msg_dumped)}
323 elif compression==
"cbor":
324 outgoing_msg[
u"msg"] = message.get_cbor_values()
325 outgoing_msg = bytearray(encode_cbor(outgoing_msg))
327 outgoing_msg[
"msg"] = message.get_json_values()
329 self.protocol.send(outgoing_msg)
332 for subscription
in self._subscriptions.values():
333 subscription.unregister()
334 self._subscriptions.clear()
335 self.protocol.unregister_operation(
"subscribe")
336 self.protocol.unregister_operation(
"unsubscribe")
list unsubscribe_msg_fields
def __init__(self, client_id, topic, publish)
def subscribe(self, sid=None, msg_type=None, throttle_rate=0, queue_length=0, fragment_size=None, compression="none")
def _publish(self, message)
list subscribe_msg_fields
def unsubscribe(self, msg)
def __init__(self, protocol)
def publish(self, topic, message, fragment_size=None, compression="none")
def basic_type_check(self, msg, types_info)
def unsubscribe(self, sid=None)