proxy_subscriber_cached.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import rospy
3 
4 from flexbe_core.logger import Logger
5 
6 
class ProxySubscriberCached(object):
    """
    A proxy for subscribing topics that caches and buffers received messages.

    All bookkeeping lives in class attributes, so every instance of this
    proxy shares the same subscribers, cached messages and buffers.
    """
    # Maps a topic name to its entry:
    # {'subscriber': ..., 'last_msg': ..., 'buffered': bool, 'msg_queue': list}
    _topics = {}
    # Topics whose cached message is protected from remove_last_msg.
    _persistant_topics = []

    def __init__(self, topics=None):
        """
        Initializes the proxy with optionally a given set of topics.

        @type topics: dictionary string - message_class
        @param topics: A dictionary containing a collection of topic - message type pairs.
            May be omitted or None, in which case nothing is subscribed.
        """
        # A None default avoids sharing one mutable dict between all calls.
        if topics is None:
            topics = {}
        for topic, msg_type in topics.items():
            self.subscribe(topic, msg_type)

    def subscribe(self, topic, msg_type, callback=None, buffered=False):
        """
        Adds a new subscriber to the proxy.

        If the topic is already subscribed, no new subscriber is created and
        msg_type and buffered are ignored; a given callback is still added.

        @type topic: string
        @param topic: The topic to subscribe.

        @type msg_type: a message class
        @param msg_type: The type of messages of this topic.

        @type callback: function
        @param callback: A function to be called when receiving messages.

        @type buffered: boolean
        @param buffered: True if all messages should be buffered, False if only the last message should be cached.
        """
        if topic not in ProxySubscriberCached._topics:
            # The topic name is handed back to _callback as its second argument.
            sub = rospy.Subscriber(topic, msg_type, self._callback, callback_args=topic)
            ProxySubscriberCached._topics[topic] = {'subscriber': sub,
                                                    'last_msg': None,
                                                    'buffered': buffered,
                                                    'msg_queue': []}
        if callback is not None:
            self.set_callback(topic, callback)

    def _callback(self, msg, topic):
        """
        Standard callback that is executed when a message is received.

        Messages for topics that are not (or no longer) subscribed are ignored.

        @type msg: message
        @param msg: The latest message received on this topic.

        @type topic: string
        @param topic: The topic to which this callback belongs.
        """
        # Fetch the entry once: a concurrent unsubscribe_topic can then no
        # longer cause a KeyError between the membership test and the update.
        entry = ProxySubscriberCached._topics.get(topic)
        if entry is None:
            return
        entry['last_msg'] = msg
        if entry['buffered']:
            entry['msg_queue'].append(msg)

    def set_callback(self, topic, callback):
        """
        Adds the given callback to the topic subscriber.

        Raises a KeyError if the topic has not been subscribed.

        @type topic: string
        @param topic: The topic to add the callback to.

        @type callback: function
        @param callback: The callback to be added.
        """
        ProxySubscriberCached._topics[topic]['subscriber'].impl.add_callback(callback, None)

    def enable_buffer(self, topic):
        """
        Enables the buffer on the given topic.

        @type topic: string
        @param topic: The topic of interest.
        """
        ProxySubscriberCached._topics[topic]['buffered'] = True

    def disable_buffer(self, topic):
        """
        Disables the buffer on the given topic and discards its buffered messages.

        @type topic: string
        @param topic: The topic of interest.
        """
        entry = ProxySubscriberCached._topics[topic]
        entry['buffered'] = False
        entry['msg_queue'] = []

    def is_available(self, topic):
        """
        Checks if the subscriber on the given topic is available.

        @type topic: string
        @param topic: The topic of interest.
        """
        return topic in ProxySubscriberCached._topics

    def get_last_msg(self, topic):
        """
        Returns the latest cached message of the given topic.

        Returns None if no message is cached.

        @type topic: string
        @param topic: The topic of interest.
        """
        return ProxySubscriberCached._topics[topic]['last_msg']

    def get_from_buffer(self, topic):
        """
        Pops the oldest buffered message of the given topic.

        Returns None if the buffer is empty, or (after logging a warning)
        if the topic is not buffered.

        @type topic: string
        @param topic: The topic of interest.
        """
        entry = ProxySubscriberCached._topics[topic]
        if not entry['buffered']:
            Logger.logwarn('Attempted to access buffer of non-buffered topic!')
            return None
        try:
            # Pop in place instead of rebinding the queue to a sliced copy:
            # a message appended to the old list while the copy was being
            # made would otherwise be lost.
            return entry['msg_queue'].pop(0)
        except IndexError:
            return None

    def has_msg(self, topic):
        """
        Determines if the given topic has a message in its cache.

        @type topic: string
        @param topic: The topic of interest.
        """
        return ProxySubscriberCached._topics[topic]['last_msg'] is not None

    def has_buffered(self, topic):
        """
        Determines if the given topic has any messages in its buffer.

        @type topic: string
        @param topic: The topic of interest.
        """
        return bool(ProxySubscriberCached._topics[topic]['msg_queue'])

    def remove_last_msg(self, topic, clear_buffer=False):
        """
        Removes the cached message of the given topic and optionally clears its buffer.

        Has no effect at all (the buffer is not cleared either) if the topic
        has been made persistent.

        @type topic: string
        @param topic: The topic of interest.

        @type clear_buffer: boolean
        @param clear_buffer: Set to true if the buffer of the given topic should be cleared as well.
        """
        if topic in ProxySubscriberCached._persistant_topics:
            return
        entry = ProxySubscriberCached._topics[topic]
        entry['last_msg'] = None
        if clear_buffer:
            entry['msg_queue'] = []

    def make_persistant(self, topic):
        """
        Makes the given topic persistent which means messages can no longer be removed
        (remove_last_msg will have no effect), only overwritten by a new message.

        @type topic: string
        @param topic: The topic of interest.
        """
        if topic not in ProxySubscriberCached._persistant_topics:
            ProxySubscriberCached._persistant_topics.append(topic)

    def has_topic(self, topic):
        """
        Determines if the given topic is already subscribed.

        Deprecated: logs a warning and forwards to is_available.

        @type topic: string
        @param topic: The topic of interest.
        """
        Logger.logwarn('Deprecated (ProxySubscriberCached): use "is_available(topic)" instead of "has_topic(topic)".')
        return self.is_available(topic)

    def unsubscribe_topic(self, topic):
        """
        Removes the given topic from the list of subscribed topics.

        Does nothing if the topic is not subscribed.

        @type topic: string
        @param topic: The topic of interest.
        """
        if topic in ProxySubscriberCached._topics:
            ProxySubscriberCached._topics[topic]['subscriber'].unregister()
            ProxySubscriberCached._topics.pop(topic)

    def shutdown(self):
        """ Shuts this proxy down by unregistering all subscribers. """
        try:
            # Iterate over a snapshot so that a change of the dictionary
            # during the loop cannot abort it and skip remaining subscribers.
            for entry in list(ProxySubscriberCached._topics.values()):
                try:
                    entry['subscriber'].unregister()
                except Exception as e:
                    # Best effort: keep unregistering the remaining subscribers.
                    rospy.logerr('Something went wrong during shutdown of proxy subscriber!\n%s', str(e))
        except Exception as e:
            rospy.logerr('Something went wrong during shutdown of proxy subscriber!\n%s', str(e))
        ProxySubscriberCached._topics.clear()
def subscribe(self, topic, msg_type, callback=None, buffered=False)


flexbe_core
Author(s): Philipp Schillinger
autogenerated on Sun Dec 13 2020 04:01:39