subscribe.py
Go to the documentation of this file.
00001 # Software License Agreement (BSD License)
00002 #
00003 # Copyright (c) 2012, Willow Garage, Inc.
00004 # All rights reserved.
00005 #
00006 # Redistribution and use in source and binary forms, with or without
00007 # modification, are permitted provided that the following conditions
00008 # are met:
00009 #
00010 #  * Redistributions of source code must retain the above copyright
00011 #    notice, this list of conditions and the following disclaimer.
00012 #  * Redistributions in binary form must reproduce the above
00013 #    copyright notice, this list of conditions and the following
00014 #    disclaimer in the documentation and/or other materials provided
00015 #    with the distribution.
00016 #  * Neither the name of Willow Garage, Inc. nor the names of its
00017 #    contributors may be used to endorse or promote products derived
00018 #    from this software without specific prior written permission.
00019 #
00020 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00021 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00022 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00023 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00024 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00025 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00026 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00027 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00028 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00029 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00030 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00031 # POSSIBILITY OF SUCH DAMAGE.
00032 
00033 import fnmatch
00034 from threading import Lock
00035 from functools import partial
00036 from rospy import loginfo
00037 from rosbridge_library.capability import Capability
00038 from rosbridge_library.internal.subscribers import manager
00039 from rosbridge_library.internal.subscription_modifiers import MessageHandler
00040 from rosbridge_library.internal.pngcompression import encode
00041 
00042 try:
00043     from ujson import dumps
00044 except ImportError:
00045     try:
00046         from simplejson import dumps
00047     except ImportError:
00048         from json import dumps
00049 
00050 from rosbridge_library.util import string_types
00051 
00052 
class Subscription():
    """ Keeps track of one client's (possibly repeated) calls to subscribe
    to a single topic.

    Every call is stored under its subscription id.  The settings actually
    applied (throttle rate, queue length, fragment size, compression) are
    recomputed from all stored calls whenever one is added or removed. """

    def __init__(self, client_id, topic, publish):
        """ Create a subscription for the specified client on the specified
        topic, with callback publish

        Keyword arguments:
        client_id -- the ID of the client making this subscription
        topic     -- the name of the topic to subscribe to
        publish   -- the callback function for incoming messages; it is
        called as publish(message, fragment_size, compression)

        """
        self.client_id = client_id
        self.topic = topic
        self.publish = publish

        # Maps subscription id -> dict of the settings requested under that id
        self.clients = {}

        self.handler = MessageHandler(None, self._publish)
        self.handler_lock = Lock()
        # No requests are stored yet, so this only installs the defaults
        self.update_params()

    def unregister(self):
        """ Unsubscribes this subscription and cleans up resources """
        manager.unsubscribe(self.client_id, self.topic)
        with self.handler_lock:
            self.handler.finish()
        self.clients.clear()

    def subscribe(self, sid=None, msg_type=None, throttle_rate=0,
                  queue_length=0, fragment_size=None, compression="none"):
        """ Add another client's subscription request

        If there are multiple calls to subscribe, the values actually used for
        queue_length, fragment_size, compression and throttle_rate are
        chosen to encompass all subscriptions' requirements.  A call that
        reuses an already known sid replaces the settings stored for it.

        Keyword arguments:
        sid             -- the subscription id from the client
        msg_type        -- the type of the message to subscribe to
        throttle_rate   -- the minimum time (in ms) allowed between messages
        being sent.  If multiple subscriptions, the lower of these is used
        queue_length    -- the number of messages that can be buffered.  If
        multiple subscriptions, the lower of these is used
        fragment_size   -- None if no fragmentation, or the maximum length of
        allowed outgoing messages.  If multiple subscriptions, the lowest
        value that is not None is used
        compression     -- "none" if no compression, or some other value if
        compression is to be used (current valid values are 'png')

        Any exception raised by the subscription manager is propagated.

        """
        self.clients[sid] = {
            "throttle_rate": throttle_rate,
            "queue_length": queue_length,
            "fragment_size": fragment_size,
            "compression": compression
        }

        self.update_params()

        # Subscribe with the manager. This will propagate any exceptions
        manager.subscribe(self.client_id, self.topic, self.on_msg, msg_type)

    def unsubscribe(self, sid=None):
        """ Unsubscribe this particular client's subscription

        Keyword arguments:
        sid -- the individual subscription id.  If None, all are unsubscribed.
        An unknown sid is ignored.

        """
        if sid is None:
            self.clients.clear()
        elif sid in self.clients:
            del self.clients[sid]

        # With nothing left the effective settings are deliberately left
        # untouched; they are only recomputed while requests remain.
        if not self.is_empty():
            self.update_params()

    def is_empty(self):
        """ Return true if there are no subscriptions currently """
        return not self.clients

    def _publish(self, message):
        """ Internal method to propagate published messages to the registered
        publish callback, together with the fragment size and compression
        currently in effect """
        self.publish(message, self.fragment_size, self.compression)

    def on_msg(self, msg):
        """ Raw callback called by subscription manager for all incoming
        messages.

        Incoming messages are passed to the message handler which may drop,
        buffer, or propagate the message

        """
        with self.handler_lock:
            self.handler.handle_message(msg)

    def update_params(self):
        """ Determine the 'lowest common denominator' params to satisfy all
        subscribed clients.

        With no stored requests the defaults are installed (throttle_rate 0,
        queue_length 0, no fragmentation, no compression) and the message
        handler is left untouched.  Otherwise the lowest throttle_rate and
        queue_length are used, the lowest fragment_size that is not None
        (None if nobody asked for fragmentation), and "png" compression as
        soon as any request asks for it. """
        if not self.clients:
            self.throttle_rate = 0
            self.queue_length = 0
            self.fragment_size = None
            self.compression = "none"
            return

        requests = list(self.clients.values())

        self.throttle_rate = min(r["throttle_rate"] for r in requests)
        self.queue_length = min(r["queue_length"] for r in requests)

        # None means "no fragmentation requested" and must not take part in
        # the minimum; compare by identity so that a size of 0 is kept.
        fragment_sizes = [r["fragment_size"] for r in requests
                          if r["fragment_size"] is not None]
        self.fragment_size = min(fragment_sizes) if fragment_sizes else None

        wants_png = any(r["compression"] == "png" for r in requests)
        self.compression = "png" if wants_png else "none"

        with self.handler_lock:
            # Each setter returns the handler to use from now on, so the
            # attribute is rebound after every call.
            self.handler = self.handler.set_throttle_rate(self.throttle_rate)
            self.handler = self.handler.set_queue_length(self.queue_length)
00181 
00182 
class Subscribe(Capability):
    """ Capability providing the "subscribe" and "unsubscribe" operations.

    Keeps one Subscription object per topic and forwards the messages it
    delivers to the client as "publish" operations. """

    subscribe_msg_fields = [(True, "topic", string_types), (False, "type", string_types),
                            (False, "throttle_rate", int), (False, "fragment_size", int),
                            (False, "queue_length", int), (False, "compression", string_types)]
    unsubscribe_msg_fields = [(True, "topic", string_types)]

    # Optional list of glob patterns restricting the topics that may be used.
    # None or an empty list disables the check.
    topics_glob = None

    def __init__(self, protocol):
        """ Register the subscribe/unsubscribe operations with the protocol

        Keyword arguments:
        protocol -- the protocol instance this capability belongs to

        """
        # Call superclass constructor
        Capability.__init__(self, protocol)

        # Register the operations that this capability provides
        protocol.register_operation("subscribe", self.subscribe)
        protocol.register_operation("unsubscribe", self.unsubscribe)

        # Maps topic name -> Subscription
        self._subscriptions = {}

    def _topic_allowed(self, topic, action, preposition):
        """ Check a topic against the topic security globs and log the outcome

        Keyword arguments:
        topic       -- the topic name to check
        action      -- name of the operation, used in the log messages
        (e.g. "subscription")
        preposition -- word placed between the action and the topic in the
        rejection message (e.g. "to" or "from")

        Returns True if no globs are configured or if the topic matches at
        least one of them, False otherwise.

        """
        globs = Subscribe.topics_glob
        if not globs:
            self.protocol.log("debug", "No topic security glob, not checking " + action + ".")
            return True

        self.protocol.log("debug", "Topic security glob enabled, checking topic: " + topic)
        for glob in globs:
            if fnmatch.fnmatch(topic, glob):
                self.protocol.log("debug", "Found match with glob " + glob + ", continuing " + action + "...")
                return True

        self.protocol.log("warn", "No match found for topic, cancelling " + action + " " + preposition + ": " + topic)
        return False

    def subscribe(self, msg):
        """ Handle a "subscribe" operation

        Keyword arguments:
        msg -- the incoming message dict.  "topic" is required; "id", "type",
        "throttle_rate", "fragment_size", "queue_length" and "compression"
        are optional

        The request is silently dropped (after a warning is logged) if the
        topic is rejected by the topic security globs.

        """
        # Pull out the ID
        sid = msg.get("id", None)

        # Check the args
        self.basic_type_check(msg, self.subscribe_msg_fields)

        # Make the subscription
        topic = msg["topic"]

        if not self._topic_allowed(topic, "subscription", "to"):
            return

        if topic not in self._subscriptions:
            client_id = self.protocol.client_id
            # Bind the topic so the Subscription only has to supply the
            # message, fragment size and compression
            cb = partial(self.publish, topic)
            self._subscriptions[topic] = Subscription(client_id, topic, cb)

        # Register the subscriber
        subscribe_args = {
          "sid": sid,
          "msg_type": msg.get("type", None),
          "throttle_rate": msg.get("throttle_rate", 0),
          "fragment_size": msg.get("fragment_size", None),
          "queue_length": msg.get("queue_length", 0),
          "compression": msg.get("compression", "none")
        }
        self._subscriptions[topic].subscribe(**subscribe_args)

        self.protocol.log("info", "Subscribed to %s" % topic)

    def unsubscribe(self, msg):
        """ Handle an "unsubscribe" operation

        Keyword arguments:
        msg -- the incoming message dict.  "topic" is required; "id" is
        optional and, when absent, every subscription on the topic is removed

        Nothing happens if the topic is rejected by the topic security globs
        or if there is no subscription for it.

        """
        # Pull out the ID
        sid = msg.get("id", None)

        self.basic_type_check(msg, self.unsubscribe_msg_fields)

        topic = msg["topic"]

        if not self._topic_allowed(topic, "unsubscription", "from"):
            return

        if topic not in self._subscriptions:
            return
        self._subscriptions[topic].unsubscribe(sid)

        # Drop the Subscription once its last request is gone
        if self._subscriptions[topic].is_empty():
            self._subscriptions[topic].unregister()
            del self._subscriptions[topic]

        self.protocol.log("info", "Unsubscribed from %s" % topic)

    def publish(self, topic, message, fragment_size=None, compression="none"):
        """ Publish a message to the client

        Keyword arguments:
        topic   -- the topic to publish the message on
        message -- a dict of key-value pairs. Will be wrapped in a message with
        opcode publish
        fragment_size -- (optional) fragment the serialized message into msgs
        with payloads not greater than this value.  Currently accepted but
        not used by this method
        compression   -- (optional) compress the message. valid values are
        'png' and 'none'

        The message is dropped (after a warning is logged) if the topic is
        rejected by the topic security globs.

        """
        # TODO: fragmentation, proper ids
        if not self._topic_allowed(topic, "topic publish", "to"):
            return

        outgoing_msg = {"op": "publish", "topic": topic, "msg": message}
        if compression == "png":
            # The whole publish message is serialized and wrapped in a
            # "png" operation
            outgoing_msg_dumped = dumps(outgoing_msg)
            outgoing_msg = {"op": "png", "data": encode(outgoing_msg_dumped)}
        self.protocol.send(outgoing_msg)

    def finish(self):
        """ Unregister every subscription and both operations """
        for subscription in self._subscriptions.values():
            subscription.unregister()
        self._subscriptions.clear()
        self.protocol.unregister_operation("subscribe")
        self.protocol.unregister_operation("unsubscribe")


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Thu Jun 6 2019 21:51:43