9 A proxy for subscribing to topics that caches and buffers received messages.
12 _persistant_topics = []
16 Initializes the proxy, optionally with a given set of topics.
18 @type topics: dictionary string - message_class
19 @param topics: A dictionary containing a collection of topic - message type pairs.
21 for topic, msg_type
in topics.items():
24 def subscribe(self, topic, msg_type, callback=None, buffered=False):
26 Adds a new subscriber to the proxy.
29 @param topic: The topic to subscribe.
31 @type msg_type: a message class
32 @param msg_type: The type of messages of this topic.
34 @type callback: function
35 @param callback: A function to be called when receiving messages.
37 @type buffered: boolean
38 @param buffered: True if all messages should be buffered, False if only the last message should be cached.
40 if topic
not in ProxySubscriberCached._topics:
41 sub = rospy.Subscriber(topic, msg_type, self.
_callback, callback_args=topic)
42 ProxySubscriberCached._topics[topic] = {
'subscriber': sub,
46 if callback
is not None:
47 ProxySubscriberCached._topics[topic][
'subscriber'].impl.add_callback(callback,
None)
51 Standard callback that is executed when a message is received.
54 @param msg: The latest message received on this topic.
57 @param topic: The topic to which this callback belongs.
59 if topic
not in ProxySubscriberCached._topics:
61 ProxySubscriberCached._topics[topic][
'last_msg'] = msg
62 if ProxySubscriberCached._topics[topic][
'buffered']:
63 ProxySubscriberCached._topics[topic][
'msg_queue'].append(msg)
67 Adds the given callback to the topic subscriber.
70 @param topic: The topic to add the callback to.
72 @type callback: function
73 @param callback: The callback to be added.
75 ProxySubscriberCached._topics[topic][
'subscriber'].impl.add_callback(callback,
None)
79 Enables the buffer on the given topic.
82 @param topic: The topic of interest.
84 ProxySubscriberCached._topics[topic][
'buffered'] =
True
88 Disables the buffer on the given topic.
91 @param topic: The topic of interest.
93 ProxySubscriberCached._topics[topic][
'buffered'] =
False
94 ProxySubscriberCached._topics[topic][
'msg_queue'] = []
98 Checks if the subscriber on the given topic is available.
101 @param topic: The topic of interest.
103 return topic
in ProxySubscriberCached._topics
107 Returns the latest cached message of the given topic.
110 @param topic: The topic of interest.
112 return ProxySubscriberCached._topics[topic][
'last_msg']
116 Pops the oldest buffered message of the given topic.
119 @param topic: The topic of interest.
121 if not ProxySubscriberCached._topics[topic][
'buffered']:
122 Logger.logwarn(
'Attempted to access buffer of non-buffered topic!')
124 if len(ProxySubscriberCached._topics[topic][
'msg_queue']) == 0:
126 msg = ProxySubscriberCached._topics[topic][
'msg_queue'][0]
127 ProxySubscriberCached._topics[topic][
'msg_queue'] = ProxySubscriberCached._topics[topic][
'msg_queue'][1:]
132 Determines if the given topic has a message in its cache.
135 @param topic: The topic of interest.
138 return ProxySubscriberCached._topics[topic][
'last_msg']
is not None
143 Determines if the given topic has any messages in its buffer.
146 @param topic: The topic of interest.
148 return len(ProxySubscriberCached._topics[topic][
'msg_queue']) > 0
152 Removes the cached message of the given topic and optionally clears its buffer.
155 @param topic: The topic of interest.
158 @param clear_buffer: Set to true if the buffer of the given topic should be cleared as well.
160 if topic
in ProxySubscriberCached._persistant_topics:
162 ProxySubscriberCached._topics[topic][
'last_msg'] =
None
164 ProxySubscriberCached._topics[topic][
'msg_queue'] = []
168 Makes the given topic persistent, which means its cached message can no longer be removed
169 (remove_last_msg will have no effect), only overwritten by a new message.
172 @param topic: The topic of interest.
174 if topic
not in ProxySubscriberCached._persistant_topics:
175 ProxySubscriberCached._persistant_topics.append(topic)
179 Determines if the given topic is already subscribed.
182 @param topic: The topic of interest.
184 Logger.logwarn(
'Deprecated (ProxySubscriberCached): use "is_available(topic)" instead of "has_topic(topic)".')
189 Removes the given topic from the list of subscribed topics.
192 @param topic: The topic of interest.
194 if topic
in ProxySubscriberCached._topics:
195 ProxySubscriberCached._topics[topic][
'subscriber'].unregister()
196 ProxySubscriberCached._topics.pop(topic)
199 """ Shuts this proxy down by unregistering all subscribers. """
201 for topic
in ProxySubscriberCached._topics:
203 ProxySubscriberCached._topics[topic][
'subscriber'].unregister()
204 except Exception
as e:
205 rospy.logerr(
'Something went wrong during shutdown of proxy subscriber!\n%s', str(e))
206 except Exception
as e:
207 rospy.logerr(
'Something went wrong during shutdown of proxy subscriber!\n%s', str(e))
208 ProxySubscriberCached._topics.clear()