Program Listing for File message_filter.h
↰ Return to documentation for file (include/tf2_ros/message_filter.h
)
/*
* Copyright (c) 2010, Willow Garage, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of the Willow Garage, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef TF2_ROS__MESSAGE_FILTER_H_
#define TF2_ROS__MESSAGE_FILTER_H_
#include <algorithm>
#include <chrono>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <ratio>
#include <sstream>
#include <string>
#include <tuple>
#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>
#include "message_filters/connection.h"
#include "message_filters/message_traits.h"
#include "message_filters/simple_filter.h"
#include "tf2/buffer_core_interface.h"
#include "tf2/time.h"
#include "tf2_ros/async_buffer_interface.h"
#include "tf2_ros/buffer.h"
#include "builtin_interfaces/msg/time.hpp"
#include "rclcpp/rclcpp.hpp"
#define TF2_ROS_MESSAGEFILTER_DEBUG(fmt, ...) \
RCUTILS_LOG_DEBUG_NAMED( \
"tf2_ros_message_filter", \
std::string(std::string("MessageFilter [target=%s]: ") + std::string(fmt)).c_str(), \
getTargetFramesString().c_str(), __VA_ARGS__)
#define TF2_ROS_MESSAGEFILTER_WARN(fmt, ...) \
RCUTILS_LOG_WARN_NAMED( \
"tf2_ros_message_filter", \
std::string(std::string("MessageFilter [target=%s]: ") + std::string(fmt)).c_str(), \
getTargetFramesString().c_str(), __VA_ARGS__)
namespace tf2_ros
{
namespace filter_failure_reasons
{
enum FilterFailureReason
{
// NOTE when adding new values, do not explicitly assign a number. See FilterFailureReasonCount
// reason it was unable to be transformed is unknown.
Unknown,
OutTheBack,
EmptyFrameID,
NoTransformFound,
QueueFull,
FilterFailureReasonCount,
};
} // namespace filter_failure_reasons
static std::string get_filter_failure_reason_string(
filter_failure_reasons::FilterFailureReason reason)
{
switch (reason) {
case filter_failure_reasons::OutTheBack:
return
"the timestamp on the message is earlier than all the data in the transform cache";
case filter_failure_reasons::EmptyFrameID:
return "the frame id of the message is empty";
case filter_failure_reasons::NoTransformFound:
return "did not find a valid transform, this usually happens at startup ...";
case filter_failure_reasons::QueueFull:
return "discarding message because the queue is full";
case filter_failure_reasons::Unknown: // fallthrough
default:
return "unknown";
}
}
typedef filter_failure_reasons::FilterFailureReason FilterFailureReason;
class MessageFilterBase
{
public:
typedef std::vector<std::string> V_string;
virtual ~MessageFilterBase() {}
virtual void clear() = 0;
virtual void setTargetFrame(const std::string & target_frame) = 0;
virtual void setTargetFrames(const V_string & target_frames) = 0;
virtual void setTolerance(const rclcpp::Duration & tolerance) = 0;
};
template<class M, class BufferT = tf2_ros::Buffer>
class MessageFilter : public MessageFilterBase, public message_filters::SimpleFilter<M>
{
public:
using MConstPtr = std::shared_ptr<M const>;
typedef message_filters::MessageEvent<M const> MEvent;
template<typename TimeRepT = int64_t, typename TimeT = std::nano>
MessageFilter(
BufferT & buffer, const std::string & target_frame, uint32_t queue_size,
const rclcpp::Node::SharedPtr & node,
std::chrono::duration<TimeRepT, TimeT> buffer_timeout =
std::chrono::duration<TimeRepT, TimeT>::max())
: MessageFilter(
buffer, target_frame, queue_size, node->get_node_logging_interface(),
node->get_node_clock_interface(), buffer_timeout)
{
static_assert(
std::is_base_of<tf2::BufferCoreInterface, BufferT>::value,
"Buffer type must implement tf2::BufferCoreInterface");
static_assert(
std::is_base_of<tf2_ros::AsyncBufferInterface, BufferT>::value,
"Buffer type must implement tf2_ros::AsyncBufferInterface");
}
template<typename TimeRepT = int64_t, typename TimeT = std::nano>
MessageFilter(
BufferT & buffer, const std::string & target_frame, uint32_t queue_size,
const rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr & node_logging,
const rclcpp::node_interfaces::NodeClockInterface::SharedPtr & node_clock,
std::chrono::duration<TimeRepT, TimeT> buffer_timeout =
std::chrono::duration<TimeRepT, TimeT>::max())
: node_logging_(node_logging),
node_clock_(node_clock),
buffer_(buffer),
queue_size_(queue_size),
buffer_timeout_(buffer_timeout)
{
init();
setTargetFrame(target_frame);
}
template<class F, typename TimeRepT = int64_t, typename TimeT = std::nano>
MessageFilter(
F & f, BufferT & buffer, const std::string & target_frame, uint32_t queue_size,
const rclcpp::Node::SharedPtr & node,
std::chrono::duration<TimeRepT, TimeT> buffer_timeout =
std::chrono::duration<TimeRepT, TimeT>::max())
: MessageFilter(
f, buffer, target_frame, queue_size, node->get_node_logging_interface(),
node->get_node_clock_interface(), buffer_timeout)
{
}
template<class F, typename TimeRepT = int64_t, typename TimeT = std::nano>
MessageFilter(
F & f, BufferT & buffer, const std::string & target_frame, uint32_t queue_size,
const rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr & node_logging,
const rclcpp::node_interfaces::NodeClockInterface::SharedPtr & node_clock,
std::chrono::duration<TimeRepT, TimeT> buffer_timeout =
std::chrono::duration<TimeRepT, TimeT>::max())
: node_logging_(node_logging),
node_clock_(node_clock),
buffer_(buffer),
queue_size_(queue_size),
buffer_timeout_(buffer_timeout)
{
init();
setTargetFrame(target_frame);
connectInput(f);
}
template<class F>
void connectInput(F & f)
{
message_connection_.disconnect();
message_connection_ = f.registerCallback(&MessageFilter::incomingMessage, this);
}
~MessageFilter()
{
message_connection_.disconnect();
clear();
TF2_ROS_MESSAGEFILTER_DEBUG(
"Successful Transforms: %llu, Discarded due to age: %llu, Transform messages received: %llu, "
"Messages received: %llu, Total dropped: %llu",
static_cast<uint64_t>(successful_transform_count_),
static_cast<uint64_t>(failed_out_the_back_count_),
static_cast<uint64_t>(transform_message_count_),
static_cast<uint64_t>(incoming_message_count_),
static_cast<uint64_t>(dropped_message_count_));
}
void setTargetFrame(const std::string & target_frame)
{
V_string frames;
frames.push_back(target_frame);
setTargetFrames(frames);
}
void setTargetFrames(const V_string & target_frames)
{
std::unique_lock<std::mutex> frames_lock(target_frames_mutex_);
target_frames_.resize(target_frames.size());
std::transform(
target_frames.begin(), target_frames.end(),
target_frames_.begin(), this->stripSlash);
expected_success_count_ = target_frames_.size() * (time_tolerance_.nanoseconds() ? 2 : 1);
std::stringstream ss;
for (V_string::iterator it = target_frames_.begin(); it != target_frames_.end(); ++it) {
ss << *it << " ";
}
target_frames_string_ = ss.str();
}
std::string getTargetFramesString()
{
std::unique_lock<std::mutex> frames_lock(target_frames_mutex_);
return target_frames_string_;
}
void setTolerance(const rclcpp::Duration & tolerance)
{
std::unique_lock<std::mutex> frames_lock(target_frames_mutex_);
time_tolerance_ = tolerance;
expected_success_count_ = target_frames_.size() * (time_tolerance_.nanoseconds() ? 2 : 1);
}
void clear()
{
{
std::unique_lock<std::mutex> lock(ts_futures_mutex_);
for (auto & kv : ts_futures_) {
buffer_.cancel(kv.second);
}
ts_futures_.clear();
}
std::unique_lock<std::mutex> unique_lock(messages_mutex_);
TF2_ROS_MESSAGEFILTER_DEBUG("%s", "Cleared");
messages_.clear();
warned_about_empty_frame_id_ = false;
}
void add(const MEvent & evt)
{
if (target_frames_.empty()) {
return;
}
namespace mt = message_filters::message_traits;
const MConstPtr & message = evt.getMessage();
std::string frame_id = stripSlash(mt::FrameId<M>::value(*message));
rclcpp::Time stamp = mt::TimeStamp<M>::value(*message);
if (frame_id.empty()) {
messageDropped(evt, filter_failure_reasons::EmptyFrameID);
return;
}
std::vector<std::tuple<uint64_t, tf2::TimePoint, std::string>> wait_params;
// iterate through the target frames and add requests for each of them
MessageInfo info;
info.handles.reserve(expected_success_count_);
{
V_string target_frames_copy;
// Copy target_frames_ to avoid deadlock from #79
{
std::unique_lock<std::mutex> frames_lock(target_frames_mutex_);
target_frames_copy = target_frames_;
}
V_string::iterator it = target_frames_copy.begin();
V_string::iterator end = target_frames_copy.end();
for (; it != end; ++it) {
const std::string & target_frame = *it;
wait_params.emplace_back(
next_handle_index_, tf2_ros::fromRclcpp(stamp), target_frame);
info.handles.push_back(next_handle_index_++);
if (time_tolerance_.nanoseconds()) {
wait_params.emplace_back(
next_handle_index_,
tf2_ros::fromRclcpp(stamp + time_tolerance_),
target_frame);
info.handles.push_back(next_handle_index_++);
}
}
}
{
// Keep a lock on the messages
std::unique_lock<std::mutex> unique_lock(messages_mutex_);
// If this message is about to push us past our queue size, erase the oldest message
if (queue_size_ != 0 && messages_.size() + 1 > queue_size_) {
++dropped_message_count_;
const MessageInfo & front = messages_.front();
TF2_ROS_MESSAGEFILTER_DEBUG(
"Removed oldest message because buffer is full, count now %d (frame_id=%s, stamp=%f)",
messages_.size(),
(mt::FrameId<M>::value(*front.event.getMessage())).c_str(),
mt::TimeStamp<M>::value(*front.event.getMessage()).seconds());
messageDropped(front.event, filter_failure_reasons::QueueFull);
messages_.pop_front();
}
// Add the message to our list
info.event = evt;
messages_.push_back(info);
}
TF2_ROS_MESSAGEFILTER_DEBUG(
"Added message in frame %s at time %.3f, count now %d",
frame_id.c_str(), stamp.seconds(), messages_.size());
++incoming_message_count_;
for (const auto & param : wait_params) {
const auto & handle = std::get<0>(param);
const auto & stamp = std::get<1>(param);
const auto & target_frame = std::get<2>(param);
tf2_ros::TransformStampedFuture future = buffer_.waitForTransform(
target_frame,
frame_id,
stamp,
buffer_timeout_,
std::bind(&MessageFilter::transformReadyCallback, this, std::placeholders::_1, handle));
// If handle of future is 0 or 0xffffffffffffffffULL, waitForTransform have already called
// the callback.
if (0 != future.getHandle() && 0xffffffffffffffffULL != future.getHandle()) {
std::unique_lock<std::mutex> lock(ts_futures_mutex_);
ts_futures_.insert({handle, std::move(future)});
}
}
}
void add(const MConstPtr & message)
{
auto t = node_clock_->get_clock()->now();
add(MEvent(message, t));
}
#if 0
message_filters::Connection registerFailureCallback(const FailureCallback & callback)
{
message_connection_failure.disconnect();
message_connection_failure = this->registerCallback(callback, this);
}
#endif
virtual void setQueueSize(uint32_t new_queue_size)
{
queue_size_ = new_queue_size;
}
virtual uint32_t getQueueSize()
{
return queue_size_;
}
private:
void init()
{
successful_transform_count_ = 0;
failed_out_the_back_count_ = 0;
transform_message_count_ = 0;
incoming_message_count_ = 0;
dropped_message_count_ = 0;
time_tolerance_ = rclcpp::Duration(0, 0);
warned_about_empty_frame_id_ = false;
expected_success_count_ = 1;
}
void transformReadyCallback(const tf2_ros::TransformStampedFuture & future, const uint64_t handle)
{
namespace mt = message_filters::message_traits;
MEvent saved_event;
bool event_found = false;
{
std::unique_lock<std::mutex> lock(ts_futures_mutex_);
auto iter = ts_futures_.find(handle);
if (iter != ts_futures_.end()) {
ts_futures_.erase(iter);
}
}
{
// We will be accessing and mutating messages now, require unique lock
std::unique_lock<std::mutex> lock(messages_mutex_);
// find the message this request is associated with
typename L_MessageInfo::iterator msg_it = messages_.begin();
typename L_MessageInfo::iterator msg_end = messages_.end();
for (; msg_it != msg_end; ++msg_it) {
MessageInfo & info = *msg_it;
auto handle_it = std::find(info.handles.begin(), info.handles.end(), handle);
if (handle_it != info.handles.end()) {
// found msg_it
++info.success_count;
if (info.success_count >= expected_success_count_) {
saved_event = msg_it->event;
messages_.erase(msg_it);
event_found = true;
}
break;
}
}
}
if (!event_found) {
return;
}
bool can_transform = true;
const MConstPtr & message = saved_event.getMessage();
std::string frame_id = stripSlash(mt::FrameId<M>::value(*message));
rclcpp::Time stamp = mt::TimeStamp<M>::value(*message);
bool transform_available = true;
FilterFailureReason error = filter_failure_reasons::Unknown;
try {
future.get();
} catch (...) {
transform_available = false;
error = filter_failure_reasons::OutTheBack;
}
if (transform_available) {
std::unique_lock<std::mutex> frames_lock(target_frames_mutex_);
// make sure we can still perform all the necessary transforms
typename V_string::iterator it = target_frames_.begin();
typename V_string::iterator end = target_frames_.end();
for (; it != end; ++it) {
const std::string & target = *it;
if (!buffer_.canTransform(target, frame_id, tf2_ros::fromRclcpp(stamp), NULL)) {
can_transform = false;
break;
}
if (time_tolerance_.nanoseconds()) {
if (!buffer_.canTransform(
target, frame_id,
tf2_ros::fromRclcpp(stamp + time_tolerance_), NULL))
{
can_transform = false;
break;
}
}
}
} else {
can_transform = false;
}
if (can_transform) {
TF2_ROS_MESSAGEFILTER_DEBUG(
"Message ready in frame %s at time %.3f, count now %d",
frame_id.c_str(), stamp.seconds(), messages_.size());
++successful_transform_count_;
messageReady(saved_event);
} else {
++dropped_message_count_;
TF2_ROS_MESSAGEFILTER_DEBUG(
"Discarding message in frame %s at time %.3f, count now %d",
frame_id.c_str(), stamp.seconds(), messages_.size());
messageDropped(saved_event, error);
}
}
void incomingMessage(const message_filters::MessageEvent<M const> & evt)
{
add(evt);
}
void checkFailures()
{
if (!next_failure_warning_.nanoseconds()) {
next_failure_warning_ = node_clock_->get_clock()->now() + rclcpp::Duration(15, 0);
}
if (node_clock_->get_clock()->now() >= next_failure_warning_) {
if (incoming_message_count_ - messages_.size() == 0) {
return;
}
double dropped_pct = static_cast<double>(dropped_message_count_) /
static_cast<double>(incoming_message_count_ - messages_.size());
if (dropped_pct > 0.95) {
TF2_ROS_MESSAGEFILTER_WARN(
"Dropped %.2f%% of messages so far. Please turn the "
"[tf2_ros_message_filter.message_notifier] rosconsole logger to DEBUG for more "
"information.",
dropped_pct * 100);
next_failure_warning_ = node_clock_->get_clock()->now() + rclcpp::Duration(60, 0);
if (static_cast<double>(failed_out_the_back_count_) /
static_cast<double>(dropped_message_count_) > 0.5)
{
TF2_ROS_MESSAGEFILTER_WARN(
" The majority of dropped messages were due to messages growing older than the TF "
"cache time. The last message's timestamp was: %f, and the last frame_id was: %s",
last_out_the_back_stamp_.seconds(), last_out_the_back_frame_.c_str());
}
}
}
}
// TODO(clalancette): reenable this once we have underlying support for callback queues
#if 0
struct CBQueueCallback : public ros::CallbackInterface
{
CBQueueCallback(
MessageFilter * filter, const MEvent & event, bool success, FilterFailureReason reason)
: event_(event),
filter_(filter),
reason_(reason),
success_(success)
{}
virtual CallResult call()
{
if (success_) {
filter_->signalMessage(event_);
} else {
filter_->signalFailure(event_, reason_);
}
return Success;
}
private:
MEvent event_;
MessageFilter * filter_;
FilterFailureReason reason_;
bool success_;
};
#endif
void messageDropped(const MEvent & evt, FilterFailureReason reason)
{
// TODO(clalancette): reenable this once we have underlying support for callback queues
#if 0
if (callback_queue_) {
ros::CallbackInterfacePtr cb(new CBQueueCallback(this, evt, false, reason));
callback_queue_->addCallback(cb, (uint64_t)this);
} else {}
#endif
{
signalFailure(evt, reason);
}
}
void messageReady(const MEvent & evt)
{
// TODO(clalancette): reenable this once we have underlying support for callback queues
#if 0
if (callback_queue_) {
ros::CallbackInterfacePtr cb(new CBQueueCallback(
this, evt, true, filter_failure_reasons::Unknown));
callback_queue_->addCallback(cb, (uint64_t)this);
} else {}
#endif
{
this->signalMessage(evt);
}
}
void signalFailure(const MEvent & evt, FilterFailureReason reason)
{
namespace mt = message_filters::message_traits;
const MConstPtr & message = evt.getMessage();
std::string frame_id = stripSlash(mt::FrameId<M>::value(*message));
rclcpp::Time stamp = mt::TimeStamp<M>::value(*message);
RCLCPP_INFO(
node_logging_->get_logger(),
"Message Filter dropping message: frame '%s' at time %.3f for reason '%s'",
frame_id.c_str(), stamp.seconds(), get_filter_failure_reason_string(reason).c_str());
}
static std::string stripSlash(const std::string & in)
{
if (!in.empty() && (in[0] == '/')) {
std::string out = in;
out.erase(0, 1);
return out;
}
return in;
}
const rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr node_logging_;
const rclcpp::node_interfaces::NodeClockInterface::SharedPtr node_clock_;
BufferT & buffer_;
V_string target_frames_;
std::string target_frames_string_;
std::mutex target_frames_mutex_;
uint32_t queue_size_;
uint64_t next_handle_index_ = 0;
struct MessageInfo
{
MessageInfo()
: success_count(0) {}
MEvent event;
std::vector<uint64_t> handles;
uint64_t success_count;
};
typedef std::list<MessageInfo> L_MessageInfo;
L_MessageInfo messages_;
std::mutex messages_mutex_;
uint64_t expected_success_count_;
bool warned_about_empty_frame_id_;
uint64_t successful_transform_count_;
uint64_t failed_out_the_back_count_;
uint64_t transform_message_count_;
uint64_t incoming_message_count_;
uint64_t dropped_message_count_;
rclcpp::Time last_out_the_back_stamp_;
std::string last_out_the_back_frame_;
rclcpp::Time next_failure_warning_;
// but can have associated duration
rclcpp::Duration time_tolerance_ = rclcpp::Duration(0, 0);
message_filters::Connection message_connection_;
message_filters::Connection message_connection_failure;
// Timeout duration when calling the buffer method 'waitForTransform'
tf2::Duration buffer_timeout_;
std::mutex ts_futures_mutex_;
// to clear the callback in the Buffer if MessageFiltered object is destroyed.
std::unordered_map<uint64_t, tf2_ros::TransformStampedFuture> ts_futures_;
};
} // namespace tf2_ros
#endif // TF2_ROS__MESSAGE_FILTER_H_