subscribe.py
Go to the documentation of this file.
00001 # Software License Agreement (BSD License)
00002 #
00003 # Copyright (c) 2012, Willow Garage, Inc.
00004 # All rights reserved.
00005 #
00006 # Redistribution and use in source and binary forms, with or without
00007 # modification, are permitted provided that the following conditions
00008 # are met:
00009 #
00010 #  * Redistributions of source code must retain the above copyright
00011 #    notice, this list of conditions and the following disclaimer.
00012 #  * Redistributions in binary form must reproduce the above
00013 #    copyright notice, this list of conditions and the following
00014 #    disclaimer in the documentation and/or other materials provided
00015 #    with the distribution.
00016 #  * Neither the name of Willow Garage, Inc. nor the names of its
00017 #    contributors may be used to endorse or promote products derived
00018 #    from this software without specific prior written permission.
00019 #
00020 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00021 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00022 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00023 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00024 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00025 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00026 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00027 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00028 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00029 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00030 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00031 # POSSIBILITY OF SUCH DAMAGE.
00032 
00033 import fnmatch
00034 from threading import Lock
00035 from functools import partial
00036 from rospy import loginfo
00037 from rosbridge_library.capability import Capability
00038 from rosbridge_library.internal.subscribers import manager
00039 from rosbridge_library.internal.subscription_modifiers import MessageHandler
00040 from rosbridge_library.internal.pngcompression import encode
00041 
00042 try:
00043     from ujson import dumps
00044 except ImportError:
00045     try:
00046         from simplejson import dumps
00047     except ImportError:
00048         from json import dumps
00049 
00050 
00051 class Subscription():
00052     """ Keeps track of the clients multiple calls to subscribe.
00053 
00054     Chooses the most appropriate settings to send messages """
00055 
00056     def __init__(self, client_id, topic, publish):
00057         """ Create a subscription for the specified client on the specified
00058         topic, with callback publish
00059 
00060         Keyword arguments:
00061         client_id -- the ID of the client making this subscription
00062         topic     -- the name of the topic to subscribe to
00063         publish   -- the callback function for incoming messages
00064 
00065         """
00066         self.client_id = client_id
00067         self.topic = topic
00068         self.publish = publish
00069 
00070         self.clients = {}
00071 
00072         self.handler = MessageHandler(None, self._publish)
00073         self.handler_lock = Lock()
00074         self.update_params()
00075 
00076     def unregister(self):
00077         """ Unsubscribes this subscription and cleans up resources """
00078         manager.unsubscribe(self.client_id, self.topic)
00079         with self.handler_lock:
00080             self.handler.finish()
00081         self.clients.clear()
00082 
00083     def subscribe(self, sid=None, msg_type=None, throttle_rate=0,
00084                   queue_length=0, fragment_size=None, compression="none"):
00085         """ Add another client's subscription request
00086 
00087         If there are multiple calls to subscribe, the values actually used for
00088         queue_length, fragment_size, compression and throttle_rate are
00089         chosen to encompass all subscriptions' requirements
00090 
00091         Keyword arguments:
00092         sid             -- the subscription id from the client
00093         msg_type        -- the type of the message to subscribe to
00094         throttle_rate   -- the minimum time (in ms) allowed between messages
00095         being sent.  If multiple subscriptions, the lower of these is used
00096         queue_length    -- the number of messages that can be buffered.  If
00097         multiple subscriptions, the lower of these is used
00098         fragment_size   -- None if no fragmentation, or the maximum length of
00099         allowed outgoing messages
00100         compression     -- "none" if no compression, or some other value if
00101         compression is to be used (current valid values are 'png')
00102 
00103          """
00104 
00105         client_details = {
00106             "throttle_rate": throttle_rate,
00107             "queue_length": queue_length,
00108             "fragment_size": fragment_size,
00109             "compression": compression
00110         }
00111 
00112         self.clients[sid] = client_details
00113 
00114         self.update_params()
00115 
00116         # Subscribe with the manager. This will propagate any exceptions
00117         manager.subscribe(self.client_id, self.topic, self.on_msg, msg_type)
00118 
00119     def unsubscribe(self, sid=None):
00120         """ Unsubscribe this particular client's subscription
00121 
00122         Keyword arguments:
00123         sid -- the individual subscription id.  If None, all are unsubscribed
00124 
00125         """
00126         if sid is None:
00127             self.clients.clear()
00128         elif sid in self.clients:
00129             del self.clients[sid]
00130 
00131         if not self.is_empty():
00132             self.update_params()
00133 
00134     def is_empty(self):
00135         """ Return true if there are no subscriptions currently """
00136         return len(self.clients) == 0
00137 
00138     def _publish(self, message):
00139         """ Internal method to propagate published messages to the registered
00140         publish callback """
00141         self.publish(message, self.fragment_size, self.compression)
00142 
00143     def on_msg(self, msg):
00144         """ Raw callback called by subscription manager for all incoming
00145         messages.
00146 
00147         Incoming messages are passed to the message handler which may drop,
00148         buffer, or propagate the message
00149 
00150         """
00151         with self.handler_lock:
00152             self.handler.handle_message(msg)
00153 
00154     def update_params(self):
00155         """ Determine the 'lowest common denominator' params to satisfy all
00156         subscribed clients. """
00157         if len(self.clients) == 0:
00158             self.throttle_rate = 0
00159             self.queue_length = 0
00160             self.fragment_size = None
00161             self.compression = "none"
00162             return
00163 
00164         def f(fieldname):
00165             return [x[fieldname] for x in self.clients.values()]
00166 
00167         self.throttle_rate = min(f("throttle_rate"))
00168         self.queue_length = min(f("queue_length"))
00169         frags = [x for x in f("fragment_size") if x != None]
00170         if frags == []:
00171             self.fragment_size = None
00172         else:
00173             self.fragment_size = min(frags)
00174         self.compression = "png" if "png" in f("compression") else "none"
00175 
00176         with self.handler_lock:
00177             self.handler = self.handler.set_throttle_rate(self.throttle_rate)
00178             self.handler = self.handler.set_queue_length(self.queue_length)
00179 
00180 
00181 class Subscribe(Capability):
00182 
00183     subscribe_msg_fields = [(True, "topic", (str, unicode)), (False, "type", (str, unicode)),
00184                             (False, "throttle_rate", int), (False, "fragment_size", int),
00185                             (False, "queue_length", int), (False, "compression", (str, unicode))]
00186     unsubscribe_msg_fields = [(True, "topic", (str, unicode))]
00187 
00188     topics_glob = None
00189 
00190     def __init__(self, protocol):
00191         # Call superclass constructor
00192         Capability.__init__(self, protocol)
00193 
00194         # Register the operations that this capability provides
00195         protocol.register_operation("subscribe", self.subscribe)
00196         protocol.register_operation("unsubscribe", self.unsubscribe)
00197 
00198         self._subscriptions = {}
00199 
00200     def subscribe(self, msg):
00201         # Pull out the ID
00202         sid = msg.get("id", None)
00203 
00204         # Check the args
00205         self.basic_type_check(msg, self.subscribe_msg_fields)
00206 
00207         # Make the subscription
00208         topic = msg["topic"]
00209 
00210         if Subscribe.topics_glob is not None and Subscribe.topics_glob:
00211             self.protocol.log("debug", "Topic security glob enabled, checking topic: " + topic)
00212             match = False
00213             for glob in Subscribe.topics_glob:
00214                 if (fnmatch.fnmatch(topic, glob)):
00215                     self.protocol.log("debug", "Found match with glob " + glob + ", continuing subscription...")
00216                     match = True
00217                     break
00218             if not match:
00219                 self.protocol.log("warn", "No match found for topic, cancelling subscription to: " + topic)
00220                 return
00221         else:
00222             self.protocol.log("debug", "No topic security glob, not checking subscription.")
00223 
00224         if not topic in self._subscriptions:
00225             client_id = self.protocol.client_id
00226             cb = partial(self.publish, topic)
00227             self._subscriptions[topic] = Subscription(client_id, topic, cb)
00228 
00229         # Register the subscriber
00230         subscribe_args = {
00231           "sid": sid,
00232           "msg_type": msg.get("type", None),
00233           "throttle_rate": msg.get("throttle_rate", 0),
00234           "fragment_size": msg.get("fragment_size", None),
00235           "queue_length": msg.get("queue_length", 0),
00236           "compression": msg.get("compression", "none")
00237         }
00238         self._subscriptions[topic].subscribe(**subscribe_args)
00239 
00240         self.protocol.log("info", "Subscribed to %s" % topic)
00241 
00242     def unsubscribe(self, msg):
00243         # Pull out the ID
00244         sid = msg.get("id", None)
00245  
00246         self.basic_type_check(msg, self.unsubscribe_msg_fields)
00247 
00248         topic = msg["topic"]
00249         if Subscribe.topics_glob is not None and Subscribe.topics_glob:
00250             self.protocol.log("debug", "Topic security glob enabled, checking topic: " + topic)
00251             match = False
00252             for glob in Subscribe.topics_glob:
00253                 if (fnmatch.fnmatch(topic, glob)):
00254                     self.protocol.log("debug", "Found match with glob " + glob + ", continuing unsubscription...")
00255                     match = True
00256                     break
00257             if not match:
00258                 self.protocol.log("warn", "No match found for topic, cancelling unsubscription from: " + topic)
00259                 return
00260         else:
00261             self.protocol.log("debug", "No topic security glob, not checking unsubscription.")
00262 
00263         if topic not in self._subscriptions:
00264             return
00265         self._subscriptions[topic].unsubscribe(sid)
00266 
00267         if self._subscriptions[topic].is_empty():
00268             self._subscriptions[topic].unregister()
00269             del self._subscriptions[topic]
00270 
00271         self.protocol.log("info", "Unsubscribed from %s" % topic)
00272 
00273     def publish(self, topic, message, fragment_size=None, compression="none"):
00274         """ Publish a message to the client
00275 
00276         Keyword arguments:
00277         topic   -- the topic to publish the message on
00278         message -- a dict of key-value pairs. Will be wrapped in a message with
00279         opcode publish
00280         fragment_size -- (optional) fragment the serialized message into msgs
00281         with payloads not greater than this value
00282         compression   -- (optional) compress the message. valid values are
00283         'png' and 'none'
00284 
00285         """
00286         # TODO: fragmentation, proper ids
00287         if Subscribe.topics_glob and Subscribe.topics_glob:
00288             self.protocol.log("debug", "Topic security glob enabled, checking topic: " + topic)
00289             match = False
00290             for glob in Subscribe.topics_glob:
00291                 if (fnmatch.fnmatch(topic, glob)):
00292                     self.protocol.log("debug", "Found match with glob " + glob + ", continuing topic publish...")
00293                     match = True
00294                     break
00295             if not match:
00296                 self.protocol.log("warn", "No match found for topic, cancelling topic publish to: " + topic)
00297                 return
00298         else:
00299             self.protocol.log("debug", "No topic security glob, not checking topic publish.")
00300 
00301         outgoing_msg = {"op": "publish", "topic": topic, "msg": message}
00302         if compression == "png":
00303             outgoing_msg_dumped = dumps(outgoing_msg)
00304             outgoing_msg = {"op": "png", "data": encode(outgoing_msg_dumped)}
00305         self.protocol.send(outgoing_msg)
00306 
00307     def finish(self):
00308         for subscription in self._subscriptions.values():
00309             subscription.unregister()
00310         self._subscriptions.clear()
00311         self.protocol.unregister_operation("subscribe")
00312         self.protocol.unregister_operation("unsubscribe")


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Wed Sep 13 2017 03:18:07