proxy_subscriber_cached.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import rospy
3 
4 from flexbe_core.logger import Logger
5 
6 
class ProxySubscriberCached(object):
    """
    A proxy for subscribing topics that caches and buffers received messages.

    All state is stored in class-level attributes, so every instance of this
    proxy shares the same subscribers, cached messages and buffers.
    """
    # Maps topic name -> dict with the keys 'subscriber', 'last_msg',
    # 'buffered' and 'msg_queue'. Shared by all instances.
    _topics = {}
    # Topics whose cached message is protected from remove_last_msg.
    _persistant_topics = []

    def __init__(self, topics=None):
        """
        Initializes the proxy with optionally a given set of topics.

        @type topics: dictionary string - message_class
        @param topics: A dictionary containing a collection of topic - message type pairs.
            May be omitted (or None) to subscribe to nothing initially.
        """
        # None instead of a mutable {} default; an empty/omitted dict subscribes nothing.
        if topics:
            for topic, msg_type in topics.items():
                self.subscribe(topic, msg_type)

    def subscribe(self, topic, msg_type, callback=None, buffered=False):
        """
        Adds a new subscriber to the proxy.

        If the topic is already subscribed, no new subscriber is created and
        msg_type and buffered are ignored (use enable_buffer to change the
        buffering of an existing topic). A given callback is added in both cases.

        @type topic: string
        @param topic: The topic to subscribe.

        @type msg_type: a message class
        @param msg_type: The type of messages of this topic.

        @type callback: function
        @param callback: A function to be called when receiving messages.

        @type buffered: boolean
        @param buffered: True if all messages should be buffered, False if only the last message should be cached.
        """
        if topic not in ProxySubscriberCached._topics:
            sub = rospy.Subscriber(topic, msg_type, self._callback, callback_args=topic)
            ProxySubscriberCached._topics[topic] = {'subscriber': sub,
                                                    'last_msg': None,
                                                    'buffered': buffered,
                                                    'msg_queue': []}
        if callback is not None:
            ProxySubscriberCached._topics[topic]['subscriber'].impl.add_callback(callback, None)

    def _callback(self, msg, topic):
        """
        Standard callback that is executed when a message is received.

        @type msg: message
        @param msg: The latest message received on this topic.

        @type topic: string
        @param topic: The topic to which this callback belongs.
        """
        # Single lookup: the topic may be unsubscribed between a membership
        # test and a second indexing operation.
        entry = ProxySubscriberCached._topics.get(topic)
        if entry is None:
            return
        entry['last_msg'] = msg
        if entry['buffered']:
            entry['msg_queue'].append(msg)

    def set_callback(self, topic, callback):
        """
        Adds the given callback to the topic subscriber.

        Raises a KeyError if the topic is not subscribed.

        @type topic: string
        @param topic: The topic to add the callback to.

        @type callback: function
        @param callback: The callback to be added.
        """
        ProxySubscriberCached._topics[topic]['subscriber'].impl.add_callback(callback, None)

    def enable_buffer(self, topic):
        """
        Enables the buffer on the given topic.

        Raises a KeyError if the topic is not subscribed.

        @type topic: string
        @param topic: The topic of interest.
        """
        ProxySubscriberCached._topics[topic]['buffered'] = True

    def disable_buffer(self, topic):
        """
        Disables the buffer on the given topic and discards all buffered messages.

        Raises a KeyError if the topic is not subscribed.

        @type topic: string
        @param topic: The topic of interest.
        """
        entry = ProxySubscriberCached._topics[topic]
        entry['buffered'] = False
        entry['msg_queue'] = []

    def is_available(self, topic):
        """
        Checks if the subscriber on the given topic is available.

        @type topic: string
        @param topic: The topic of interest.
        """
        return topic in ProxySubscriberCached._topics

    def get_last_msg(self, topic):
        """
        Returns the latest cached message of the given topic (None if there is none).

        Raises a KeyError if the topic is not subscribed.

        @type topic: string
        @param topic: The topic of interest.
        """
        return ProxySubscriberCached._topics[topic]['last_msg']

    def get_from_buffer(self, topic):
        """
        Pops the oldest buffered message of the given topic.

        Returns None (after logging a warning) if the topic is not buffered,
        and None if the buffer is empty. Raises a KeyError if the topic is
        not subscribed.

        @type topic: string
        @param topic: The topic of interest.
        """
        entry = ProxySubscriberCached._topics[topic]
        if not entry['buffered']:
            Logger.logwarn('Attempted to access buffer of non-buffered topic!')
            return None
        try:
            # Pop in place instead of re-binding the queue to a sliced copy:
            # a message appended by the subscriber callback while the copy is
            # being made would otherwise be dropped with the old list.
            return entry['msg_queue'].pop(0)
        except IndexError:
            return None

    def has_msg(self, topic):
        """
        Determines if the given topic has a message in its cache.

        Returns False if the topic is not subscribed.

        @type topic: string
        @param topic: The topic of interest.
        """
        if self.is_available(topic):
            return ProxySubscriberCached._topics[topic]['last_msg'] is not None
        return False

    def has_buffered(self, topic):
        """
        Determines if the given topic has any messages in its buffer.

        Returns False if the topic is not subscribed (consistent with has_msg).

        @type topic: string
        @param topic: The topic of interest.
        """
        if not self.is_available(topic):
            return False
        return len(ProxySubscriberCached._topics[topic]['msg_queue']) > 0

    def remove_last_msg(self, topic, clear_buffer=False):
        """
        Removes the cached message of the given topic and optionally clears its buffer.

        Has no effect at all (the buffer is not cleared either) if the topic
        has been made persistant. Raises a KeyError if the topic is not
        subscribed and not persistant.

        @type topic: string
        @param topic: The topic of interest.

        @type clear_buffer: boolean
        @param clear_buffer: Set to true if the buffer of the given topic should be cleared as well.
        """
        if topic in ProxySubscriberCached._persistant_topics:
            return
        entry = ProxySubscriberCached._topics[topic]
        entry['last_msg'] = None
        if clear_buffer:
            entry['msg_queue'] = []

    def make_persistant(self, topic):
        """
        Makes the given topic persistant which means messages can no longer be removed
        (remove_last_msg will have no effect), only overwritten by a new message.

        @type topic: string
        @param topic: The topic of interest.
        """
        if topic not in ProxySubscriberCached._persistant_topics:
            ProxySubscriberCached._persistant_topics.append(topic)

    def has_topic(self, topic):
        """
        Determines if the given topic is already subscribed.

        Deprecated: logs a warning and forwards to is_available.

        @type topic: string
        @param topic: The topic of interest.
        """
        Logger.logwarn('Deprecated (ProxySubscriberCached): use "is_available(topic)" instead of "has_topic(topic)".')
        return self.is_available(topic)

    def unsubscribe_topic(self, topic):
        """
        Removes the given topic from the list of subscribed topics.

        Does nothing if the topic is not subscribed. The persistant flag of
        the topic is not touched.

        @type topic: string
        @param topic: The topic of interest.
        """
        if topic in ProxySubscriberCached._topics:
            ProxySubscriberCached._topics[topic]['subscriber'].unregister()
            ProxySubscriberCached._topics.pop(topic)

    def shutdown(self):
        """ Shuts this proxy down by unregistering all subscribers. """
        try:
            # Iterate over a snapshot so a concurrent (un)subscribe cannot
            # invalidate the iteration.
            for entry in list(ProxySubscriberCached._topics.values()):
                try:
                    entry['subscriber'].unregister()
                except Exception as e:
                    # Best effort: keep unregistering the remaining subscribers.
                    rospy.logerr('Something went wrong during shutdown of proxy subscriber!\n%s', str(e))
        except Exception as e:
            rospy.logerr('Something went wrong during shutdown of proxy subscriber!\n%s', str(e))
        ProxySubscriberCached._topics.clear()
flexbe_core.proxy.proxy_subscriber_cached.ProxySubscriberCached.enable_buffer
def enable_buffer(self, topic)
Definition: proxy_subscriber_cached.py:77
flexbe_core.proxy.proxy_subscriber_cached.ProxySubscriberCached.make_persistant
def make_persistant(self, topic)
Definition: proxy_subscriber_cached.py:166
flexbe_core.proxy.proxy_subscriber_cached.ProxySubscriberCached.get_from_buffer
def get_from_buffer(self, topic)
Definition: proxy_subscriber_cached.py:114
flexbe_core.proxy.proxy_subscriber_cached.ProxySubscriberCached.is_available
def is_available(self, topic)
Definition: proxy_subscriber_cached.py:96
flexbe_core.proxy.proxy_subscriber_cached.ProxySubscriberCached.__init__
def __init__(self, topics={})
Definition: proxy_subscriber_cached.py:14
flexbe_core.proxy.proxy_subscriber_cached.ProxySubscriberCached
Definition: proxy_subscriber_cached.py:7
flexbe_core.proxy.proxy_subscriber_cached.ProxySubscriberCached.shutdown
def shutdown(self)
Definition: proxy_subscriber_cached.py:198
flexbe_core.proxy.proxy_subscriber_cached.ProxySubscriberCached.has_buffered
def has_buffered(self, topic)
Definition: proxy_subscriber_cached.py:141
flexbe_core.proxy.proxy_subscriber_cached.ProxySubscriberCached.remove_last_msg
def remove_last_msg(self, topic, clear_buffer=False)
Definition: proxy_subscriber_cached.py:150
flexbe_core.logger
Definition: logger.py:1
flexbe_core.proxy.proxy_subscriber_cached.ProxySubscriberCached._callback
def _callback(self, msg, topic)
Definition: proxy_subscriber_cached.py:49
flexbe_core.proxy.proxy_subscriber_cached.ProxySubscriberCached.get_last_msg
def get_last_msg(self, topic)
Definition: proxy_subscriber_cached.py:105
flexbe_core.proxy.proxy_subscriber_cached.ProxySubscriberCached.unsubscribe_topic
def unsubscribe_topic(self, topic)
Definition: proxy_subscriber_cached.py:187
flexbe_core.proxy.proxy_subscriber_cached.ProxySubscriberCached.disable_buffer
def disable_buffer(self, topic)
Definition: proxy_subscriber_cached.py:86
flexbe_core.proxy.proxy_subscriber_cached.ProxySubscriberCached.subscribe
def subscribe(self, topic, msg_type, callback=None, buffered=False)
Definition: proxy_subscriber_cached.py:24
flexbe_core.proxy.proxy_subscriber_cached.ProxySubscriberCached.has_msg
def has_msg(self, topic)
Definition: proxy_subscriber_cached.py:130
flexbe_core.proxy.proxy_subscriber_cached.ProxySubscriberCached.set_callback
def set_callback(self, topic, callback)
Definition: proxy_subscriber_cached.py:65
flexbe_core.proxy.proxy_subscriber_cached.ProxySubscriberCached.has_topic
def has_topic(self, topic)
Definition: proxy_subscriber_cached.py:177


flexbe_core
Author(s): Philipp Schillinger
autogenerated on Fri Jul 21 2023 02:26:05