subscribers.py
Go to the documentation of this file.
00001 # Software License Agreement (BSD License)
00002 #
00003 # Copyright (c) 2012, Willow Garage, Inc.
00004 # Copyright (c) 2013, PAL Robotics SL
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 
00034 from threading import Lock
00035 from rospy import Subscriber, logerr
00036 from rostopic import get_topic_type
00037 from rosbridge_library.internal import ros_loader, message_conversion
00038 from rosbridge_library.internal.topics import TopicNotEstablishedException
00039 from rosbridge_library.internal.topics import TypeConflictException
00040 
00041 """ Manages and interfaces with ROS Subscriber objects.  A single subscriber
00042 is shared between multiple clients
00043 """
00044 
00045 
00046 class MultiSubscriber():
00047     """ Handles multiple clients for a single subscriber.
00048 
00049     Converts msgs to JSON before handing them to callbacks.  Due to subscriber
00050     callbacks being called in separate threads, must lock whenever modifying
00051     or accessing the subscribed clients. """
00052 
00053     def __init__(self, topic, msg_type=None):
00054         """ Register a subscriber on the specified topic.
00055 
00056         Keyword arguments:
00057         topic    -- the name of the topic to register the subscriber on
00058         msg_type -- (optional) the type to register the subscriber as.  If not
00059         provided, an attempt will be made to infer the topic type
00060 
00061         Throws:
00062         TopicNotEstablishedException -- if no msg_type was specified by the
00063         caller and the topic is not yet established, so a topic type cannot
00064         be inferred
00065         TypeConflictException        -- if the msg_type was specified by the
00066         caller and the topic is established, and the established type is
00067         different to the user-specified msg_type
00068 
00069         """
00070         # First check to see if the topic is already established
00071         topic_type = get_topic_type(topic)[0]
00072 
00073         # If it's not established and no type was specified, exception
00074         if msg_type is None and topic_type is None:
00075             raise TopicNotEstablishedException(topic)
00076 
00077         # Use the established topic type if none was specified
00078         if msg_type is None:
00079             msg_type = topic_type
00080 
00081         # Load the message class, propagating any exceptions from bad msg types
00082         msg_class = ros_loader.get_message_class(msg_type)
00083 
00084         # Make sure the specified msg type and established msg type are same
00085         if topic_type is not None and topic_type != msg_class._type:
00086             raise TypeConflictException(topic, topic_type, msg_class._type)
00087 
00088         # Create the subscriber and associated member variables
00089         self.subscriptions = {}
00090         self.lock = Lock()
00091         self.topic = topic
00092         self.msg_class = msg_class
00093         self.subscriber = Subscriber(topic, msg_class, self.callback)
00094 
00095     def unregister(self):
00096         self.subscriber.unregister()
00097         with self.lock:
00098             self.subscriptions.clear()
00099 
00100     def verify_type(self, msg_type):
00101         """ Verify that the subscriber subscribes to messages of this type.
00102 
00103         Keyword arguments:
00104         msg_type -- the type to check this subscriber against
00105 
00106         Throws:
00107         Exception -- if ros_loader cannot load the specified msg type
00108         TypeConflictException -- if the msg_type is different than the type of
00109         this publisher
00110 
00111         """
00112         if not ros_loader.get_message_class(msg_type) is self.msg_class:
00113             raise TypeConflictException(self.topic,
00114                                         self.msg_class._type, msg_type)
00115         return
00116 
00117     def subscribe(self, client_id, callback):
00118         """ Subscribe the specified client to this subscriber.
00119 
00120         Keyword arguments:
00121         client_id -- the ID of the client subscribing
00122         callback  -- this client's callback, that will be called for incoming
00123         messages
00124 
00125         """
00126         with self.lock:
00127             self.subscriptions[client_id] = callback
00128             # If the topic is latched, add_callback will immediately invoke
00129             # the given callback.
00130             self.subscriber.impl.add_callback(self.callback, [callback])
00131             self.subscriber.impl.remove_callback(self.callback, [callback])
00132 
00133     def unsubscribe(self, client_id):
00134         """ Unsubscribe the specified client from this subscriber
00135 
00136         Keyword arguments:
00137         client_id -- the ID of the client to unsubscribe
00138 
00139         """
00140         with self.lock:
00141             del self.subscriptions[client_id]
00142 
00143     def has_subscribers(self):
00144         """ Return true if there are subscribers """
00145         with self.lock:
00146             ret = len(self.subscriptions) != 0
00147             return ret
00148 
00149     def callback(self, msg, callbacks=None):
00150         """ Callback for incoming messages on the rospy.Subscriber
00151 
00152         Converts the incoming msg to JSON, then passes the JSON to the
00153         registered subscriber callbacks.
00154 
00155         Keyword Arguments:
00156         msg - the ROS message coming from the subscriber
00157         callbacks - subscriber callbacks to invoke
00158 
00159         """
00160         # Try to convert the msg to JSON
00161         json = None
00162         try:
00163             json = message_conversion.extract_values(msg)
00164         except Exception as exc:
00165             logerr("Exception while converting messages in subscriber callback : %s", exc)
00166             return
00167         
00168         # Get the callbacks to call
00169         if not callbacks:
00170             with self.lock:
00171                 callbacks = self.subscriptions.values()
00172 
00173         # Pass the JSON to each of the callbacks
00174         for callback in callbacks:
00175             try:
00176                 callback(json)
00177             except Exception as exc:
00178                 # Do nothing if one particular callback fails except log it
00179                 logerr("Exception calling subscribe callback: %s", exc)
00180                 pass
00181 
00182 
00183 class SubscriberManager():
00184     """
00185     Keeps track of client subscriptions
00186     """
00187 
00188     def __init__(self):
00189         self._subscribers = {}
00190 
00191     def subscribe(self, client_id, topic, callback, msg_type=None):
00192         """ Subscribe to a topic
00193 
00194         Keyword arguments:
00195         client_id -- the ID of the client making this subscribe request
00196         topic     -- the name of the topic to subscribe to
00197         callback  -- the callback to call for incoming messages on the topic
00198         msg_type  -- (optional) the type of the topic
00199 
00200         """
00201         if not topic in self._subscribers:
00202             self._subscribers[topic] = MultiSubscriber(topic, msg_type)
00203 
00204         if msg_type is not None:
00205             self._subscribers[topic].verify_type(msg_type)
00206 
00207         self._subscribers[topic].subscribe(client_id, callback)
00208 
00209     def unsubscribe(self, client_id, topic):
00210         """ Unsubscribe from a topic
00211 
00212         Keyword arguments:
00213         client_id -- the ID of the client to unsubscribe
00214         topic     -- the topic to unsubscribe from
00215 
00216         """
00217         if not topic in self._subscribers:
00218             return
00219 
00220         self._subscribers[topic].unsubscribe(client_id)
00221 
00222         if not self._subscribers[topic].has_subscribers():
00223             self._subscribers[topic].unregister()
00224             del self._subscribers[topic]
00225 
00226 
00227 manager = SubscriberManager()
00228 


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Thu Aug 27 2015 14:50:35