Message Filters — chained message processing¶
message_filters
is a collection of message “filters” which take messages in,
either from a ROS subscription or another filter,
and may or may not output the message
at some time in the future, depending on a policy defined for that filter.
message_filters also defines a common interface for these filters, allowing you to chain them together.
The filters currently implemented in this package are:
message_filters.Subscriber
- A source filter, which wraps a ROS subscription. Most filter chains will begin with a Subscriber.message_filters.Cache
- Caches messages which pass through it, allowing later lookup by time stamp.message_filters.TimeSynchronizer
- Synchronizes multiple messages by their timestamps, only passing them through when all have arrived.message_filters.TimeSequencer
- Tries to pass messages through ordered by their timestamps, even if some arrive out of order.
Here’s a simple example of using a Subscriber with a Cache:
def myCallback(posemsg):
print posemsg
sub = message_filters.Subscriber("pose_topic", robot_msgs.msg.Pose)
cache = message_filters.Cache(sub, 10)
cache.registerCallback(myCallback)
The Subscriber here acts as the source of messages. Each message is passed to the cache, which then passes it through to the
user’s callback myCallback
.
Using the time synchronizer:
from message_filters import TimeSynchronizer, Subscriber
def gotimage(image, camerainfo):
assert image.header.stamp == camerainfo.header.stamp
print "got an Image and CameraInfo"
tss = TimeSynchronizer(Subscriber("/wide_stereo/left/image_rect_color", sensor_msgs.msg.Image),
Subscriber("/wide_stereo/left/camera_info", sensor_msgs.msg.CameraInfo))
tss.registerCallback(gotimage)
The message filter interface¶
For an object to be usable as a message filter, it needs to have one method,
registerCallback
. To collect messages from a message filter, register a callback with:
anyfilter.registerCallback(my_callback)
The signature of my_callback
varies according to the message filter. For many filters it is simply:
def my_callback(msg):
where msg
is the message.
Message filters that accept input from an upstream
message filter (e.g. message_filters.Cache
) register their own
message handler as a callback.
Output connections are registered through the registerCallback()
function.
Message Filter Objects¶
-
class
message_filters.
ApproximateTimeSynchronizer
(fs, queue_size, slop, allow_headerless=False, reset=False)¶ Approximately synchronizes messages by their timestamps.
ApproximateTimeSynchronizer
synchronizes incoming message filters by the timestamps contained in their messages’ headers. The API is the same as TimeSynchronizer except for an extra slop parameter in the constructor that defines the delay (in seconds) with which messages can be synchronized. Theallow_headerless
option specifies whether to allow storing headerless messages with current ROS time instead of timestamp. You should avoid this as much as you can, since the delays are unpredictable.-
registerCallback
(cb, *args)¶ Register a callback function cb to be called when this filter has output. The filter calls the function
cb
with a filter-dependent list of arguments, followed by the call-supplied argumentsargs
.
-
-
class
message_filters.
Cache
(f, cache_size=1, allow_headerless=False)¶ Stores a time history of messages.
Given a stream of messages, the most recent
cache_size
messages are cached in a ring buffer, from which time intervals of the cache can then be retrieved by the client. Theallow_headerless
option specifies whether to allow storing headerless messages with current ROS time instead of timestamp. You should avoid this as much as you can, since the delays are unpredictable.-
getElemAfterTime
(stamp)¶ Return the oldest element after or equal the passed time stamp.
-
getElemBeforeTime
(stamp)¶ Return the newest element before or equal the passed time stamp.
-
getInterval
(from_stamp, to_stamp)¶ Query the current cache content between from_stamp to to_stamp.
-
getLastestTime
()¶ Return the newest recorded timestamp (equivalent to getLatestTime(), but included for backwards compatibility).
-
getLatestTime
()¶ Return the newest recorded timestamp.
-
getOldestTime
()¶ Return the oldest recorded timestamp.
-
registerCallback
(cb, *args)¶ Register a callback function cb to be called when this filter has output. The filter calls the function
cb
with a filter-dependent list of arguments, followed by the call-supplied argumentsargs
.
-
-
class
message_filters.
Subscriber
(*args, **kwargs)¶ ROS subscription filter. Identical arguments as
rospy.Subscriber
.This class acts as a highest-level filter, simply passing messages from a ROS subscription through to the filters which have connected to it.
-
registerCallback
(cb, *args)¶ Register a callback function cb to be called when this filter has output. The filter calls the function
cb
with a filter-dependent list of arguments, followed by the call-supplied argumentsargs
.
-
-
class
message_filters.
TimeSynchronizer
(fs, queue_size, reset=False)¶ Synchronizes messages by their timestamps.
TimeSynchronizer
synchronizes incoming message filters by the timestamps contained in their messages’ headers. TimeSynchronizer listens on multiple input message filtersfs
, and invokes the callback when it has a collection of messages with matching timestamps.The signature of the callback function is:
def callback(msg1, ... msgN):
where N is the number of input message filters, and each message is the output of the corresponding filter in
fs
. The requiredqueue size
parameter specifies how many sets of messages it should store from each input filter (by timestamp) while waiting for messages to arrive and complete their “set”.-
registerCallback
(cb, *args)¶ Register a callback function cb to be called when this filter has output. The filter calls the function
cb
with a filter-dependent list of arguments, followed by the call-supplied argumentsargs
.
-