34 PYTHON2 = sys.version_info < (3, 0)
37 from threading
import Lock
38 from functools
import partial
45 from ujson
import dumps
as encode_json
48 from simplejson
import dumps
as encode_json
50 from json
import dumps
as encode_json
56 """ Keeps track of the clients multiple calls to subscribe.
58 Chooses the most appropriate settings to send messages """
61 """ Create a subscription for the specified client on the specified
62 topic, with callback publish
65 client_id -- the ID of the client making this subscription
66 topic -- the name of the topic to subscribe to
67 publish -- the callback function for incoming messages
81 """ Unsubscribes this subscription and cleans up resources """
84 self.
handler.finish(block=
False)
87 def subscribe(self, sid=None, msg_type=None, throttle_rate=0,
88 queue_length=0, fragment_size=None, compression="none"):
89 """ Add another client's subscription request
91 If there are multiple calls to subscribe, the values actually used for
92 queue_length, fragment_size, compression and throttle_rate are
93 chosen to encompass all subscriptions' requirements
96 sid -- the subscription id from the client
97 msg_type -- the type of the message to subscribe to
98 throttle_rate -- the minimum time (in ms) allowed between messages
99 being sent. If multiple subscriptions, the lower of these is used
100 queue_length -- the number of messages that can be buffered. If
101 multiple subscriptions, the lower of these is used
102 fragment_size -- None if no fragmentation, or the maximum length of
103 allowed outgoing messages
104 compression -- "none" if no compression, or some other value if
105 compression is to be used (current valid values are 'png')
110 "throttle_rate": throttle_rate,
111 "queue_length": queue_length,
112 "fragment_size": fragment_size,
113 "compression": compression
116 self.
clients[sid] = client_details
120 if compression ==
"cbor-raw":
121 msg_type =
"__AnyMsg"
127 """ Unsubscribe this particular client's subscription
130 sid -- the individual subscription id. If None, all are unsubscribed
142 """ Return true if there are no subscriptions currently """
146 """ Internal method to propagate published messages to the registered
151 """ Raw callback called by subscription manager for all incoming
154 Incoming messages are passed to the message handler which may drop,
155 buffer, or propagate the message
159 self.
handler.handle_message(msg)
162 """ Determine the 'lowest common denominator' params to satisfy all
163 subscribed clients. """
172 return [x[fieldname]
for x
in self.
clients.values()]
176 frags = [x
for x
in f(
"fragment_size")
if x !=
None]
183 if "png" in f(
"compression"):
185 if "cbor" in f(
"compression"):
187 if "cbor-raw" in f(
"compression"):
197 subscribe_msg_fields = [(
True,
"topic", string_types), (
False,
"type", string_types),
198 (
False,
"throttle_rate", int), (
False,
"fragment_size", int),
199 (
False,
"queue_length", int), (
False,
"compression", string_types)]
200 unsubscribe_msg_fields = [(
True,
"topic", string_types)]
206 Capability.__init__(self, protocol)
209 protocol.register_operation(
"subscribe", self.
subscribe)
210 protocol.register_operation(
"unsubscribe", self.
unsubscribe)
216 sid = msg.get(
"id",
None)
224 if Subscribe.topics_glob
is not None and Subscribe.topics_glob:
225 self.
protocol.log(
"debug",
"Topic security glob enabled, checking topic: " + topic)
227 for glob
in Subscribe.topics_glob:
228 if (fnmatch.fnmatch(topic, glob)):
229 self.
protocol.log(
"debug",
"Found match with glob " + glob +
", continuing subscription...")
233 self.
protocol.log(
"warn",
"No match found for topic, cancelling subscription to: " + topic)
236 self.
protocol.log(
"debug",
"No topic security glob, not checking subscription.")
240 cb = partial(self.
publish, topic)
246 "msg_type": msg.get(
"type",
None),
247 "throttle_rate": msg.get(
"throttle_rate", 0),
248 "fragment_size": msg.get(
"fragment_size",
None),
249 "queue_length": msg.get(
"queue_length", 0),
250 "compression": msg.get(
"compression",
"none")
254 self.
protocol.log(
"info",
"Subscribed to %s" % topic)
258 sid = msg.get(
"id",
None)
272 self.
protocol.log(
"info",
"Unsubscribed from %s" % topic)
274 def publish(self, topic, message, fragment_size=None, compression="none"):
275 """ Publish a message to the client
278 topic -- the topic to publish the message on
279 message -- a ROS message wrapped by OutgoingMessage
280 fragment_size -- (optional) fragment the serialized message into msgs
281 with payloads not greater than this value
282 compression -- (optional) compress the message. valid values are
288 outgoing_msg = {
u"op":
u"publish",
u"topic": topic}
289 if compression==
"png":
290 outgoing_msg[
"msg"] = message.get_json_values()
291 outgoing_msg_dumped = encode_json(outgoing_msg)
292 outgoing_msg = {
"op":
"png",
"data": encode_png(outgoing_msg_dumped)}
293 elif compression==
"cbor":
294 outgoing_msg = message.get_cbor(outgoing_msg)
295 elif compression==
"cbor-raw":
296 outgoing_msg = message.get_cbor_raw(outgoing_msg)
298 outgoing_msg[
"msg"] = message.get_json_values()
300 self.
protocol.send(outgoing_msg, compression=compression)
304 subscription.unregister()
306 self.
protocol.unregister_operation(
"subscribe")
307 self.
protocol.unregister_operation(
"unsubscribe")