32 #ifndef TF_MESSAGE_FILTER_H 33 #define TF_MESSAGE_FILTER_H 37 #include <tf/tfMessage.h> 42 #include <boost/function.hpp> 43 #include <boost/bind.hpp> 44 #include <boost/shared_ptr.hpp> 45 #include <boost/weak_ptr.hpp> 46 #include <boost/thread.hpp> 47 #include <boost/signals2.hpp> 54 #define TF_MESSAGEFILTER_DEBUG(fmt, ...) \ 55 ROS_DEBUG_NAMED("message_filter", "MessageFilter [target=%s]: " fmt, getTargetFramesString().c_str(), __VA_ARGS__) 57 #define TF_MESSAGEFILTER_WARN(fmt, ...) \ 58 ROS_WARN_NAMED("message_filter", "MessageFilter [target=%s]: " fmt, getTargetFramesString().c_str(), __VA_ARGS__) 63 namespace filter_failure_reasons
81 virtual void clear() = 0;
82 virtual void setTargetFrame(
const std::string& target_frame) = 0;
83 virtual void setTargetFrames(
const std::vector<std::string>& target_frames) = 0;
84 virtual void setTolerance(
const ros::Duration& tolerance) = 0;
85 virtual void setQueueSize( uint32_t new_queue_size ) = 0;
86 virtual uint32_t getQueueSize() = 0;
112 typedef boost::signals2::signal<void(const MConstPtr&, FilterFailureReason)>
FailureSignal;
126 , max_rate_(max_rate)
127 , queue_size_(queue_size)
131 setTargetFrame(target_frame);
148 , max_rate_(max_rate)
149 , queue_size_(queue_size)
153 setTargetFrame(target_frame);
164 message_connection_.disconnect();
174 max_rate_timer_.stop();
175 message_connection_.disconnect();
176 tf_.removeTransformsChangedListener(tf_connection_);
180 TF_MESSAGEFILTER_DEBUG(
"Successful Transforms: %llu, Failed Transforms: %llu, Discarded due to age: %llu, Transform messages received: %llu, Messages received: %llu, Total dropped: %llu",
181 (
long long unsigned int)successful_transform_count_, (
long long unsigned int)failed_transform_count_,
182 (
long long unsigned int)failed_out_the_back_count_, (
long long unsigned int)transform_message_count_,
183 (
long long unsigned int)incoming_message_count_, (
long long unsigned int)dropped_message_count_);
192 std::vector<std::string> frames;
193 frames.push_back(target_frame);
194 setTargetFrames(frames);
202 boost::mutex::scoped_lock list_lock(messages_mutex_);
203 boost::mutex::scoped_lock string_lock(target_frames_string_mutex_);
205 target_frames_ = target_frames;
207 std::stringstream ss;
208 for (std::vector<std::string>::iterator it = target_frames_.begin(); it != target_frames_.end(); ++it)
212 target_frames_string_ = ss.str();
219 boost::mutex::scoped_lock lock(target_frames_string_mutex_);
220 return target_frames_string_;
228 time_tolerance_ = tolerance;
236 boost::mutex::scoped_lock lock(messages_mutex_);
243 warned_about_unresolved_name_ =
false;
244 warned_about_empty_frame_id_ =
false;
247 void add(
const MEvent& evt)
249 boost::mutex::scoped_lock lock(messages_mutex_);
253 if (!testMessage(evt))
256 if (queue_size_ != 0 && message_count_ + 1 > queue_size_)
258 ++dropped_message_count_;
259 const MEvent& front = messages_.front();
261 "Removed oldest message because buffer is full, count now %d (frame_id=%s, stamp=%f)",
267 messages_.pop_front();
272 messages_.push_back(evt);
277 "Added message in frame %s at time %.3f, count now %d",
282 ++incoming_message_count_;
290 void add(
const MConstPtr& message)
294 (*header)[
"callerid"] =
"unknown";
304 boost::mutex::scoped_lock lock(failure_signal_mutex_);
310 queue_size_ = new_queue_size;
323 new_transforms_ =
false;
324 successful_transform_count_ = 0;
325 failed_transform_count_ = 0;
326 failed_out_the_back_count_ = 0;
327 transform_message_count_ = 0;
328 incoming_message_count_ = 0;
329 dropped_message_count_ = 0;
331 warned_about_unresolved_name_ =
false;
332 warned_about_empty_frame_id_ =
false;
349 if (frame_id.empty())
351 if (!warned_about_empty_frame_id_)
353 warned_about_empty_frame_id_ =
true;
354 TF_MESSAGEFILTER_WARN(
"Discarding message from [%s] due to empty frame_id. This message will only print once.", callerid.c_str());
363 for (std::vector<std::string>::iterator target_it = target_frames_.begin(); target_it != target_frames_.end(); ++target_it)
365 const std::string& target_frame = *target_it;
367 if (target_frame != frame_id && stamp !=
ros::Time(0))
371 tf_.getLatestCommonTime(frame_id, target_frame, latest_transform_time, 0) ;
373 if (stamp + tf_.getCacheLength() < latest_transform_time)
375 ++failed_out_the_back_count_;
376 ++dropped_message_count_;
378 "Discarding Message, in frame %s, Out of the back of Cache Time " 379 "(stamp: %.3f + cache_length: %.3f < latest_transform_time %.3f. " 380 "Message Count now: %d",
383 tf_.getCacheLength().toSec(), latest_transform_time.
toSec(), message_count_);
385 last_out_the_back_stamp_ = stamp;
386 last_out_the_back_frame_ = frame_id;
395 bool ready = !target_frames_.empty();
396 for (std::vector<std::string>::iterator target_it = target_frames_.begin(); ready && target_it != target_frames_.end(); ++target_it)
398 std::string& target_frame = *target_it;
401 ready = ready && (tf_.canTransform(target_frame, frame_id, stamp) &&
402 tf_.canTransform(target_frame, frame_id, stamp + time_tolerance_) );
406 ready = ready && tf_.canTransform(target_frame, frame_id, stamp);
414 ++successful_transform_count_;
416 this->signalMessage(evt);
420 ++failed_transform_count_;
428 if (!messages_.empty() && getTargetFramesString() ==
" ")
430 ROS_WARN_NAMED(
"message_notifier",
"MessageFilter [target=%s]: empty target frame", getTargetFramesString().c_str());
435 typename L_Event::iterator it = messages_.begin();
436 for (; it != messages_.end(); ++i)
440 if (testMessage(evt))
443 it = messages_.erase(it);
454 boost::mutex::scoped_lock list_lock(messages_mutex_);
458 new_transforms_ =
false;
474 new_transforms_ =
true;
476 ++transform_message_count_;
481 if (next_failure_warning_.isZero())
488 if (incoming_message_count_ - message_count_ == 0)
493 double dropped_pct = (double)dropped_message_count_ / (
double)(incoming_message_count_ - message_count_);
494 if (dropped_pct > 0.95)
499 if ((
double)failed_out_the_back_count_ / (
double)dropped_message_count_ > 0.5)
501 TF_MESSAGEFILTER_WARN(
" The majority of dropped messages were due to messages growing older than the TF cache time. The last message's timestamp was: %f, and the last frame_id was: %s", last_out_the_back_stamp_.toSec(), last_out_the_back_frame_.c_str());
509 boost::mutex::scoped_lock lock(failure_signal_mutex_);
515 boost::mutex::scoped_lock lock(failure_signal_mutex_);
FailureSignal failure_signal_
virtual ~MessageFilterBase()
ros::Time next_failure_warning_
ros::NodeHandle nh_
The node used to subscribe to the topic.
boost::mutex target_frames_string_mutex_
Follows the patterns set by the message_filters package to implement a filter which only passes messa...
L_Event messages_
The message list.
void connectInput(F &f)
Connect this filter's input to another filter's output. If this filter is already connected...
void signalFailure(const MEvent &evt, FilterFailureReason reason)
ros::Duration time_tolerance_
Provide additional tolerance on time for messages which are stamped but can have associated duration...
uint64_t dropped_message_count_
#define ROS_WARN_NAMED(name,...)
ros::Timer max_rate_timer_
MessageFilter(Transformer &tf, const std::string &target_frame, uint32_t queue_size, ros::NodeHandle nh=ros::NodeHandle(), ros::Duration max_rate=ros::Duration(0.01))
Constructor.
uint64_t transform_message_count_
void add(const MEvent &evt)
virtual uint32_t getQueueSize()
boost::signals2::connection tf_connection_
The message buffer overflowed, and this message was pushed off the back of the queue, but the reason it was unable to be transformed is unknown.
ros::MessageEvent< M const > MEvent
std::vector< std::string > target_frames_
The frames we need to be able to transform to before a message is ready.
bool add(const actionlib::TwoIntsGoal &req, actionlib::TwoIntsResult &res)
void incomingMessage(const ros::MessageEvent< M const > &evt)
Callback that happens when we receive a message on the message topic.
~MessageFilter()
Destructor.
std::list< MEvent > L_Event
boost::signals2::connection getBoostConnection() const
void clear()
Clear any messages currently in the queue.
virtual void setQueueSize(uint32_t new_queue_size)
const std::string & getPublisherName() const
volatile bool new_transforms_
Used to skip waiting on new_data_ if new transforms have come in while calling back or transforming d...
void setTargetFrames(const std::vector< std::string > &target_frames)
Set the frames you need to be able to transform to before getting a message callback.
void maxRateTimerCallback(const ros::TimerEvent &)
boost::mutex failure_signal_mutex_
Transformer & tf_
The Transformer used to determine if transformation data is available.
bool new_messages_
Used to skip waiting on new_data_ if new messages have come in while calling back.
std::string last_out_the_back_frame_
message_filters::Connection registerFailureCallback(const FailureCallback &callback)
Register a callback to be called when a message is about to be dropped.
uint32_t message_count_
The number of messages in the list. Used because messages_.size() has linear cost.
MessageFilter(F &f, Transformer &tf, const std::string &target_frame, uint32_t queue_size, ros::NodeHandle nh=ros::NodeHandle(), ros::Duration max_rate=ros::Duration(0.01))
Constructor.
#define TF_MESSAGEFILTER_DEBUG(fmt,...)
uint64_t successful_transform_count_
void disconnectFailure(const message_filters::Connection &c)
bool testMessage(const MEvent &evt)
uint64_t failed_out_the_back_count_
ros::Time last_out_the_back_stamp_
boost::signals2::signal< void(const MConstPtr &, FilterFailureReason)> FailureSignal
boost::function< void(const MConstPtr &, FilterFailureReason)> FailureCallback
message_filters::Connection message_connection_
#define TF_MESSAGEFILTER_WARN(fmt,...)
uint64_t failed_transform_count_
std::string getTargetFramesString()
Get the target frames as a string for debugging.
void setTargetFrame(const std::string &target_frame)
Set the frame you need to be able to transform to before getting a message callback.
uint32_t queue_size_
The maximum number of messages we queue up.
std::string target_frames_string_
The frame_id on the message is empty.
The timestamp on the message is more than the cache length earlier than the newest data in the transf...
uint64_t incoming_message_count_
void setTolerance(const ros::Duration &tolerance)
Set the required tolerance for the notifier to return true.
bool warned_about_unresolved_name_
#define ROSCONSOLE_DEFAULT_NAME
bool warned_about_empty_frame_id_
void add(const MConstPtr &message)
Manually add a message into this filter.
boost::mutex messages_mutex_
The mutex used for locking message list operations.
boost::shared_ptr< M > getMessage() const
boost::shared_ptr< M const > MConstPtr