Go to the documentation of this file.00001
00002
00003 import roslib; roslib.load_manifest('flexbe_core')
00004 import rospy
00005
00006 import time
00007 import random
00008
00009
00010 class ProxySubscriberCached(object):
00011 """
00012 A proxy for subscribing topics that caches and buffers received messages.
00013 """
00014 _simulate_delay = False
00015
00016 _topics = {}
00017 _persistant_topics = []
00018
00019 def __init__(self, topics={}):
00020 """
00021 Initializes the proxy with optionally a given set of topics.
00022
00023 @type topics: dictionary string - message_class
00024 @param topics: A dictionary containing a collection of topic - message type pairs.
00025 """
00026 for topic, msg_type in topics.iteritems():
00027 self.subscribe(topic, msg_type)
00028
00029
00030 def subscribe(self, topic, msg_type, callback=None, buffered=False):
00031 """
00032 Adds a new subscriber to the proxy.
00033
00034 @type topic: string
00035 @param topic: The topic to subscribe.
00036
00037 @type msg_type: a message class
00038 @param msg_type: The type of messages of this topic.
00039
00040 @type callback: function
00041 @param callback: A function to be called when receiving messages.
00042
00043 @type buffered: boolean
00044 @param buffered: True if all messages should be bufferd, False if only the last message should be cached.
00045 """
00046 if not topic in ProxySubscriberCached._topics:
00047 sub = rospy.Subscriber(topic, msg_type, self._callback, callback_args=topic)
00048 ProxySubscriberCached._topics[topic] = {'subscriber': sub,
00049 'last_msg': None,
00050 'buffered': buffered,
00051 'msg_queue':[]}
00052
00053 if callback is not None:
00054 ProxySubscriberCached._topics[topic]['subscriber'].impl.add_callback(callback, None)
00055
00056
00057 def _callback(self, msg, topic):
00058 """
00059 Standard callback that is executed when a message is received.
00060
00061 @type topic: message
00062 @param topic: The latest message received on this topic.
00063
00064 @type topic: string
00065 @param topic: The topic to which this callback belongs.
00066 """
00067 if ProxySubscriberCached._simulate_delay:
00068 time.sleep(max(0, random.gauss(2, 0.6)))
00069
00070 if not ProxySubscriberCached._topics.has_key(topic):
00071 return
00072 ProxySubscriberCached._topics[topic]['last_msg'] = msg
00073 if ProxySubscriberCached._topics[topic]['buffered']:
00074 ProxySubscriberCached._topics[topic]['msg_queue'].append(msg)
00075
00076
00077 def set_callback(self, topic, callback):
00078 """
00079 Adds the given callback to the topic subscriber.
00080
00081 @type topic: string
00082 @param topic: The topic to add the callback to.
00083
00084 @type callback: function
00085 @param callback: The callback to be added.
00086 """
00087 ProxySubscriberCached._topics[topic]['subscriber'].impl.add_callback(callback, None)
00088
00089
00090 def enable_buffer(self, topic):
00091 """
00092 Enables the buffer on the given topic.
00093
00094 @type topic: string
00095 @param topic: The topic of interest.
00096 """
00097 ProxySubscriberCached._topics[topic]['buffered'] = True
00098
00099
00100 def disable_buffer(self, topic):
00101 """
00102 Disables the buffer on the given topic.
00103
00104 @type topic: string
00105 @param topic: The topic of interest.
00106 """
00107 ProxySubscriberCached._topics[topic]['buffered'] = False
00108 ProxySubscriberCached._topics[topic]['msg_queue'] = []
00109
00110
00111 def is_available(self, topic):
00112 """
00113 Checks if the subscriber on the given topic is available.
00114
00115 @type topic: string
00116 @param topic: The topic of interest.
00117 """
00118 return topic in ProxySubscriberCached._topics
00119
00120
00121 def get_last_msg(self, topic):
00122 """
00123 Returns the latest cached message of the given topic.
00124
00125 @type topic: string
00126 @param topic: The topic of interest.
00127 """
00128 return ProxySubscriberCached._topics[topic]['last_msg']
00129
00130
00131 def get_from_buffer(self, topic):
00132 """
00133 Pops the oldest buffered message of the given topic.
00134
00135 @type topic: string
00136 @param topic: The topic of interest.
00137 """
00138 if not ProxySubscriberCached._topics[topic]['buffered']:
00139 rospy.logwarn('Attempted to access buffer of non-buffered topic!')
00140 return None
00141 msg = ProxySubscriberCached._topics[topic]['msg_queue'][0]
00142 ProxySubscriberCached._topics[topic]['msg_queue'] = ProxySubscriberCached._topics[topic]['msg_queue'][1:]
00143 return msg
00144
00145
00146 def has_msg(self, topic):
00147 """
00148 Determines if the given topic has a message in its cache.
00149
00150 @type topic: string
00151 @param topic: The topic of interest.
00152 """
00153 return ProxySubscriberCached._topics[topic]['last_msg'] is not None
00154
00155
00156 def has_buffered(self, topic):
00157 """
00158 Determines if the given topic has any messages in its buffer.
00159
00160 @type topic: string
00161 @param topic: The topic of interest.
00162 """
00163 return len(ProxySubscriberCached._topics[topic]['msg_queue']) > 0
00164
00165
00166 def remove_last_msg(self, topic, clear_buffer=False):
00167 """
00168 Removes the cached message of the given topic and optionally clears its buffer.
00169
00170 @type topic: string
00171 @param topic: The topic of interest.
00172
00173 @type topic: boolean
00174 @param topic: Set to true if the buffer of the given topic should be cleared as well.
00175 """
00176 if ProxySubscriberCached._persistant_topics.count(topic) > 0: return
00177 ProxySubscriberCached._topics[topic]['last_msg'] = None
00178 if clear_buffer:
00179 ProxySubscriberCached._topics[topic]['msg_queue'] = []
00180
00181
00182 def make_persistant(self, topic):
00183 """
00184 Makes the given topic persistant which means messages can no longer be removed
00185 (remove_last_msg will have no effect), only overwritten by a new message.
00186
00187 @type topic: string
00188 @param topic: The topic of interest.
00189 """
00190 ProxySubscriberCached._persistant_topics.append(topic)
00191
00192
00193 def has_topic(self, topic):
00194 """
00195 Determines if the given topic is already subscribed.
00196
00197 @type topic: string
00198 @param topic: The topic of interest.
00199 """
00200 if topic in ProxySubscriberCached._topics:
00201 return True
00202 return False
00203
00204
00205 def unsubscribe_topic(self, topic):
00206 """
00207 Removes the given topic from the list of subscribed topics.
00208
00209 @type topic: string
00210 @param topic: The topic of interest.
00211 """
00212 if topic in ProxySubscriberCached._topics:
00213 ProxySubscriberCached._topics[topic]['subscriber'].unregister()
00214 ProxySubscriberCached._topics.pop(topic)
00215
00216
00217 def shutdown(self):
00218 """ Shuts this proxy down by unregistering all subscribers. """
00219 try:
00220 for topic in ProxySubscriberCached._topics:
00221 try:
00222 ProxySubscriberCached._topics[topic]['subscriber'].unregister()
00223 except Exception as e:
00224 rospy.logerr('Something went wrong during shutdown of proxy subscriber!\n%s', str(e))
00225 except Exception as e:
00226 rospy.logerr('Something went wrong during shutdown of proxy subscriber!\n%s', str(e))
00227 ProxySubscriberCached._topics.clear()