32 #ifndef TF2_ROS_MESSAGE_FILTER_H 33 #define TF2_ROS_MESSAGE_FILTER_H 40 #include <boost/function.hpp> 41 #include <boost/bind.hpp> 42 #include <boost/shared_ptr.hpp> 43 #include <boost/thread.hpp> 52 #define TF2_ROS_MESSAGEFILTER_DEBUG(fmt, ...) \ 53 ROS_DEBUG_NAMED("message_filter", std::string(std::string("MessageFilter [target=%s]: ") + std::string(fmt)).c_str(), getTargetFramesString().c_str(), __VA_ARGS__) 55 #define TF2_ROS_MESSAGEFILTER_WARN(fmt, ...) \ 56 ROS_WARN_NAMED("message_filter", std::string(std::string("MessageFilter [target=%s]: ") + std::string(fmt)).c_str(), getTargetFramesString().c_str(), __VA_ARGS__) 61 namespace filter_failure_reasons
81 virtual void clear() = 0;
82 virtual void setTargetFrame(
const std::string& target_frame) = 0;
83 virtual void setTargetFrames(
const V_string& target_frames) = 0;
84 virtual void setTolerance(
const ros::Duration& tolerance) = 0;
110 typedef boost::signals2::signal<void(const MConstPtr&, FilterFailureReason)>
FailureSignal;
128 , queue_size_(queue_size)
129 , callback_queue_(nh.getCallbackQueue())
133 setTargetFrame(target_frame);
148 , queue_size_(queue_size)
149 , callback_queue_(nh.getCallbackQueue())
153 setTargetFrame(target_frame);
170 , queue_size_(queue_size)
171 , callback_queue_(cbqueue)
175 setTargetFrame(target_frame);
192 , queue_size_(queue_size)
193 , callback_queue_(cbqueue)
197 setTargetFrame(target_frame);
208 message_connection_.disconnect();
217 message_connection_.disconnect();
221 TF2_ROS_MESSAGEFILTER_DEBUG(
"Successful Transforms: %llu, Discarded due to age: %llu, Transform messages received: %llu, Messages received: %llu, Total dropped: %llu",
222 (
long long unsigned int)successful_transform_count_,
223 (
long long unsigned int)failed_out_the_back_count_, (
long long unsigned int)transform_message_count_,
224 (
long long unsigned int)incoming_message_count_, (
long long unsigned int)dropped_message_count_);
234 frames.push_back(target_frame);
235 setTargetFrames(frames);
243 boost::mutex::scoped_lock frames_lock(target_frames_mutex_);
245 target_frames_.resize(target_frames.size());
246 std::transform(target_frames.begin(), target_frames.end(), target_frames_.begin(), this->
stripSlash);
247 expected_success_count_ = target_frames_.size() * (time_tolerance_.isZero() ? 1 : 2);
249 std::stringstream ss;
250 for (V_string::iterator it = target_frames_.begin(); it != target_frames_.end(); ++it)
254 target_frames_string_ = ss.str();
261 boost::mutex::scoped_lock lock(target_frames_mutex_);
262 return target_frames_string_;
270 boost::mutex::scoped_lock lock(target_frames_mutex_);
271 time_tolerance_ = tolerance;
272 expected_success_count_ = target_frames_.size() * (time_tolerance_.isZero() ? 1 : 2);
280 boost::unique_lock< boost::shared_mutex > unique_lock(messages_mutex_);
284 bc_.removeTransformableCallback(callback_handle_);
290 warned_about_empty_frame_id_ =
false;
293 void add(
const MEvent& evt)
295 if (target_frames_.empty())
302 std::string frame_id =
stripSlash(mt::FrameId<M>::value(*message));
303 ros::Time stamp = mt::TimeStamp<M>::value(*message);
305 if (frame_id.empty())
313 info.
handles.reserve(expected_success_count_);
318 boost::mutex::scoped_lock frames_lock(target_frames_mutex_);
319 target_frames_copy = target_frames_;
322 V_string::iterator it = target_frames_copy.begin();
323 V_string::iterator end = target_frames_copy.end();
324 for (; it != end; ++it)
326 const std::string& target_frame = *it;
328 if (handle == 0xffffffffffffffffULL)
333 else if (handle == 0)
339 info.
handles.push_back(handle);
342 if (!time_tolerance_.isZero())
344 handle = bc_.addTransformableRequest(callback_handle_, target_frame, frame_id, stamp + time_tolerance_);
345 if (handle == 0xffffffffffffffffULL)
350 else if (handle == 0)
356 info.
handles.push_back(handle);
370 boost::unique_lock< boost::shared_mutex > unique_lock(messages_mutex_);
372 if (queue_size_ != 0 && message_count_ + 1 > queue_size_)
374 ++dropped_message_count_;
379 V_TransformableRequestHandle::const_iterator it = front.
handles.begin();
380 V_TransformableRequestHandle::const_iterator end = front.
handles.end();
382 for (; it != end; ++it)
384 bc_.cancelTransformableRequest(*it);
388 messages_.pop_front();
394 messages_.push_back(info);
400 ++incoming_message_count_;
408 void add(
const MConstPtr& message)
411 (*header)[
"callerid"] =
"unknown";
414 add(MEvent(message, header, t));
423 boost::mutex::scoped_lock lock(failure_signal_mutex_);
429 queue_size_ = new_queue_size;
443 successful_transform_count_ = 0;
444 failed_out_the_back_count_ = 0;
445 transform_message_count_ = 0;
446 incoming_message_count_ = 0;
447 dropped_message_count_ = 0;
449 warned_about_empty_frame_id_ =
false;
450 expected_success_count_ = 1;
460 boost::upgrade_lock< boost::shared_mutex > lock(messages_mutex_);
463 typename L_MessageInfo::iterator msg_it = messages_.begin();
464 typename L_MessageInfo::iterator msg_end = messages_.end();
465 for (; msg_it != msg_end; ++msg_it)
468 V_TransformableRequestHandle::const_iterator handle_it = std::find(info.
handles.begin(), info.
handles.end(), request_handle);
469 if (handle_it != info.
handles.end())
477 if (msg_it == msg_end)
488 bool can_transform =
true;
490 std::string frame_id =
stripSlash(mt::FrameId<M>::value(*message));
491 ros::Time stamp = mt::TimeStamp<M>::value(*message);
495 boost::mutex::scoped_lock frames_lock(target_frames_mutex_);
497 typename V_string::iterator it = target_frames_.begin();
498 typename V_string::iterator end = target_frames_.end();
499 for (; it != end; ++it)
501 const std::string& target = *it;
502 if (!bc_.canTransform(target, frame_id, stamp))
504 can_transform =
false;
508 if (!time_tolerance_.isZero())
510 if (!bc_.canTransform(target, frame_id, stamp + time_tolerance_))
512 can_transform =
false;
520 can_transform =
false;
524 boost::upgrade_to_unique_lock< boost::shared_mutex > uniqueLock(lock);
529 ++successful_transform_count_;
531 messageReady(info.
event);
536 ++dropped_message_count_;
542 messages_.erase(msg_it);
556 if (next_failure_warning_.isZero())
563 if (incoming_message_count_ - message_count_ == 0)
568 double dropped_pct = (double)dropped_message_count_ / (
double)(incoming_message_count_ - message_count_);
569 if (dropped_pct > 0.95)
574 if ((
double)failed_out_the_back_count_ / (
double)dropped_message_count_ > 0.5)
576 TF2_ROS_MESSAGEFILTER_WARN(
" The majority of dropped messages were due to messages growing older than the TF cache time. The last message's timestamp was: %f, and the last frame_id was: %s", last_out_the_back_stamp_.toSec(), last_out_the_back_frame_.c_str());
596 filter_->signalMessage(event_);
600 filter_->signalFailure(event_, reason_);
618 callback_queue_->addCallback(cb, (uint64_t)
this);
622 signalFailure(evt, reason);
631 callback_queue_->addCallback(cb, (uint64_t)
this);
635 this->signalMessage(evt);
641 boost::mutex::scoped_lock lock(failure_signal_mutex_);
647 boost::mutex::scoped_lock lock(failure_signal_mutex_);
654 if ( !in.empty() && (in[0] ==
'/'))
656 std::string out = in;
void clear()
Clear any messages currently in the queue.
std::list< MessageInfo > L_MessageInfo
FilterFailureReason reason_
uint32_t expected_success_count_
uint64_t dropped_message_count_
uint32_t queue_size_
The maximum number of messages we queue up.
void add(const MConstPtr &message)
Manually add a message into this filter.
Follows the patterns set by the message_filters package to implement a filter which only passes messa...
FailureSignal failure_signal_
~MessageFilter()
Destructor.
uint32_t TransformableCallbackHandle
The message buffer overflowed, and this message was pushed off the back of the queue, but the reason it was unable to be transformed is unknown.
boost::function< void(const MConstPtr &, FilterFailureReason)> FailureCallback
The frame_id on the message is empty.
void messageReady(const MEvent &evt)
void setTargetFrame(const std::string &target_frame)
Set the frame you need to be able to transform to before getting a message callback.
std::string getTargetFramesString()
Get the target frames as a string for debugging.
uint64_t transform_message_count_
tf2::BufferCore & bc_
The Transformer used to determine if transformation data is available.
void messageDropped(const MEvent &evt, FilterFailureReason reason)
ros::WallTime next_failure_warning_
std::vector< tf2::TransformableRequestHandle > V_TransformableRequestHandle
std::string last_out_the_back_frame_
boost::shared_mutex messages_mutex_
The mutex used for locking message list operations.
bool add(const actionlib::TwoIntsGoal &req, actionlib::TwoIntsResult &res)
message_filters::Connection registerFailureCallback(const FailureCallback &callback)
Register a callback to be called when a message is about to be dropped.
void signalFailure(const MEvent &evt, FilterFailureReason reason)
virtual CallResult call()
V_TransformableRequestHandle handles
MessageFilter(F &f, tf2::BufferCore &bc, const std::string &target_frame, uint32_t queue_size, ros::CallbackQueueInterface *cbqueue)
Constructor.
boost::signals2::connection getBoostConnection() const
#define TF2_ROS_MESSAGEFILTER_WARN(fmt,...)
void add(const MEvent &evt)
MessageFilter(tf2::BufferCore &bc, const std::string &target_frame, uint32_t queue_size, ros::CallbackQueueInterface *cbqueue)
Constructor.
boost::mutex failure_signal_mutex_
void setTargetFrames(const V_string &target_frames)
Set the frames you need to be able to transform to before getting a message callback.
void setTolerance(const ros::Duration &tolerance)
Set the required tolerance for the notifier to return true.
uint64_t successful_transform_count_
MessageFilter(F &f, tf2::BufferCore &bc, const std::string &target_frame, uint32_t queue_size, const ros::NodeHandle &nh)
Constructor.
tf2::TransformableCallbackHandle callback_handle_
virtual void setQueueSize(uint32_t new_queue_size)
CBQueueCallback(MessageFilter *filter, const MEvent &event, bool success, FilterFailureReason reason)
std::string stripSlash(const std::string &in)
message_filters::Connection message_connection_
void disconnectFailure(const message_filters::Connection &c)
ros::CallbackQueueInterface * callback_queue_
MessageFilter(tf2::BufferCore &bc, const std::string &target_frame, uint32_t queue_size, const ros::NodeHandle &nh)
Constructor.
The timestamp on the message is more than the cache length earlier than the newest data in the transf...
void incomingMessage(const ros::MessageEvent< M const > &evt)
Callback that happens when we receive a message on the message topic.
void transformable(tf2::TransformableRequestHandle request_handle, const std::string &target_frame, const std::string &source_frame, ros::Time time, tf2::TransformableResult result)
ros::Time last_out_the_back_stamp_
ros::Duration time_tolerance_
Provide additional tolerance on time for messages which are stamped but can have associated duration...
uint32_t message_count_
The number of messages in the list. Used because <container>.size() may have linear cost...
uint64_t incoming_message_count_
static std::string stripSlash(const std::string &in)
ros::MessageEvent< M const > MEvent
V_string target_frames_
The frames we need to be able to transform to before a message is ready.
std::string target_frames_string_
boost::shared_ptr< M const > MConstPtr
void connectInput(F &f)
Connect this filter's input to another filter's output. If this filter is already connected...
std::vector< std::string > V_string
boost::mutex target_frames_mutex_
A mutex to protect access to the target_frames_ list and target_frames_string.
boost::signals2::signal< void(const MConstPtr &, FilterFailureReason)> FailureSignal
bool warned_about_empty_frame_id_
#define TF2_ROS_MESSAGEFILTER_DEBUG(fmt,...)
uint64_t failed_out_the_back_count_
#define ROSCONSOLE_DEFAULT_NAME
virtual ~MessageFilterBase()
boost::shared_ptr< M > getMessage() const
virtual uint32_t getQueueSize()
uint64_t TransformableRequestHandle