00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00032 #ifndef TF_MESSAGE_FILTER_H
00033 #define TF_MESSAGE_FILTER_H
00034
00035 #include <ros/ros.h>
00036 #include <tf/transform_listener.h>
00037 #include <tf/tfMessage.h>
00038
00039 #include <string>
00040 #include <list>
00041 #include <vector>
00042 #include <boost/function.hpp>
00043 #include <boost/bind.hpp>
00044 #include <boost/shared_ptr.hpp>
00045 #include <boost/weak_ptr.hpp>
00046 #include <boost/thread.hpp>
00047 #include <boost/signals2.hpp>
00048
00049 #include <ros/callback_queue.h>
00050
00051 #include <message_filters/connection.h>
00052 #include <message_filters/simple_filter.h>
00053
00054 #define TF_MESSAGEFILTER_DEBUG(fmt, ...) \
00055 ROS_DEBUG_NAMED("message_filter", "MessageFilter [target=%s]: " fmt, getTargetFramesString().c_str(), __VA_ARGS__)
00056
00057 #define TF_MESSAGEFILTER_WARN(fmt, ...) \
00058 ROS_WARN_NAMED("message_filter", "MessageFilter [target=%s]: " fmt, getTargetFramesString().c_str(), __VA_ARGS__)
00059
00060 namespace tf
00061 {
00062
00063 namespace filter_failure_reasons
00064 {
00065 enum FilterFailureReason
00066 {
00068 Unknown,
00070 OutTheBack,
00072 EmptyFrameID,
00073 };
00074 }
00075 typedef filter_failure_reasons::FilterFailureReason FilterFailureReason;
00076
00077 class MessageFilterBase
00078 {
00079 public:
00080 virtual ~MessageFilterBase(){}
00081 virtual void clear() = 0;
00082 virtual void setTargetFrame(const std::string& target_frame) = 0;
00083 virtual void setTargetFrames(const std::vector<std::string>& target_frames) = 0;
00084 virtual void setTolerance(const ros::Duration& tolerance) = 0;
00085 virtual void setQueueSize( uint32_t new_queue_size ) = 0;
00086 virtual uint32_t getQueueSize() = 0;
00087 };
00088
00105 template<class M>
00106 class MessageFilter : public MessageFilterBase, public message_filters::SimpleFilter<M>
00107 {
00108 public:
00109 typedef boost::shared_ptr<M const> MConstPtr;
00110 typedef ros::MessageEvent<M const> MEvent;
00111 typedef boost::function<void(const MConstPtr&, FilterFailureReason)> FailureCallback;
00112 typedef boost::signals2::signal<void(const MConstPtr&, FilterFailureReason)> FailureSignal;
00113
00123 MessageFilter(Transformer& tf, const std::string& target_frame, uint32_t queue_size, ros::NodeHandle nh = ros::NodeHandle(), ros::Duration max_rate = ros::Duration(0.01))
00124 : tf_(tf)
00125 , nh_(nh)
00126 , max_rate_(max_rate)
00127 , queue_size_(queue_size)
00128 {
00129 init();
00130
00131 setTargetFrame(target_frame);
00132 }
00133
00144 template<class F>
00145 MessageFilter(F& f, Transformer& tf, const std::string& target_frame, uint32_t queue_size, ros::NodeHandle nh = ros::NodeHandle(), ros::Duration max_rate = ros::Duration(0.01))
00146 : tf_(tf)
00147 , nh_(nh)
00148 , max_rate_(max_rate)
00149 , queue_size_(queue_size)
00150 {
00151 init();
00152
00153 setTargetFrame(target_frame);
00154
00155 connectInput(f);
00156 }
00157
00161 template<class F>
00162 void connectInput(F& f)
00163 {
00164 message_connection_.disconnect();
00165 message_connection_ = f.registerCallback(&MessageFilter::incomingMessage, this);
00166 }
00167
00171 ~MessageFilter()
00172 {
00173
00174 max_rate_timer_.stop();
00175 message_connection_.disconnect();
00176 tf_.removeTransformsChangedListener(tf_connection_);
00177
00178 clear();
00179
00180 TF_MESSAGEFILTER_DEBUG("Successful Transforms: %llu, Failed Transforms: %llu, Discarded due to age: %llu, Transform messages received: %llu, Messages received: %llu, Total dropped: %llu",
00181 (long long unsigned int)successful_transform_count_, (long long unsigned int)failed_transform_count_,
00182 (long long unsigned int)failed_out_the_back_count_, (long long unsigned int)transform_message_count_,
00183 (long long unsigned int)incoming_message_count_, (long long unsigned int)dropped_message_count_);
00184
00185 }
00186
00190 void setTargetFrame(const std::string& target_frame)
00191 {
00192 std::vector<std::string> frames;
00193 frames.push_back(target_frame);
00194 setTargetFrames(frames);
00195 }
00196
00200 void setTargetFrames(const std::vector<std::string>& target_frames)
00201 {
00202 boost::mutex::scoped_lock list_lock(messages_mutex_);
00203 boost::mutex::scoped_lock string_lock(target_frames_string_mutex_);
00204
00205 target_frames_ = target_frames;
00206
00207 std::stringstream ss;
00208 for (std::vector<std::string>::iterator it = target_frames_.begin(); it != target_frames_.end(); ++it)
00209 {
00210 ss << *it << " ";
00211 }
00212 target_frames_string_ = ss.str();
00213 }
00217 std::string getTargetFramesString()
00218 {
00219 boost::mutex::scoped_lock lock(target_frames_string_mutex_);
00220 return target_frames_string_;
00221 };
00222
00226 void setTolerance(const ros::Duration& tolerance)
00227 {
00228 time_tolerance_ = tolerance;
00229 }
00230
00234 void clear()
00235 {
00236 boost::mutex::scoped_lock lock(messages_mutex_);
00237
00238 TF_MESSAGEFILTER_DEBUG("%s", "Cleared");
00239
00240 messages_.clear();
00241 message_count_ = 0;
00242
00243 warned_about_unresolved_name_ = false;
00244 warned_about_empty_frame_id_ = false;
00245 }
00246
00247 void add(const MEvent& evt)
00248 {
00249 boost::mutex::scoped_lock lock(messages_mutex_);
00250
00251 testMessages();
00252
00253 if (!testMessage(evt))
00254 {
00255
00256 if (queue_size_ != 0 && message_count_ + 1 > queue_size_)
00257 {
00258 ++dropped_message_count_;
00259 const MEvent& front = messages_.front();
00260 TF_MESSAGEFILTER_DEBUG(
00261 "Removed oldest message because buffer is full, count now %d (frame_id=%s, stamp=%f)",
00262 message_count_,
00263 ros::message_traits::FrameId<M>::value(*front.getMessage()).c_str(),
00264 ros::message_traits::TimeStamp<M>::value(*front.getMessage()).toSec());
00265 signalFailure(messages_.front(), filter_failure_reasons::Unknown);
00266
00267 messages_.pop_front();
00268 --message_count_;
00269 }
00270
00271
00272 messages_.push_back(evt);
00273 ++message_count_;
00274 }
00275
00276 TF_MESSAGEFILTER_DEBUG(
00277 "Added message in frame %s at time %.3f, count now %d",
00278 ros::message_traits::FrameId<M>::value(*evt.getMessage()).c_str(),
00279 ros::message_traits::TimeStamp<M>::value(*evt.getMessage()).toSec(),
00280 message_count_);
00281
00282 ++incoming_message_count_;
00283 }
00284
00290 void add(const MConstPtr& message)
00291 {
00292
00293 boost::shared_ptr<std::map<std::string, std::string> > header(new std::map<std::string, std::string>);
00294 (*header)["callerid"] = "unknown";
00295 add(MEvent(message, header, ros::Time::now()));
00296 }
00297
00302 message_filters::Connection registerFailureCallback(const FailureCallback& callback)
00303 {
00304 boost::mutex::scoped_lock lock(failure_signal_mutex_);
00305 return message_filters::Connection(boost::bind(&MessageFilter::disconnectFailure, this, _1), failure_signal_.connect(callback));
00306 }
00307
00308 virtual void setQueueSize( uint32_t new_queue_size )
00309 {
00310 queue_size_ = new_queue_size;
00311 }
00312
00313 virtual uint32_t getQueueSize()
00314 {
00315 return queue_size_;
00316 }
00317
00318 private:
00319
00320 void init()
00321 {
00322 message_count_ = 0;
00323 new_transforms_ = false;
00324 successful_transform_count_ = 0;
00325 failed_transform_count_ = 0;
00326 failed_out_the_back_count_ = 0;
00327 transform_message_count_ = 0;
00328 incoming_message_count_ = 0;
00329 dropped_message_count_ = 0;
00330 time_tolerance_ = ros::Duration(0.0);
00331 warned_about_unresolved_name_ = false;
00332 warned_about_empty_frame_id_ = false;
00333
00334 tf_connection_ = tf_.addTransformsChangedListener(boost::bind(&MessageFilter::transformsChanged, this));
00335
00336 max_rate_timer_ = nh_.createTimer(max_rate_, &MessageFilter::maxRateTimerCallback, this);
00337 }
00338
00339 typedef std::list<MEvent> L_Event;
00340
00341 bool testMessage(const MEvent& evt)
00342 {
00343 const MConstPtr& message = evt.getMessage();
00344 std::string callerid = evt.getPublisherName();
00345 std::string frame_id = ros::message_traits::FrameId<M>::value(*message);
00346 ros::Time stamp = ros::message_traits::TimeStamp<M>::value(*message);
00347
00348
00349 if (frame_id.empty())
00350 {
00351 if (!warned_about_empty_frame_id_)
00352 {
00353 warned_about_empty_frame_id_ = true;
00354 TF_MESSAGEFILTER_WARN("Discarding message from [%s] due to empty frame_id. This message will only print once.", callerid.c_str());
00355 }
00356 signalFailure(evt, filter_failure_reasons::EmptyFrameID);
00357 return true;
00358 }
00359
00360
00361
00363 for (std::vector<std::string>::iterator target_it = target_frames_.begin(); target_it != target_frames_.end(); ++target_it)
00364 {
00365 const std::string& target_frame = *target_it;
00366
00367 if (target_frame != frame_id && stamp != ros::Time(0))
00368 {
00369 ros::Time latest_transform_time ;
00370
00371 tf_.getLatestCommonTime(frame_id, target_frame, latest_transform_time, 0) ;
00372
00373 if (stamp + tf_.getCacheLength() < latest_transform_time)
00374 {
00375 ++failed_out_the_back_count_;
00376 ++dropped_message_count_;
00377 TF_MESSAGEFILTER_DEBUG(
00378 "Discarding Message, in frame %s, Out of the back of Cache Time "
00379 "(stamp: %.3f + cache_length: %.3f < latest_transform_time %.3f. "
00380 "Message Count now: %d",
00381 ros::message_traits::FrameId<M>::value(*message).c_str(),
00382 ros::message_traits::TimeStamp<M>::value(*message).toSec(),
00383 tf_.getCacheLength().toSec(), latest_transform_time.toSec(), message_count_);
00384
00385 last_out_the_back_stamp_ = stamp;
00386 last_out_the_back_frame_ = frame_id;
00387
00388 signalFailure(evt, filter_failure_reasons::OutTheBack);
00389 return true;
00390 }
00391 }
00392
00393 }
00394
00395 bool ready = !target_frames_.empty();
00396 for (std::vector<std::string>::iterator target_it = target_frames_.begin(); ready && target_it != target_frames_.end(); ++target_it)
00397 {
00398 std::string& target_frame = *target_it;
00399 if (time_tolerance_ != ros::Duration(0.0))
00400 {
00401 ready = ready && (tf_.canTransform(target_frame, frame_id, stamp) &&
00402 tf_.canTransform(target_frame, frame_id, stamp + time_tolerance_) );
00403 }
00404 else
00405 {
00406 ready = ready && tf_.canTransform(target_frame, frame_id, stamp);
00407 }
00408 }
00409
00410 if (ready)
00411 {
00412 TF_MESSAGEFILTER_DEBUG("Message ready in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_);
00413
00414 ++successful_transform_count_;
00415
00416 this->signalMessage(evt);
00417 }
00418 else
00419 {
00420 ++failed_transform_count_;
00421 }
00422
00423 return ready;
00424 }
00425
00426 void testMessages()
00427 {
00428 if (!messages_.empty() && getTargetFramesString() == " ")
00429 {
00430 ROS_WARN_NAMED("message_notifier", "MessageFilter [target=%s]: empty target frame", getTargetFramesString().c_str());
00431 }
00432
00433 int i = 0;
00434
00435 typename L_Event::iterator it = messages_.begin();
00436 for (; it != messages_.end(); ++i)
00437 {
00438 MEvent& evt = *it;
00439
00440 if (testMessage(evt))
00441 {
00442 --message_count_;
00443 it = messages_.erase(it);
00444 }
00445 else
00446 {
00447 ++it;
00448 }
00449 }
00450 }
00451
00452 void maxRateTimerCallback(const ros::TimerEvent&)
00453 {
00454 boost::mutex::scoped_lock list_lock(messages_mutex_);
00455 if (new_transforms_)
00456 {
00457 testMessages();
00458 new_transforms_ = false;
00459 }
00460
00461 checkFailures();
00462 }
00463
00467 void incomingMessage(const ros::MessageEvent<M const>& evt)
00468 {
00469 add(evt);
00470 }
00471
00472 void transformsChanged()
00473 {
00474 new_transforms_ = true;
00475
00476 ++transform_message_count_;
00477 }
00478
00479 void checkFailures()
00480 {
00481 if (next_failure_warning_.isZero())
00482 {
00483 next_failure_warning_ = ros::Time::now() + ros::Duration(15);
00484 }
00485
00486 if (ros::Time::now() >= next_failure_warning_)
00487 {
00488 if (incoming_message_count_ - message_count_ == 0)
00489 {
00490 return;
00491 }
00492
00493 double dropped_pct = (double)dropped_message_count_ / (double)(incoming_message_count_ - message_count_);
00494 if (dropped_pct > 0.95)
00495 {
00496 TF_MESSAGEFILTER_WARN("Dropped %.2f%% of messages so far. Please turn the [%s.message_notifier] rosconsole logger to DEBUG for more information.", dropped_pct*100, ROSCONSOLE_DEFAULT_NAME);
00497 next_failure_warning_ = ros::Time::now() + ros::Duration(60);
00498
00499 if ((double)failed_out_the_back_count_ / (double)dropped_message_count_ > 0.5)
00500 {
00501 TF_MESSAGEFILTER_WARN(" The majority of dropped messages were due to messages growing older than the TF cache time. The last message's timestamp was: %f, and the last frame_id was: %s", last_out_the_back_stamp_.toSec(), last_out_the_back_frame_.c_str());
00502 }
00503 }
00504 }
00505 }
00506
00507 void disconnectFailure(const message_filters::Connection& c)
00508 {
00509 boost::mutex::scoped_lock lock(failure_signal_mutex_);
00510 c.getBoostConnection().disconnect();
00511 }
00512
00513 void signalFailure(const MEvent& evt, FilterFailureReason reason)
00514 {
00515 boost::mutex::scoped_lock lock(failure_signal_mutex_);
00516 failure_signal_(evt.getMessage(), reason);
00517 }
00518
00519 Transformer& tf_;
00520 ros::NodeHandle nh_;
00521 ros::Duration max_rate_;
00522 ros::Timer max_rate_timer_;
00523 std::vector<std::string> target_frames_;
00524 std::string target_frames_string_;
00525 boost::mutex target_frames_string_mutex_;
00526 uint32_t queue_size_;
00527
00528 L_Event messages_;
00529 uint32_t message_count_;
00530 boost::mutex messages_mutex_;
00531
00532 bool new_messages_;
00533 volatile bool new_transforms_;
00534
00535 bool warned_about_unresolved_name_;
00536 bool warned_about_empty_frame_id_;
00537
00538 uint64_t successful_transform_count_;
00539 uint64_t failed_transform_count_;
00540 uint64_t failed_out_the_back_count_;
00541 uint64_t transform_message_count_;
00542 uint64_t incoming_message_count_;
00543 uint64_t dropped_message_count_;
00544
00545 ros::Time last_out_the_back_stamp_;
00546 std::string last_out_the_back_frame_;
00547
00548 ros::Time next_failure_warning_;
00549
00550 ros::Duration time_tolerance_;
00551
00552 boost::signals2::connection tf_connection_;
00553 message_filters::Connection message_connection_;
00554
00555 FailureSignal failure_signal_;
00556 boost::mutex failure_signal_mutex_;
00557 };
00558
00559
00560 }
00561
00562 #endif
00563