proxy_subscriber_cached.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import roslib; roslib.load_manifest('flexbe_core')
4 import rospy
5 
6 import time
7 import random
8 
9 
10 class ProxySubscriberCached(object):
11  """
12  A proxy for subscribing topics that caches and buffers received messages.
13  """
14  _simulate_delay = False
15 
16  _topics = {}
17  _persistant_topics = []
18 
19  def __init__(self, topics={}):
20  """
21  Initializes the proxy with optionally a given set of topics.
22 
23  @type topics: dictionary string - message_class
24  @param topics: A dictionary containing a collection of topic - message type pairs.
25  """
26  for topic, msg_type in topics.iteritems():
27  self.subscribe(topic, msg_type)
28 
29 
30  def subscribe(self, topic, msg_type, callback=None, buffered=False):
31  """
32  Adds a new subscriber to the proxy.
33 
34  @type topic: string
35  @param topic: The topic to subscribe.
36 
37  @type msg_type: a message class
38  @param msg_type: The type of messages of this topic.
39 
40  @type callback: function
41  @param callback: A function to be called when receiving messages.
42 
43  @type buffered: boolean
44  @param buffered: True if all messages should be bufferd, False if only the last message should be cached.
45  """
46  if not topic in ProxySubscriberCached._topics:
47  sub = rospy.Subscriber(topic, msg_type, self._callback, callback_args=topic)
48  ProxySubscriberCached._topics[topic] = {'subscriber': sub,
49  'last_msg': None,
50  'buffered': buffered,
51  'msg_queue':[]}
52 
53  if callback is not None:
54  ProxySubscriberCached._topics[topic]['subscriber'].impl.add_callback(callback, None)
55 
56 
57  def _callback(self, msg, topic):
58  """
59  Standard callback that is executed when a message is received.
60 
61  @type topic: message
62  @param topic: The latest message received on this topic.
63 
64  @type topic: string
65  @param topic: The topic to which this callback belongs.
66  """
67  if ProxySubscriberCached._simulate_delay:
68  time.sleep(max(0, random.gauss(2, 0.6))) # for simulating comms_bridge delay
69 
70  if not ProxySubscriberCached._topics.has_key(topic):
71  return
72  ProxySubscriberCached._topics[topic]['last_msg'] = msg
73  if ProxySubscriberCached._topics[topic]['buffered']:
74  ProxySubscriberCached._topics[topic]['msg_queue'].append(msg)
75 
76 
77  def set_callback(self, topic, callback):
78  """
79  Adds the given callback to the topic subscriber.
80 
81  @type topic: string
82  @param topic: The topic to add the callback to.
83 
84  @type callback: function
85  @param callback: The callback to be added.
86  """
87  ProxySubscriberCached._topics[topic]['subscriber'].impl.add_callback(callback, None)
88 
89 
90  def enable_buffer(self, topic):
91  """
92  Enables the buffer on the given topic.
93 
94  @type topic: string
95  @param topic: The topic of interest.
96  """
97  ProxySubscriberCached._topics[topic]['buffered'] = True
98 
99 
100  def disable_buffer(self, topic):
101  """
102  Disables the buffer on the given topic.
103 
104  @type topic: string
105  @param topic: The topic of interest.
106  """
107  ProxySubscriberCached._topics[topic]['buffered'] = False
108  ProxySubscriberCached._topics[topic]['msg_queue'] = []
109 
110 
111  def is_available(self, topic):
112  """
113  Checks if the subscriber on the given topic is available.
114 
115  @type topic: string
116  @param topic: The topic of interest.
117  """
118  return topic in ProxySubscriberCached._topics
119 
120 
121  def get_last_msg(self, topic):
122  """
123  Returns the latest cached message of the given topic.
124 
125  @type topic: string
126  @param topic: The topic of interest.
127  """
128  return ProxySubscriberCached._topics[topic]['last_msg']
129 
130 
131  def get_from_buffer(self, topic):
132  """
133  Pops the oldest buffered message of the given topic.
134 
135  @type topic: string
136  @param topic: The topic of interest.
137  """
138  if not ProxySubscriberCached._topics[topic]['buffered']:
139  rospy.logwarn('Attempted to access buffer of non-buffered topic!')
140  return None
141  msg = ProxySubscriberCached._topics[topic]['msg_queue'][0]
142  ProxySubscriberCached._topics[topic]['msg_queue'] = ProxySubscriberCached._topics[topic]['msg_queue'][1:]
143  return msg
144 
145 
146  def has_msg(self, topic):
147  """
148  Determines if the given topic has a message in its cache.
149 
150  @type topic: string
151  @param topic: The topic of interest.
152  """
153  return ProxySubscriberCached._topics[topic]['last_msg'] is not None
154 
155 
156  def has_buffered(self, topic):
157  """
158  Determines if the given topic has any messages in its buffer.
159 
160  @type topic: string
161  @param topic: The topic of interest.
162  """
163  return len(ProxySubscriberCached._topics[topic]['msg_queue']) > 0
164 
165 
166  def remove_last_msg(self, topic, clear_buffer=False): # better pop last msg
167  """
168  Removes the cached message of the given topic and optionally clears its buffer.
169 
170  @type topic: string
171  @param topic: The topic of interest.
172 
173  @type topic: boolean
174  @param topic: Set to true if the buffer of the given topic should be cleared as well.
175  """
176  if ProxySubscriberCached._persistant_topics.count(topic) > 0: return
177  ProxySubscriberCached._topics[topic]['last_msg'] = None
178  if clear_buffer:
179  ProxySubscriberCached._topics[topic]['msg_queue'] = []
180 
181 
182  def make_persistant(self, topic):
183  """
184  Makes the given topic persistant which means messages can no longer be removed
185  (remove_last_msg will have no effect), only overwritten by a new message.
186 
187  @type topic: string
188  @param topic: The topic of interest.
189  """
190  ProxySubscriberCached._persistant_topics.append(topic)
191 
192 
193  def has_topic(self, topic):
194  """
195  Determines if the given topic is already subscribed.
196 
197  @type topic: string
198  @param topic: The topic of interest.
199  """
200  if topic in ProxySubscriberCached._topics:
201  return True
202  return False
203 
204 
205  def unsubscribe_topic(self, topic):
206  """
207  Removes the given topic from the list of subscribed topics.
208 
209  @type topic: string
210  @param topic: The topic of interest.
211  """
212  if topic in ProxySubscriberCached._topics:
213  ProxySubscriberCached._topics[topic]['subscriber'].unregister()
214  ProxySubscriberCached._topics.pop(topic)
215 
216 
217  def shutdown(self):
218  """ Shuts this proxy down by unregistering all subscribers. """
219  try:
220  for topic in ProxySubscriberCached._topics:
221  try:
222  ProxySubscriberCached._topics[topic]['subscriber'].unregister()
223  except Exception as e:
224  rospy.logerr('Something went wrong during shutdown of proxy subscriber!\n%s', str(e))
225  except Exception as e:
226  rospy.logerr('Something went wrong during shutdown of proxy subscriber!\n%s', str(e))
227  ProxySubscriberCached._topics.clear()
def subscribe(self, topic, msg_type, callback=None, buffered=False)


flexbe_core
Author(s): Philipp Schillinger
autogenerated on Wed Jun 5 2019 21:51:59