34 PYTHON2 = sys.version_info < (3, 0)
37 from threading
import Lock
38 from functools
import partial
39 from rospy
import loginfo, get_rostime
46 from cbor
import dumps
as encode_cbor
51 from ujson
import dumps
as encode_json
54 from simplejson
import dumps
as encode_json
56 from json
import dumps
as encode_json
62 """ Keeps track of the clients multiple calls to subscribe. 64 Chooses the most appropriate settings to send messages """ 67 """ Create a subscription for the specified client on the specified 68 topic, with callback publish 71 client_id -- the ID of the client making this subscription 72 topic -- the name of the topic to subscribe to 73 publish -- the callback function for incoming messages 87 """ Unsubscribes this subscription and cleans up resources """ 93 def subscribe(self, sid=None, msg_type=None, throttle_rate=0,
94 queue_length=0, fragment_size=
None, compression=
"none"):
95 """ Add another client's subscription request 97 If there are multiple calls to subscribe, the values actually used for 98 queue_length, fragment_size, compression and throttle_rate are 99 chosen to encompass all subscriptions' requirements 102 sid -- the subscription id from the client 103 msg_type -- the type of the message to subscribe to 104 throttle_rate -- the minimum time (in ms) allowed between messages 105 being sent. If multiple subscriptions, the lower of these is used 106 queue_length -- the number of messages that can be buffered. If 107 multiple subscriptions, the lower of these is used 108 fragment_size -- None if no fragmentation, or the maximum length of 109 allowed outgoing messages 110 compression -- "none" if no compression, or some other value if 111 compression is to be used (current valid values are 'png') 116 "throttle_rate": throttle_rate,
117 "queue_length": queue_length,
118 "fragment_size": fragment_size,
119 "compression": compression
122 self.
clients[sid] = client_details
126 if compression ==
"cbor-raw":
127 msg_type =
"__AnyMsg" 133 """ Unsubscribe this particular client's subscription 136 sid -- the individual subscription id. If None, all are unsubscribed 148 """ Return true if there are no subscriptions currently """ 152 """ Internal method to propagate published messages to the registered 157 """ Raw callback called by subscription manager for all incoming 160 Incoming messages are passed to the message handler which may drop, 161 buffer, or propagate the message 165 self.handler.handle_message(msg)
168 """ Determine the 'lowest common denominator' params to satisfy all 169 subscribed clients. """ 178 return [x[fieldname]
for x
in self.clients.values()]
182 frags = [x
for x
in f(
"fragment_size")
if x !=
None]
189 if "png" in f(
"compression"):
191 if "cbor" in f(
"compression"):
193 if "cbor-raw" in f(
"compression"):
203 subscribe_msg_fields = [(
True,
"topic", string_types), (
False,
"type", string_types),
204 (
False,
"throttle_rate", int), (
False,
"fragment_size", int),
205 (
False,
"queue_length", int), (
False,
"compression", string_types)]
206 unsubscribe_msg_fields = [(
True,
"topic", string_types)]
212 Capability.__init__(self, protocol)
215 protocol.register_operation(
"subscribe", self.
subscribe)
216 protocol.register_operation(
"unsubscribe", self.
unsubscribe)
222 sid = msg.get(
"id",
None)
230 if Subscribe.topics_glob
is not None and Subscribe.topics_glob:
231 self.protocol.log(
"debug",
"Topic security glob enabled, checking topic: " + topic)
233 for glob
in Subscribe.topics_glob:
234 if (fnmatch.fnmatch(topic, glob)):
235 self.protocol.log(
"debug",
"Found match with glob " + glob +
", continuing subscription...")
239 self.protocol.log(
"warn",
"No match found for topic, cancelling subscription to: " + topic)
242 self.protocol.log(
"debug",
"No topic security glob, not checking subscription.")
245 client_id = self.protocol.client_id
246 cb = partial(self.
publish, topic)
252 "msg_type": msg.get(
"type",
None),
253 "throttle_rate": msg.get(
"throttle_rate", 0),
254 "fragment_size": msg.get(
"fragment_size",
None),
255 "queue_length": msg.get(
"queue_length", 0),
256 "compression": msg.get(
"compression",
"none")
260 self.protocol.log(
"info",
"Subscribed to %s" % topic)
264 sid = msg.get(
"id",
None)
269 if Subscribe.topics_glob
is not None and Subscribe.topics_glob:
270 self.protocol.log(
"debug",
"Topic security glob enabled, checking topic: " + topic)
272 for glob
in Subscribe.topics_glob:
273 if (fnmatch.fnmatch(topic, glob)):
274 self.protocol.log(
"debug",
"Found match with glob " + glob +
", continuing unsubscription...")
278 self.protocol.log(
"warn",
"No match found for topic, cancelling unsubscription from: " + topic)
281 self.protocol.log(
"debug",
"No topic security glob, not checking unsubscription.")
291 self.protocol.log(
"info",
"Unsubscribed from %s" % topic)
293 def publish(self, topic, message, fragment_size=None, compression="none"):
294 """ Publish a message to the client 297 topic -- the topic to publish the message on 298 message -- a ROS message wrapped by OutgoingMessage 299 fragment_size -- (optional) fragment the serialized message into msgs 300 with payloads not greater than this value 301 compression -- (optional) compress the message. valid values are 306 if Subscribe.topics_glob
and Subscribe.topics_glob:
307 self.protocol.log(
"debug",
"Topic security glob enabled, checking topic: " + topic)
309 for glob
in Subscribe.topics_glob:
310 if (fnmatch.fnmatch(topic, glob)):
311 self.protocol.log(
"debug",
"Found match with glob " + glob +
", continuing topic publish...")
315 self.protocol.log(
"warn",
"No match found for topic, cancelling topic publish to: " + topic)
318 self.protocol.log(
"debug",
"No topic security glob, not checking topic publish.")
321 topic = unicode(topic)
323 outgoing_msg = {
u"op":
u"publish",
u"topic": topic}
324 if compression==
"png":
325 outgoing_msg[
"msg"] = message.get_json_values()
326 outgoing_msg_dumped = encode_json(outgoing_msg)
327 outgoing_msg = {
"op":
"png",
"data": encode_png(outgoing_msg_dumped)}
328 elif compression==
"cbor":
329 outgoing_msg[
u"msg"] = message.get_cbor_values()
330 outgoing_msg = bytearray(encode_cbor(outgoing_msg))
331 elif compression==
"cbor-raw":
333 outgoing_msg[
u"msg"] = {
336 u"bytes": message._message._buff
338 outgoing_msg = bytearray(encode_cbor(outgoing_msg))
340 outgoing_msg[
"msg"] = message.get_json_values()
342 self.protocol.send(outgoing_msg)
345 for subscription
in self._subscriptions.values():
346 subscription.unregister()
347 self._subscriptions.clear()
348 self.protocol.unregister_operation(
"subscribe")
349 self.protocol.unregister_operation(
"unsubscribe")
list unsubscribe_msg_fields
def __init__(self, client_id, topic, publish)
def subscribe(self, sid=None, msg_type=None, throttle_rate=0, queue_length=0, fragment_size=None, compression="none")
def _publish(self, message)
list subscribe_msg_fields
def unsubscribe(self, msg)
def __init__(self, protocol)
def publish(self, topic, message, fragment_size=None, compression="none")
def basic_type_check(self, msg, types_info)
def unsubscribe(self, sid=None)