subscribe.py
Go to the documentation of this file.
00001 # Software License Agreement (BSD License)
00002 #
00003 # Copyright (c) 2012, Willow Garage, Inc.
00004 # All rights reserved.
00005 #
00006 # Redistribution and use in source and binary forms, with or without
00007 # modification, are permitted provided that the following conditions
00008 # are met:
00009 #
00010 #  * Redistributions of source code must retain the above copyright
00011 #    notice, this list of conditions and the following disclaimer.
00012 #  * Redistributions in binary form must reproduce the above
00013 #    copyright notice, this list of conditions and the following
00014 #    disclaimer in the documentation and/or other materials provided
00015 #    with the distribution.
00016 #  * Neither the name of Willow Garage, Inc. nor the names of its
00017 #    contributors may be used to endorse or promote products derived
00018 #    from this software without specific prior written permission.
00019 #
00020 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00021 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00022 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00023 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00024 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00025 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00026 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00027 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00028 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00029 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00030 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00031 # POSSIBILITY OF SUCH DAMAGE.
00032 
00033 from threading import Lock
00034 from functools import partial
00035 from rospy import loginfo
00036 from rosbridge_library.capability import Capability
00037 from rosbridge_library.internal.subscribers import manager
00038 from rosbridge_library.internal.subscription_modifiers import MessageHandler
00039 from rosbridge_library.internal.pngcompression import encode
00040 try:
00041     from ujson import dumps
00042 except ImportError:
00043     try:
00044         from simplejson import dumps
00045     except ImportError:
00046         from json import dumps
00047 
00048 
00049 class Subscription():
00050     """ Keeps track of the clients multiple calls to subscribe.
00051 
00052     Chooses the most appropriate settings to send messages """
00053 
00054     def __init__(self, client_id, topic, publish):
00055         """ Create a subscription for the specified client on the specified
00056         topic, with callback publish
00057 
00058         Keyword arguments:
00059         client_id -- the ID of the client making this subscription
00060         topic     -- the name of the topic to subscribe to
00061         publish   -- the callback function for incoming messages
00062 
00063         """
00064         self.client_id = client_id
00065         self.topic = topic
00066         self.publish = publish
00067 
00068         self.clients = {}
00069 
00070         self.handler = MessageHandler(None, self._publish)
00071         self.handler_lock = Lock()
00072         self.update_params()
00073 
00074     def unregister(self):
00075         """ Unsubscribes this subscription and cleans up resources """
00076         manager.unsubscribe(self.client_id, self.topic)
00077         with self.handler_lock:
00078             self.handler.finish()
00079         self.clients.clear()
00080 
00081     def subscribe(self, sid=None, msg_type=None, throttle_rate=0,
00082                   queue_length=0, fragment_size=None, compression="none"):
00083         """ Add another client's subscription request
00084 
00085         If there are multiple calls to subscribe, the values actually used for
00086         queue_length, fragment_size, compression and throttle_rate are
00087         chosen to encompass all subscriptions' requirements
00088 
00089         Keyword arguments:
00090         sid             -- the subscription id from the client
00091         msg_type        -- the type of the message to subscribe to
00092         throttle_rate   -- the minimum time (in ms) allowed between messages
00093         being sent.  If multiple subscriptions, the lower of these is used
00094         queue_length    -- the number of messages that can be buffered.  If
00095         multiple subscriptions, the lower of these is used
00096         fragment_size   -- None if no fragmentation, or the maximum length of
00097         allowed outgoing messages
00098         compression     -- "none" if no compression, or some other value if
00099         compression is to be used (current valid values are 'png')
00100 
00101          """
00102 
00103         client_details = {
00104             "throttle_rate": throttle_rate,
00105             "queue_length": queue_length,
00106             "fragment_size": fragment_size,
00107             "compression": compression
00108         }
00109 
00110         self.clients[sid] = client_details
00111 
00112         self.update_params()
00113 
00114         # Subscribe with the manager. This will propagate any exceptions
00115         manager.subscribe(self.client_id, self.topic, self.on_msg, msg_type)
00116 
00117     def unsubscribe(self, sid=None):
00118         """ Unsubscribe this particular client's subscription
00119 
00120         Keyword arguments:
00121         sid -- the individual subscription id.  If None, all are unsubscribed
00122 
00123         """
00124         if sid is None:
00125             self.clients.clear()
00126         elif sid in self.clients:
00127             del self.clients[sid]
00128 
00129         if not self.is_empty():
00130             self.update_params()
00131 
00132     def is_empty(self):
00133         """ Return true if there are no subscriptions currently """
00134         return len(self.clients) == 0
00135 
00136     def _publish(self, message):
00137         """ Internal method to propagate published messages to the registered
00138         publish callback """
00139         self.publish(message, self.fragment_size, self.compression)
00140 
00141     def on_msg(self, msg):
00142         """ Raw callback called by subscription manager for all incoming
00143         messages.
00144 
00145         Incoming messages are passed to the message handler which may drop,
00146         buffer, or propagate the message
00147 
00148         """
00149         with self.handler_lock:
00150             self.handler.handle_message(msg)
00151 
00152     def update_params(self):
00153         """ Determine the 'lowest common denominator' params to satisfy all
00154         subscribed clients. """
00155         if len(self.clients) == 0:
00156             self.throttle_rate = 0
00157             self.queue_length = 0
00158             self.fragment_size = None
00159             self.compression = "none"
00160             return
00161 
00162         def f(fieldname):
00163             return [x[fieldname] for x in self.clients.values()]
00164 
00165         self.throttle_rate = min(f("throttle_rate"))
00166         self.queue_length = min(f("queue_length"))
00167         frags = [x for x in f("fragment_size") if x != None]
00168         if frags == []:
00169             self.fragment_size = None
00170         else:
00171             self.fragment_size = min(frags)
00172         self.compression = "png" if "png" in f("compression") else "none"
00173 
00174         with self.handler_lock:
00175             self.handler = self.handler.set_throttle_rate(self.throttle_rate)
00176             self.handler = self.handler.set_queue_length(self.queue_length)
00177 
00178 
00179 class Subscribe(Capability):
00180 
00181     subscribe_msg_fields = [(True, "topic", (str, unicode)), (False, "type", (str, unicode)),
00182         (False, "throttle_rate", int), (False, "fragment_size", int),
00183         (False, "queue_length", int), (False, "compression", (str, unicode))]
00184     unsubscribe_msg_fields = [(True, "topic", (str, unicode))]
00185 
00186     def __init__(self, protocol):
00187         # Call superclass constructor
00188         Capability.__init__(self, protocol)
00189 
00190         # Register the operations that this capability provides
00191         protocol.register_operation("subscribe", self.subscribe)
00192         protocol.register_operation("unsubscribe", self.unsubscribe)
00193 
00194         self._subscriptions = {}
00195 
00196     def subscribe(self, msg):
00197         # Pull out the ID
00198         sid = msg.get("id", None)
00199         
00200         # Check the args
00201         self.basic_type_check(msg, self.subscribe_msg_fields)
00202 
00203         # Make the subscription
00204         topic = msg["topic"]
00205         if not topic in self._subscriptions:
00206             client_id = self.protocol.client_id
00207             cb = partial(self.publish, topic)
00208             self._subscriptions[topic] = Subscription(client_id, topic, cb)
00209 
00210         # Register the subscriber
00211         subscribe_args = {
00212           "sid": sid,
00213           "msg_type": msg.get("type", None),
00214           "throttle_rate": msg.get("throttle_rate", 0),
00215           "fragment_size": msg.get("fragment_size", None),
00216           "queue_length": msg.get("queue_length", 0),
00217           "compression": msg.get("compression", "none")
00218         }
00219         self._subscriptions[topic].subscribe(**subscribe_args)
00220 
00221         self.protocol.log("info", "Subscribed to %s" % topic)
00222 
00223     def unsubscribe(self, msg):
00224         # Pull out the ID
00225         sid = msg.get("id", None)
00226         
00227         self.basic_type_check(msg, self.unsubscribe_msg_fields)
00228 
00229         topic = msg["topic"]
00230         if topic not in self._subscriptions:
00231             return
00232         self._subscriptions[topic].unsubscribe(sid)
00233 
00234         if self._subscriptions[topic].is_empty():
00235             self._subscriptions[topic].unregister()
00236             del self._subscriptions[topic]
00237 
00238         self.protocol.log("info", "Unsubscribed from %s" % topic)
00239 
00240     def publish(self, topic, message, fragment_size=None, compression="none"):
00241         """ Publish a message to the client
00242 
00243         Keyword arguments:
00244         topic   -- the topic to publish the message on
00245         message -- a dict of key-value pairs. Will be wrapped in a message with
00246         opcode publish
00247         fragment_size -- (optional) fragment the serialized message into msgs
00248         with payloads not greater than this value
00249         compression   -- (optional) compress the message. valid values are
00250         'png' and 'none'
00251 
00252         """
00253         # TODO: fragmentation, proper ids
00254         outgoing_msg = {"op": "publish", "topic": topic, "msg": message}
00255         if compression=="png":
00256             outgoing_msg_dumped = dumps(outgoing_msg)
00257             outgoing_msg = {"op": "png", "data": encode(outgoing_msg_dumped)}
00258         self.protocol.send(outgoing_msg)
00259 
00260     def finish(self):
00261         for subscription in self._subscriptions.values():
00262             subscription.unregister()
00263         self._subscriptions.clear()
00264         self.protocol.unregister_operation("subscribe")
00265         self.protocol.unregister_operation("unsubscribe")


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Thu Aug 27 2015 14:50:35