9 A proxy for subscribing topics that caches and buffers received messages. 12 _persistant_topics = []
16 Initializes the proxy with optionally a given set of topics. 18 @type topics: dictionary string - message_class 19 @param topics: A dictionary containing a collection of topic - message type pairs. 21 for topic, msg_type
in topics.items():
24 def subscribe(self, topic, msg_type, callback=None, buffered=False):
26 Adds a new subscriber to the proxy. 29 @param topic: The topic to subscribe. 31 @type msg_type: a message class 32 @param msg_type: The type of messages of this topic. 34 @type callback: function 35 @param callback: A function to be called when receiving messages. 37 @type buffered: boolean 38 @param buffered: True if all messages should be bufferd, False if only the last message should be cached. 40 if topic
not in ProxySubscriberCached._topics:
41 sub = rospy.Subscriber(topic, msg_type, self.
_callback, callback_args=topic)
42 ProxySubscriberCached._topics[topic] = {
'subscriber': sub,
46 if callback
is not None:
47 ProxySubscriberCached._topics[topic][
'subscriber'].impl.add_callback(callback,
None)
51 Standard callback that is executed when a message is received. 54 @param topic: The latest message received on this topic. 57 @param topic: The topic to which this callback belongs. 59 if topic
not in ProxySubscriberCached._topics:
61 ProxySubscriberCached._topics[topic][
'last_msg'] = msg
62 if ProxySubscriberCached._topics[topic][
'buffered']:
63 ProxySubscriberCached._topics[topic][
'msg_queue'].append(msg)
67 Adds the given callback to the topic subscriber. 70 @param topic: The topic to add the callback to. 72 @type callback: function 73 @param callback: The callback to be added. 75 ProxySubscriberCached._topics[topic][
'subscriber'].impl.add_callback(callback,
None)
79 Enables the buffer on the given topic. 82 @param topic: The topic of interest. 84 ProxySubscriberCached._topics[topic][
'buffered'] =
True 88 Disables the buffer on the given topic. 91 @param topic: The topic of interest. 93 ProxySubscriberCached._topics[topic][
'buffered'] =
False 94 ProxySubscriberCached._topics[topic][
'msg_queue'] = []
98 Checks if the subscriber on the given topic is available. 101 @param topic: The topic of interest. 103 return topic
in ProxySubscriberCached._topics
107 Returns the latest cached message of the given topic. 110 @param topic: The topic of interest. 112 return ProxySubscriberCached._topics[topic][
'last_msg']
116 Pops the oldest buffered message of the given topic. 119 @param topic: The topic of interest. 121 if not ProxySubscriberCached._topics[topic][
'buffered']:
122 Logger.logwarn(
'Attempted to access buffer of non-buffered topic!')
124 if len(ProxySubscriberCached._topics[topic][
'msg_queue']) == 0:
126 msg = ProxySubscriberCached._topics[topic][
'msg_queue'][0]
127 ProxySubscriberCached._topics[topic][
'msg_queue'] = ProxySubscriberCached._topics[topic][
'msg_queue'][1:]
132 Determines if the given topic has a message in its cache. 135 @param topic: The topic of interest. 137 return ProxySubscriberCached._topics[topic][
'last_msg']
is not None 141 Determines if the given topic has any messages in its buffer. 144 @param topic: The topic of interest. 146 return len(ProxySubscriberCached._topics[topic][
'msg_queue']) > 0
150 Removes the cached message of the given topic and optionally clears its buffer. 153 @param topic: The topic of interest. 156 @param topic: Set to true if the buffer of the given topic should be cleared as well. 158 if topic
in ProxySubscriberCached._persistant_topics:
160 ProxySubscriberCached._topics[topic][
'last_msg'] =
None 162 ProxySubscriberCached._topics[topic][
'msg_queue'] = []
166 Makes the given topic persistant which means messages can no longer be removed 167 (remove_last_msg will have no effect), only overwritten by a new message. 170 @param topic: The topic of interest. 172 if topic
not in ProxySubscriberCached._persistant_topics:
173 ProxySubscriberCached._persistant_topics.append(topic)
177 Determines if the given topic is already subscribed. 180 @param topic: The topic of interest. 182 Logger.logwarn(
'Deprecated (ProxySubscriberCached): use "is_available(topic)" instead of "has_topic(topic)".')
187 Removes the given topic from the list of subscribed topics. 190 @param topic: The topic of interest. 192 if topic
in ProxySubscriberCached._topics:
193 ProxySubscriberCached._topics[topic][
'subscriber'].unregister()
194 ProxySubscriberCached._topics.pop(topic)
197 """ Shuts this proxy down by unregistering all subscribers. """ 199 for topic
in ProxySubscriberCached._topics:
201 ProxySubscriberCached._topics[topic][
'subscriber'].unregister()
202 except Exception
as e:
203 rospy.logerr(
'Something went wrong during shutdown of proxy subscriber!\n%s', str(e))
204 except Exception
as e:
205 rospy.logerr(
'Something went wrong during shutdown of proxy subscriber!\n%s', str(e))
206 ProxySubscriberCached._topics.clear()
def has_buffered(self, topic)
def get_last_msg(self, topic)
def get_from_buffer(self, topic)
def has_topic(self, topic)
def enable_buffer(self, topic)
def make_persistant(self, topic)
def __init__(self, topics={})
def remove_last_msg(self, topic, clear_buffer=False)
def _callback(self, msg, topic)
def unsubscribe_topic(self, topic)
def set_callback(self, topic, callback)
def is_available(self, topic)
def subscribe(self, topic, msg_type, callback=None, buffered=False)
def disable_buffer(self, topic)