Go to the documentation of this file.00001 import threading
00002 import rospy
00003 from message_filters import SimpleFilter
00004
00005 class SimpleApproximateTimeSynchronizer(SimpleFilter):
00006 """
00007 Synchronize multiple topics using a simple
00008 and non-optimal strategy.
00009
00010 This caches messages for all subscribed filters
00011 and passes them on (bundled) whenever there are
00012 messages available on all topics.
00013 """
00014
00015 def __init__(self, filters, queue_size=10, max_difference=rospy.Duration(.5)):
00016 """
00017 :param queue_size: The size of the message buffer for each filter
00018 :param max_difference: The maximal time difference between messages in the same bundle
00019 """
00020
00021 SimpleFilter.__init__(self)
00022 self.connectInput(filters)
00023
00024 self.queue_size= queue_size
00025 self.max_diff = max_difference
00026 self.lock = threading.Lock()
00027
00028 def connectInput(self, filters):
00029
00030 self.queues = [[] for f in filters]
00031 self.input_connections = [f.registerCallback(self.add, q) for (f, q) in zip(filters, self.queues)]
00032
00033 def add(self, msg, queue):
00034
00035 self.lock.acquire()
00036 queue.append(msg)
00037
00038 while len(queue) > self.queue_size:
00039 queue.pop(0)
00040
00041 if len(queue) == 1 and not([] in self.queues):
00042
00043
00044 t= queue[0].header.stamp
00045
00046 def mindiff(queue, time):
00047 """ return the message closest to time """
00048 selmsg=queue[0]
00049 for msg in queue[1:]:
00050 if abs(msg.header.stamp - time) < abs(selmsg.header.stamp - time):
00051 selmsg= msg
00052 elif abs(msg.header.stamp - time) > abs(selmsg.header.stamp - time):
00053 break
00054 return selmsg
00055
00056
00057
00058 msgs= [mindiff(q, t) for q in self.queues]
00059 times= [m.header.stamp for m in msgs]
00060
00061 bundle_size= max(times)-min(times)
00062 if bundle_size < self.max_diff:
00063 self.signalMessage(*msgs)
00064 else:
00065 rospy.loginfo("discard bundle because overall time difference %s > %s" % (str(bundle_size), str(self.max_diff)))
00066
00067
00068 for (q,t) in zip(self.queues, times):
00069 while len(q) > 0 and q[0].header.stamp <= t:
00070 q.pop(0)
00071
00072 self.lock.release()