34 PYTHON2 = sys.version_info < (3, 0)
 
   37 from threading 
import Lock
 
   38 from functools 
import partial
 
   45     from ujson 
import dumps 
as encode_json
 
   48         from simplejson 
import dumps 
as encode_json
 
   50         from json 
import dumps 
as encode_json
 
   56     """ Keeps track of the clients multiple calls to subscribe. 
   58     Chooses the most appropriate settings to send messages """ 
   61         """ Create a subscription for the specified client on the specified 
   62         topic, with callback publish 
   65         client_id -- the ID of the client making this subscription 
   66         topic     -- the name of the topic to subscribe to 
   67         publish   -- the callback function for incoming messages 
   81         """ Unsubscribes this subscription and cleans up resources """ 
   84             self.
handler.finish(block=
False)
 
   87     def subscribe(self, sid=None, msg_type=None, throttle_rate=0,
 
   88                   queue_length=0, fragment_size=None, compression="none"):
 
   89         """ Add another client's subscription request 
   91         If there are multiple calls to subscribe, the values actually used for 
   92         queue_length, fragment_size, compression and throttle_rate are 
   93         chosen to encompass all subscriptions' requirements 
   96         sid             -- the subscription id from the client 
   97         msg_type        -- the type of the message to subscribe to 
   98         throttle_rate   -- the minimum time (in ms) allowed between messages 
   99         being sent.  If multiple subscriptions, the lower of these is used 
  100         queue_length    -- the number of messages that can be buffered.  If 
  101         multiple subscriptions, the lower of these is used 
  102         fragment_size   -- None if no fragmentation, or the maximum length of 
  103         allowed outgoing messages 
  104         compression     -- "none" if no compression, or some other value if 
  105         compression is to be used (current valid values are 'png') 
  110             "throttle_rate": throttle_rate,
 
  111             "queue_length": queue_length,
 
  112             "fragment_size": fragment_size,
 
  113             "compression": compression
 
  116         self.
clients[sid] = client_details
 
  120         if compression == 
"cbor-raw":
 
  121             msg_type = 
"__AnyMsg" 
  127         """ Unsubscribe this particular client's subscription 
  130         sid -- the individual subscription id.  If None, all are unsubscribed 
  142         """ Return true if there are no subscriptions currently """ 
  146         """ Internal method to propagate published messages to the registered 
  151         """ Raw callback called by subscription manager for all incoming 
  154         Incoming messages are passed to the message handler which may drop, 
  155         buffer, or propagate the message 
  159             self.
handler.handle_message(msg)
 
  162         """ Determine the 'lowest common denominator' params to satisfy all 
  163         subscribed clients. """ 
  172             return [x[fieldname] 
for x 
in self.
clients.values()]
 
  176         frags = [x 
for x 
in f(
"fragment_size") 
if x != 
None]
 
  183         if "png" in f(
"compression"):
 
  185         if "cbor" in f(
"compression"):
 
  187         if "cbor-raw" in f(
"compression"):
 
  197     subscribe_msg_fields = [(
True, 
"topic", string_types), (
False, 
"type", string_types),
 
  198                             (
False, 
"throttle_rate", int), (
False, 
"fragment_size", int),
 
  199                             (
False, 
"queue_length", int), (
False, 
"compression", string_types)]
 
  200     unsubscribe_msg_fields = [(
True, 
"topic", string_types)]
 
  206         Capability.__init__(self, protocol)
 
  209         protocol.register_operation(
"subscribe", self.
subscribe)
 
  210         protocol.register_operation(
"unsubscribe", self.
unsubscribe)
 
  216         sid = msg.get(
"id", 
None)
 
  224         if Subscribe.topics_glob 
is not None and Subscribe.topics_glob:
 
  225             self.
protocol.log(
"debug", 
"Topic security glob enabled, checking topic: " + topic)
 
  227             for glob 
in Subscribe.topics_glob:
 
  228                 if (fnmatch.fnmatch(topic, glob)):
 
  229                     self.
protocol.log(
"debug", 
"Found match with glob " + glob + 
", continuing subscription...")
 
  233                 self.
protocol.log(
"warn", 
"No match found for topic, cancelling subscription to: " + topic)
 
  236             self.
protocol.log(
"debug", 
"No topic security glob, not checking subscription.")
 
  240             cb = partial(self.
publish, topic)
 
  246           "msg_type": msg.get(
"type", 
None),
 
  247           "throttle_rate": msg.get(
"throttle_rate", 0),
 
  248           "fragment_size": msg.get(
"fragment_size", 
None),
 
  249           "queue_length": msg.get(
"queue_length", 0),
 
  250           "compression": msg.get(
"compression", 
"none")
 
  254         self.
protocol.log(
"info", 
"Subscribed to %s" % topic)
 
  258         sid = msg.get(
"id", 
None)
 
  272         self.
protocol.log(
"info", 
"Unsubscribed from %s" % topic)
 
  274     def publish(self, topic, message, fragment_size=None, compression="none"):
 
  275         """ Publish a message to the client 
  278         topic   -- the topic to publish the message on 
  279         message -- a ROS message wrapped by OutgoingMessage 
  280         fragment_size -- (optional) fragment the serialized message into msgs 
  281         with payloads not greater than this value 
  282         compression   -- (optional) compress the message. valid values are 
  288         outgoing_msg = {
u"op": 
u"publish", 
u"topic": topic}
 
  289         if compression==
"png":
 
  290             outgoing_msg[
"msg"] = message.get_json_values()
 
  291             outgoing_msg_dumped = encode_json(outgoing_msg)
 
  292             outgoing_msg = {
"op": 
"png", 
"data": encode_png(outgoing_msg_dumped)}
 
  293         elif compression==
"cbor":
 
  294             outgoing_msg = message.get_cbor(outgoing_msg)
 
  295         elif compression==
"cbor-raw":
 
  296             outgoing_msg = message.get_cbor_raw(outgoing_msg)
 
  298             outgoing_msg[
"msg"] = message.get_json_values()
 
  300         self.
protocol.send(outgoing_msg, compression=compression)
 
  304             subscription.unregister()
 
  306         self.
protocol.unregister_operation(
"subscribe")
 
  307         self.
protocol.unregister_operation(
"unsubscribe")