proxy_subscriber_cached.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 
00003 import roslib; roslib.load_manifest('flexbe_core')
00004 import rospy
00005 
00006 import time
00007 import random
00008 
00009 
00010 class ProxySubscriberCached(object):
00011     """
00012     A proxy for subscribing topics that caches and buffers received messages.
00013     """
00014     _simulate_delay = False
00015     
00016     _topics = {}
00017     _persistant_topics = []
00018     
00019     def __init__(self, topics={}):
00020         """
00021         Initializes the proxy with optionally a given set of topics.
00022         
00023         @type topics: dictionary string - message_class
00024         @param topics: A dictionary containing a collection of topic - message type pairs.
00025         """
00026         for topic, msg_type in topics.iteritems():
00027             self.subscribe(topic, msg_type)
00028     
00029     
00030     def subscribe(self, topic, msg_type, callback=None, buffered=False):
00031         """
00032         Adds a new subscriber to the proxy.
00033         
00034         @type topic: string
00035         @param topic: The topic to subscribe.
00036         
00037         @type msg_type: a message class
00038         @param msg_type: The type of messages of this topic.
00039         
00040         @type callback: function
00041         @param callback: A function to be called when receiving messages.
00042         
00043         @type buffered: boolean
00044         @param buffered: True if all messages should be bufferd, False if only the last message should be cached.
00045         """
00046         if not topic in ProxySubscriberCached._topics:
00047             sub = rospy.Subscriber(topic, msg_type, self._callback, callback_args=topic)
00048             ProxySubscriberCached._topics[topic] = {'subscriber': sub, 
00049                                                     'last_msg': None, 
00050                                                     'buffered': buffered, 
00051                                                     'msg_queue':[]}
00052             
00053         if callback is not None:
00054             ProxySubscriberCached._topics[topic]['subscriber'].impl.add_callback(callback, None)
00055             
00056             
00057     def _callback(self, msg, topic):
00058         """
00059         Standard callback that is executed when a message is received.
00060         
00061         @type topic: message
00062         @param topic: The latest message received on this topic.
00063         
00064         @type topic: string
00065         @param topic: The topic to which this callback belongs.
00066         """
00067         if ProxySubscriberCached._simulate_delay:
00068             time.sleep(max(0, random.gauss(2, 0.6))) # for simulating comms_bridge delay
00069 
00070         if not ProxySubscriberCached._topics.has_key(topic):
00071             return
00072         ProxySubscriberCached._topics[topic]['last_msg'] = msg
00073         if ProxySubscriberCached._topics[topic]['buffered']:
00074             ProxySubscriberCached._topics[topic]['msg_queue'].append(msg)
00075      
00076         
00077     def set_callback(self, topic, callback):
00078         """ 
00079         Adds the given callback to the topic subscriber.
00080         
00081         @type topic: string
00082         @param topic: The topic to add the callback to.
00083         
00084         @type callback: function
00085         @param callback: The callback to be added.
00086         """
00087         ProxySubscriberCached._topics[topic]['subscriber'].impl.add_callback(callback, None)
00088       
00089         
00090     def enable_buffer(self, topic):
00091         """
00092         Enables the buffer on the given topic.
00093 
00094         @type topic: string
00095         @param topic: The topic of interest.
00096         """
00097         ProxySubscriberCached._topics[topic]['buffered'] = True
00098         
00099         
00100     def disable_buffer(self, topic):
00101         """
00102         Disables the buffer on the given topic.
00103 
00104         @type topic: string
00105         @param topic: The topic of interest.
00106         """
00107         ProxySubscriberCached._topics[topic]['buffered'] = False
00108         ProxySubscriberCached._topics[topic]['msg_queue'] = []
00109             
00110 
00111     def is_available(self, topic):
00112         """
00113         Checks if the subscriber on the given topic is available.
00114         
00115         @type topic: string
00116         @param topic: The topic of interest.
00117         """
00118         return topic in ProxySubscriberCached._topics
00119        
00120        
00121     def get_last_msg(self, topic):
00122         """
00123         Returns the latest cached message of the given topic.
00124 
00125         @type topic: string
00126         @param topic: The topic of interest.
00127         """
00128         return ProxySubscriberCached._topics[topic]['last_msg']
00129     
00130     
00131     def get_from_buffer(self, topic):
00132         """
00133         Pops the oldest buffered message of the given topic.
00134 
00135         @type topic: string
00136         @param topic: The topic of interest.
00137         """
00138         if not ProxySubscriberCached._topics[topic]['buffered']:
00139             rospy.logwarn('Attempted to access buffer of non-buffered topic!')
00140             return None
00141         msg = ProxySubscriberCached._topics[topic]['msg_queue'][0]
00142         ProxySubscriberCached._topics[topic]['msg_queue'] = ProxySubscriberCached._topics[topic]['msg_queue'][1:]
00143         return msg
00144     
00145     
00146     def has_msg(self, topic):
00147         """
00148         Determines if the given topic has a message in its cache.
00149 
00150         @type topic: string
00151         @param topic: The topic of interest.
00152         """
00153         return ProxySubscriberCached._topics[topic]['last_msg'] is not None
00154     
00155     
00156     def has_buffered(self, topic):
00157         """
00158         Determines if the given topic has any messages in its buffer.
00159 
00160         @type topic: string
00161         @param topic: The topic of interest.
00162         """
00163         return len(ProxySubscriberCached._topics[topic]['msg_queue']) > 0
00164     
00165     
00166     def remove_last_msg(self, topic, clear_buffer=False):   # better pop last msg
00167         """
00168         Removes the cached message of the given topic and optionally clears its buffer.
00169 
00170         @type topic: string
00171         @param topic: The topic of interest.
00172 
00173         @type topic: boolean
00174         @param topic: Set to true if the buffer of the given topic should be cleared as well.
00175         """
00176         if ProxySubscriberCached._persistant_topics.count(topic) > 0: return
00177         ProxySubscriberCached._topics[topic]['last_msg'] = None
00178         if clear_buffer:
00179             ProxySubscriberCached._topics[topic]['msg_queue'] = []
00180         
00181         
00182     def make_persistant(self, topic):
00183         """
00184         Makes the given topic persistant which means messages can no longer be removed
00185         (remove_last_msg will have no effect), only overwritten by a new message.
00186 
00187         @type topic: string
00188         @param topic: The topic of interest.
00189         """
00190         ProxySubscriberCached._persistant_topics.append(topic)
00191         
00192         
00193     def has_topic(self, topic):
00194         """
00195         Determines if the given topic is already subscribed.
00196 
00197         @type topic: string
00198         @param topic: The topic of interest.
00199         """
00200         if topic in ProxySubscriberCached._topics:
00201             return True
00202         return False
00203         
00204         
00205     def unsubscribe_topic(self, topic):
00206         """
00207         Removes the given topic from the list of subscribed topics.
00208 
00209         @type topic: string
00210         @param topic: The topic of interest.
00211         """
00212         if topic in ProxySubscriberCached._topics:
00213             ProxySubscriberCached._topics[topic]['subscriber'].unregister()
00214             ProxySubscriberCached._topics.pop(topic)
00215             
00216     
00217     def shutdown(self):
00218         """ Shuts this proxy down by unregistering all subscribers. """
00219         try:
00220             for topic in ProxySubscriberCached._topics:
00221                 try:
00222                     ProxySubscriberCached._topics[topic]['subscriber'].unregister()
00223                 except Exception as e:
00224                     rospy.logerr('Something went wrong during shutdown of proxy subscriber!\n%s', str(e))
00225         except Exception as e:
00226             rospy.logerr('Something went wrong during shutdown of proxy subscriber!\n%s', str(e))
00227         ProxySubscriberCached._topics.clear()


flexbe_core
Author(s): Philipp Schillinger
autogenerated on Thu Jun 6 2019 19:32:27