subscribe.py
Go to the documentation of this file.
00001 # Software License Agreement (BSD License)
00002 #
00003 # Copyright (c) 2012, Willow Garage, Inc.
00004 # All rights reserved.
00005 #
00006 # Redistribution and use in source and binary forms, with or without
00007 # modification, are permitted provided that the following conditions
00008 # are met:
00009 #
00010 #  * Redistributions of source code must retain the above copyright
00011 #    notice, this list of conditions and the following disclaimer.
00012 #  * Redistributions in binary form must reproduce the above
00013 #    copyright notice, this list of conditions and the following
00014 #    disclaimer in the documentation and/or other materials provided
00015 #    with the distribution.
00016 #  * Neither the name of Willow Garage, Inc. nor the names of its
00017 #    contributors may be used to endorse or promote products derived
00018 #    from this software without specific prior written permission.
00019 #
00020 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00021 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00022 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00023 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00024 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00025 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00026 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00027 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00028 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00029 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00030 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00031 # POSSIBILITY OF SUCH DAMAGE.
00032 
00033 from threading import Lock
00034 from functools import partial
00035 from rospy import loginfo
00036 from rosbridge_library.capability import Capability
00037 from rosbridge_library.internal.subscribers import manager
00038 from rosbridge_library.internal.subscription_modifiers import MessageHandler
00039 from rosbridge_library.internal.pngcompression import encode
00040 try:
00041     from ujson import dumps
00042 except ImportError:
00043     try:
00044         from simplejson import dumps
00045     except ImportError:
00046         from json import dumps
00047 
00048 
class Subscription(object):
    """ Keeps track of a client's multiple calls to subscribe on one topic.

    Every call is stored under its subscription id.  The settings actually
    used to send messages are recomputed from all stored calls whenever the
    set of calls changes (see update_params). """

    def __init__(self, client_id, topic, publish):
        """ Create a subscription for the specified client on the specified
        topic, with callback publish

        Keyword arguments:
        client_id -- the ID of the client making this subscription
        topic     -- the name of the topic to subscribe to
        publish   -- the callback function for incoming messages; it is
        called as publish(message, fragment_size, compression)

        """
        self.client_id = client_id
        self.topic = topic
        self.publish = publish

        # Maps subscription id -> dict with that request's throttle_rate,
        # queue_length, fragment_size and compression
        self.clients = {}

        self.handler = MessageHandler(None, self._publish)
        # Guards every use and replacement of self.handler
        self.handler_lock = Lock()
        self.update_params()

    def unregister(self):
        """ Unsubscribes this subscription and cleans up resources

        Unsubscribes from the subscription manager, finishes the message
        handler and forgets all stored client requests.

        """
        manager.unsubscribe(self.client_id, self.topic)
        with self.handler_lock:
            self.handler.finish()
        self.clients.clear()

    def subscribe(self, sid=None, msg_type=None, throttle_rate=0,
                  queue_length=0, fragment_size=None, compression="none"):
        """ Add another client's subscription request

        If there are multiple calls to subscribe, the values actually used for
        queue_length, fragment_size, compression and throttle_rate are
        chosen to encompass all subscriptions' requirements

        Keyword arguments:
        sid             -- the subscription id from the client.  A request
        with an sid that is already stored replaces the stored one
        msg_type        -- the type of the message to subscribe to
        throttle_rate   -- the minimum time (in ms) allowed between messages
        being sent.  If multiple subscriptions, the lower of these is used
        queue_length    -- the number of messages that can be buffered.  If
        multiple subscriptions, the lower of these is used
        fragment_size   -- None if no fragmentation, or the maximum length of
        allowed outgoing messages
        compression     -- "none" if no compression, or some other value if
        compression is to be used (current valid values are 'png')

        Any exception raised by the subscription manager propagates to the
        caller; in that case the request is not stored.

        """
        # Subscribe with the manager. This will propagate any exceptions
        manager.subscribe(self.client_id, self.topic, self.on_msg, msg_type)

        self.clients[sid] = {
            "throttle_rate": throttle_rate,
            "queue_length": queue_length,
            "fragment_size": fragment_size,
            "compression": compression,
        }

        self.update_params()

    def unsubscribe(self, sid=None):
        """ Unsubscribe this particular client's subscription

        Keyword arguments:
        sid -- the individual subscription id.  If None, all are unsubscribed

        An sid that is not stored is ignored.  The parameters are only
        recomputed while at least one request remains.

        """
        if sid is None:
            self.clients.clear()
        elif sid in self.clients:
            del self.clients[sid]

        if not self.is_empty():
            self.update_params()

    def is_empty(self):
        """ Return true if there are no subscriptions currently """
        return not self.clients

    def _publish(self, message):
        """ Internal method to propagate published messages to the registered
        publish callback, together with the current fragment_size and
        compression settings """
        self.publish(message, self.fragment_size, self.compression)

    def on_msg(self, msg):
        """ Raw callback called by subscription manager for all incoming
        messages.

        Incoming messages are passed to the message handler which may drop,
        buffer, or propagate the message

        """
        with self.handler_lock:
            self.handler.handle_message(msg)

    def update_params(self):
        """ Determine the 'lowest common denominator' params to satisfy all
        subscribed clients.

        With no stored requests the defaults are restored (throttle_rate 0,
        queue_length 0, no fragmentation, no compression) and the handler is
        left untouched.  Otherwise the smallest throttle_rate, queue_length
        and non-None fragment_size are used, "png" compression is used if any
        request asked for it, and the handler is reconfigured.

        """
        if not self.clients:
            self.throttle_rate = 0
            self.queue_length = 0
            self.fragment_size = None
            self.compression = "none"
            return

        def f(fieldname):
            # Collect one setting across all stored requests
            return [x[fieldname] for x in self.clients.values()]

        self.throttle_rate = min(f("throttle_rate"))
        self.queue_length = min(f("queue_length"))
        # Requests without fragmentation (None) do not constrain the size
        frags = [x for x in f("fragment_size") if x is not None]
        self.fragment_size = min(frags) if frags else None
        self.compression = "png" if "png" in f("compression") else "none"

        with self.handler_lock:
            # Each setter's return value replaces the current handler
            self.handler = self.handler.set_throttle_rate(self.throttle_rate)
            self.handler = self.handler.set_queue_length(self.queue_length)
00176 
00177 
class Subscribe(Capability):
    """ Capability providing the "subscribe" and "unsubscribe" operations.

    Keeps one Subscription per topic and sends the messages each of them
    emits to the client through the protocol. """

    # `unicode` only exists on Python 2; fall back to plain `str` elsewhere so
    # that the class can still be defined.
    try:
        _string_types = (str, unicode)
    except NameError:
        _string_types = (str,)

    # Field descriptions handed to basic_type_check.  Presumably each entry is
    # (mandatory, field name, accepted type(s)) -- confirm against Capability.
    subscribe_msg_fields = [
        (True, "topic", _string_types),
        (False, "type", _string_types),
        (False, "throttle_rate", int),
        (False, "fragment_size", int),
        (False, "queue_length", int),
        (False, "compression", _string_types),
    ]
    unsubscribe_msg_fields = [(True, "topic", _string_types)]

    def __init__(self, protocol):
        """ Register the subscribe and unsubscribe operations with protocol

        Keyword arguments:
        protocol -- the protocol instance this capability belongs to

        """
        # Call superclass constructor
        Capability.__init__(self, protocol)

        # Register the operations that this capability provides
        protocol.register_operation("subscribe", self.subscribe)
        protocol.register_operation("unsubscribe", self.unsubscribe)

        # Maps topic name -> Subscription
        self._subscriptions = {}

    def subscribe(self, msg):
        """ Handle a subscribe message

        Keyword arguments:
        msg -- the incoming message dict.  "topic" is read unconditionally;
        "id", "type", "throttle_rate", "fragment_size", "queue_length" and
        "compression" are optional

        Exceptions raised while checking the message or while subscribing
        propagate to the caller.

        """
        # Pull out the ID
        sid = msg.get("id", None)

        # Check the args
        self.basic_type_check(msg, self.subscribe_msg_fields)

        # Make the subscription
        topic = msg["topic"]
        if topic not in self._subscriptions:
            client_id = self.protocol.client_id
            cb = partial(self.publish, topic)
            self._subscriptions[topic] = Subscription(client_id, topic, cb)
        subscription = self._subscriptions[topic]

        # Register the subscriber
        subscribe_args = {
            "sid": sid,
            "msg_type": msg.get("type", None),
            "throttle_rate": msg.get("throttle_rate", 0),
            "fragment_size": msg.get("fragment_size", None),
            "queue_length": msg.get("queue_length", 0),
            "compression": msg.get("compression", "none"),
        }
        try:
            subscription.subscribe(**subscribe_args)
        except Exception:
            # Do not keep an entry that holds no subscription requests, e.g.
            # one created above for a request that was then rejected.
            if subscription.is_empty():
                self._subscriptions.pop(topic, None)
            raise

        self.protocol.log("info", "Subscribed to %s" % topic)

    def unsubscribe(self, msg):
        """ Handle an unsubscribe message

        Keyword arguments:
        msg -- the incoming message dict.  "topic" is read unconditionally;
        "id" is optional and, when absent, all requests on the topic are
        removed

        A topic without a subscription is ignored silently.  Once no request
        is left on the topic, the subscription is unregistered and dropped.

        """
        # Pull out the ID
        sid = msg.get("id", None)

        self.basic_type_check(msg, self.unsubscribe_msg_fields)

        topic = msg["topic"]
        if topic not in self._subscriptions:
            return
        subscription = self._subscriptions[topic]
        subscription.unsubscribe(sid)

        if subscription.is_empty():
            subscription.unregister()
            del self._subscriptions[topic]

        self.protocol.log("info", "Unsubscribed from %s" % topic)

    def publish(self, topic, message, fragment_size=None, compression="none"):
        """ Publish a message to the client

        Keyword arguments:
        topic   -- the topic to publish the message on
        message -- a dict of key-value pairs. Will be wrapped in a message with
        opcode publish
        fragment_size -- (optional) intended maximum payload size for
        fragmenting the serialized message.  Accepted but not used by this
        method yet (see the TODO below)
        compression   -- (optional) compress the message. valid values are
        'png' and 'none'

        """
        # TODO: fragmentation, proper ids
        outgoing_msg = {"op": "publish", "topic": topic, "msg": message}
        if compression == "png":
            # Serialize the publish message and wrap the encoded result in a
            # message with opcode png
            outgoing_msg_dumped = dumps(outgoing_msg)
            outgoing_msg = {"op": "png", "data": encode(outgoing_msg_dumped)}
        self.protocol.send(outgoing_msg)

    def finish(self):
        """ Unregister every subscription and both operations """
        for subscription in self._subscriptions.values():
            subscription.unregister()
        self._subscriptions.clear()
        self.protocol.unregister_operation("subscribe")
        self.protocol.unregister_operation("unsubscribe")


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Thu Jan 2 2014 11:53:35