3 import roslib; roslib.load_manifest(
'flexbe_core')
class ProxySubscriberCached(object):
    """
    A proxy for subscribing topics that caches and buffers received messages.

    All bookkeeping lives in class-level attributes, so every instance of this
    proxy shares the same subscribers, cached messages and buffers.
    """
    # When True, _callback sleeps for a random time before storing a message.
    _simulate_delay = False

    # Maps topic name -> {'subscriber', 'last_msg', 'buffered', 'msg_queue'}.
    _topics = {}
    # Topics whose cached message must not be removed by remove_last_msg.
    _persistant_topics = []

    def __init__(self, topics=None):
        """
        Initializes the proxy with optionally a given set of topics.

        @type topics: dictionary string - message_class
        @param topics: A dictionary containing a collection of topic - message type pairs.
        """
        # None instead of a mutable {} default; an omitted argument still
        # means "subscribe to nothing".
        if topics is None:
            return
        # items() works on Python 2 and 3 alike (iteritems() is Python 2 only).
        for topic, msg_type in topics.items():
            self.subscribe(topic, msg_type)

    def subscribe(self, topic, msg_type, callback=None, buffered=False):
        """
        Adds a new subscriber to the proxy.

        If the topic is already subscribed, no new subscriber is created and
        msg_type and buffered are ignored; a given callback is still added.

        @type topic: string
        @param topic: The topic to subscribe.

        @type msg_type: a message class
        @param msg_type: The type of messages of this topic.

        @type callback: function
        @param callback: A function to be called when receiving messages.

        @type buffered: boolean
        @param buffered: True if all messages should be buffered, False if only the last message should be cached.
        """
        if topic not in ProxySubscriberCached._topics:
            sub = rospy.Subscriber(topic, msg_type, self._callback, callback_args=topic)
            ProxySubscriberCached._topics[topic] = {
                'subscriber': sub,
                'last_msg': None,
                'buffered': buffered,
                'msg_queue': [],
            }

        if callback is not None:
            ProxySubscriberCached._topics[topic]['subscriber'].impl.add_callback(callback, None)

    def _callback(self, msg, topic):
        """
        Standard callback that is executed when a message is received.

        @type msg: message data
        @param msg: The latest message received on this topic.

        @type topic: string
        @param topic: The topic to which this callback belongs.
        """
        if ProxySubscriberCached._simulate_delay:
            # Artificial, randomly distributed delay (never negative).
            time.sleep(max(0, random.gauss(2, 0.6)))

        entry = ProxySubscriberCached._topics.get(topic)
        if entry is None:
            # Topic is not (or no longer) registered: drop the message.
            return
        entry['last_msg'] = msg
        if entry['buffered']:
            entry['msg_queue'].append(msg)

    def set_callback(self, topic, callback):
        """
        Adds the given callback to the topic subscriber.

        @type topic: string
        @param topic: The topic to add the callback to.

        @type callback: function
        @param callback: The callback to be added.
        """
        ProxySubscriberCached._topics[topic]['subscriber'].impl.add_callback(callback, None)

    def enable_buffer(self, topic):
        """
        Enables the buffer on the given topic.

        @type topic: string
        @param topic: The topic of interest.
        """
        ProxySubscriberCached._topics[topic]['buffered'] = True

    def disable_buffer(self, topic):
        """
        Disables the buffer on the given topic and discards its buffered messages.

        @type topic: string
        @param topic: The topic of interest.
        """
        entry = ProxySubscriberCached._topics[topic]
        entry['buffered'] = False
        entry['msg_queue'] = []

    def is_available(self, topic):
        """
        Checks if the subscriber on the given topic is available.

        @type topic: string
        @param topic: The topic of interest.
        """
        return topic in ProxySubscriberCached._topics

    def get_last_msg(self, topic):
        """
        Returns the latest cached message of the given topic.

        @type topic: string
        @param topic: The topic of interest.
        """
        return ProxySubscriberCached._topics[topic]['last_msg']

    def get_from_buffer(self, topic):
        """
        Pops the oldest buffered message of the given topic.

        Returns None if the topic is not buffered (a warning is logged) or
        if its buffer is currently empty.

        @type topic: string
        @param topic: The topic of interest.
        """
        entry = ProxySubscriberCached._topics[topic]
        if not entry['buffered']:
            rospy.logwarn('Attempted to access buffer of non-buffered topic!')
            return None
        queue = entry['msg_queue']
        if not queue:
            # Nothing buffered yet: report "no message" instead of raising IndexError.
            return None
        # Pop in place instead of rebinding a sliced copy, so a message that
        # _callback appends in between is not lost with the old list.
        return queue.pop(0)

    def has_msg(self, topic):
        """
        Determines if the given topic has a message in its cache.

        @type topic: string
        @param topic: The topic of interest.
        """
        return ProxySubscriberCached._topics[topic]['last_msg'] is not None

    def has_buffered(self, topic):
        """
        Determines if the given topic has any messages in its buffer.

        @type topic: string
        @param topic: The topic of interest.
        """
        return len(ProxySubscriberCached._topics[topic]['msg_queue']) > 0

    def remove_last_msg(self, topic, clear_buffer=False):
        """
        Removes the cached message of the given topic and optionally clears its buffer.

        Has no effect at all (cache and buffer are kept) on persistant topics.

        @type topic: string
        @param topic: The topic of interest.

        @type clear_buffer: boolean
        @param clear_buffer: Set to true if the buffer of the given topic should be cleared as well.
        """
        if topic in ProxySubscriberCached._persistant_topics:
            return
        entry = ProxySubscriberCached._topics[topic]
        entry['last_msg'] = None
        if clear_buffer:
            entry['msg_queue'] = []

    def make_persistant(self, topic):
        """
        Makes the given topic persistant which means messages can no longer be removed
        (remove_last_msg will have no effect), only overwritten by a new message.

        @type topic: string
        @param topic: The topic of interest.
        """
        # Avoid piling up duplicates when called repeatedly for the same topic.
        if topic not in ProxySubscriberCached._persistant_topics:
            ProxySubscriberCached._persistant_topics.append(topic)

    def has_topic(self, topic):
        """
        Determines if the given topic is already subscribed.

        @type topic: string
        @param topic: The topic of interest.
        """
        return topic in ProxySubscriberCached._topics

    def unsubscribe_topic(self, topic):
        """
        Removes the given topic from the list of subscribed topics.

        Does nothing if the topic is not subscribed.

        @type topic: string
        @param topic: The topic of interest.
        """
        if topic in ProxySubscriberCached._topics:
            ProxySubscriberCached._topics[topic]['subscriber'].unregister()
            ProxySubscriberCached._topics.pop(topic)

    def shutdown(self):
        """ Shuts this proxy down by unregistering all subscribers. """
        # Iterate over a snapshot so the registry cannot change size under the loop.
        for entry in list(ProxySubscriberCached._topics.values()):
            # Best effort: one failing subscriber must not prevent the others
            # from being unregistered.
            try:
                entry['subscriber'].unregister()
            except Exception as e:
                rospy.logerr('Something went wrong during shutdown of proxy subscriber!\n%s', str(e))
        ProxySubscriberCached._topics.clear()
def has_buffered(self, topic)
def get_last_msg(self, topic)
def get_from_buffer(self, topic)
def has_topic(self, topic)
def enable_buffer(self, topic)
def make_persistant(self, topic)
def __init__(self, topics={})
def remove_last_msg(self, topic, clear_buffer=False)
def _callback(self, msg, topic)
def unsubscribe_topic(self, topic)
def set_callback(self, topic, callback)
def is_available(self, topic)
def subscribe(self, topic, msg_type, callback=None, buffered=False)
def disable_buffer(self, topic)