34 PYTHON2 = sys.version_info < (3, 0)
37 from threading
import Lock
38 from functools
import partial
45 from ujson
import dumps
as encode_json
48 from simplejson
import dumps
as encode_json
50 from json
import dumps
as encode_json
56 """ Keeps track of the clients multiple calls to subscribe. 58 Chooses the most appropriate settings to send messages """ 61 """ Create a subscription for the specified client on the specified 62 topic, with callback publish 65 client_id -- the ID of the client making this subscription 66 topic -- the name of the topic to subscribe to 67 publish -- the callback function for incoming messages 81 """ Unsubscribes this subscription and cleans up resources """ 84 self.
handler.finish(block=
False)
87 def subscribe(self, sid=None, msg_type=None, throttle_rate=0,
88 queue_length=0, fragment_size=None, compression="none"):
89 """ Add another client's subscription request 91 If there are multiple calls to subscribe, the values actually used for 92 queue_length, fragment_size, compression and throttle_rate are 93 chosen to encompass all subscriptions' requirements 96 sid -- the subscription id from the client 97 msg_type -- the type of the message to subscribe to 98 throttle_rate -- the minimum time (in ms) allowed between messages 99 being sent. If multiple subscriptions, the lower of these is used 100 queue_length -- the number of messages that can be buffered. If 101 multiple subscriptions, the lower of these is used 102 fragment_size -- None if no fragmentation, or the maximum length of 103 allowed outgoing messages 104 compression -- "none" if no compression, or some other value if 105 compression is to be used (current valid values are 'png') 110 "throttle_rate": throttle_rate,
111 "queue_length": queue_length,
112 "fragment_size": fragment_size,
113 "compression": compression
116 self.
clients[sid] = client_details
120 if compression ==
"cbor-raw":
121 msg_type =
"__AnyMsg" 127 """ Unsubscribe this particular client's subscription 130 sid -- the individual subscription id. If None, all are unsubscribed 142 """ Return true if there are no subscriptions currently """ 146 """ Internal method to propagate published messages to the registered 151 """ Raw callback called by subscription manager for all incoming 154 Incoming messages are passed to the message handler which may drop, 155 buffer, or propagate the message 159 self.
handler.handle_message(msg)
162 """ Determine the 'lowest common denominator' params to satisfy all 163 subscribed clients. """ 172 return [x[fieldname]
for x
in self.
clients.values()]
176 frags = [x
for x
in f(
"fragment_size")
if x !=
None]
183 if "png" in f(
"compression"):
185 if "cbor" in f(
"compression"):
187 if "cbor-raw" in f(
"compression"):
197 subscribe_msg_fields = [(
True,
"topic", string_types), (
False,
"type", string_types),
198 (
False,
"throttle_rate", int), (
False,
"fragment_size", int),
199 (
False,
"queue_length", int), (
False,
"compression", string_types)]
200 unsubscribe_msg_fields = [(
True,
"topic", string_types)]
206 Capability.__init__(self, protocol)
209 protocol.register_operation(
"subscribe", self.
subscribe)
210 protocol.register_operation(
"unsubscribe", self.
unsubscribe)
216 sid = msg.get(
"id",
None)
224 if Subscribe.topics_glob
is not None and Subscribe.topics_glob:
225 self.
protocol.log(
"debug",
"Topic security glob enabled, checking topic: " + topic)
227 for glob
in Subscribe.topics_glob:
228 if (fnmatch.fnmatch(topic, glob)):
229 self.
protocol.log(
"debug",
"Found match with glob " + glob +
", continuing subscription...")
233 self.
protocol.log(
"warn",
"No match found for topic, cancelling subscription to: " + topic)
236 self.
protocol.log(
"debug",
"No topic security glob, not checking subscription.")
240 cb = partial(self.
publish, topic)
246 "msg_type": msg.get(
"type",
None),
247 "throttle_rate": msg.get(
"throttle_rate", 0),
248 "fragment_size": msg.get(
"fragment_size",
None),
249 "queue_length": msg.get(
"queue_length", 0),
250 "compression": msg.get(
"compression",
"none")
254 self.
protocol.log(
"info",
"Subscribed to %s" % topic)
258 sid = msg.get(
"id",
None)
272 self.
protocol.log(
"info",
"Unsubscribed from %s" % topic)
274 def publish(self, topic, message, fragment_size=None, compression="none"):
275 """ Publish a message to the client 278 topic -- the topic to publish the message on 279 message -- a ROS message wrapped by OutgoingMessage 280 fragment_size -- (optional) fragment the serialized message into msgs 281 with payloads not greater than this value 282 compression -- (optional) compress the message. valid values are 288 outgoing_msg = {
u"op":
u"publish",
u"topic": topic}
289 if compression==
"png":
290 outgoing_msg[
"msg"] = message.get_json_values()
291 outgoing_msg_dumped = encode_json(outgoing_msg)
292 outgoing_msg = {
"op":
"png",
"data": encode_png(outgoing_msg_dumped)}
293 elif compression==
"cbor":
294 outgoing_msg = message.get_cbor(outgoing_msg)
295 elif compression==
"cbor-raw":
296 outgoing_msg = message.get_cbor_raw(outgoing_msg)
298 outgoing_msg[
"msg"] = message.get_json_values()
300 self.
protocol.send(outgoing_msg, compression=compression)
304 subscription.unregister()
306 self.
protocol.unregister_operation(
"subscribe")
307 self.
protocol.unregister_operation(
"unsubscribe")
list unsubscribe_msg_fields
def __init__(self, client_id, topic, publish)
def subscribe(self, sid=None, msg_type=None, throttle_rate=0, queue_length=0, fragment_size=None, compression="none")
def _publish(self, message)
list subscribe_msg_fields
def unsubscribe(self, msg)
def __init__(self, protocol)
def publish(self, topic, message, fragment_size=None, compression="none")
def basic_type_check(self, msg, types_info)
def unsubscribe(self, sid=None)