00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00032 #ifndef TF_MESSAGE_FILTER_H
00033 #define TF_MESSAGE_FILTER_H
00034
00035 #include <ros/ros.h>
00036 #include <tf/transform_listener.h>
00037 #include <tf/tfMessage.h>
00038
00039 #include <string>
00040 #include <list>
00041 #include <vector>
00042 #include <boost/function.hpp>
00043 #include <boost/bind.hpp>
00044 #include <boost/shared_ptr.hpp>
00045 #include <boost/weak_ptr.hpp>
00046 #include <boost/thread.hpp>
00047 #include <boost/signals.hpp>
00048
00049 #include <ros/callback_queue.h>
00050
00051 #include <message_filters/connection.h>
00052 #include <message_filters/simple_filter.h>
00053
00054 #define TF_MESSAGEFILTER_DEBUG(fmt, ...) \
00055 ROS_DEBUG_NAMED("message_filter", "MessageFilter [target=%s]: " fmt, getTargetFramesString().c_str(), __VA_ARGS__)
00056
00057 #define TF_MESSAGEFILTER_WARN(fmt, ...) \
00058 ROS_WARN_NAMED("message_filter", "MessageFilter [target=%s]: " fmt, getTargetFramesString().c_str(), __VA_ARGS__)
00059
00060 namespace tf
00061 {
00062
00063 namespace filter_failure_reasons
00064 {
00065 enum FilterFailureReason
00066 {
00068 Unknown,
00070 OutTheBack,
00072 EmptyFrameID,
00073 };
00074 }
00075 typedef filter_failure_reasons::FilterFailureReason FilterFailureReason;
00076
00077 class MessageFilterBase
00078 {
00079 public:
00080 virtual ~MessageFilterBase(){}
00081 virtual void clear() = 0;
00082 virtual void setTargetFrame(const std::string& target_frame) = 0;
00083 virtual void setTargetFrames(const std::vector<std::string>& target_frames) = 0;
00084 virtual void setTolerance(const ros::Duration& tolerance) = 0;
00085 virtual void setQueueSize( uint32_t new_queue_size ) = 0;
00086 virtual uint32_t getQueueSize() = 0;
00087 };
00088
00105 template<class M>
00106 class MessageFilter : public MessageFilterBase, public message_filters::SimpleFilter<M>
00107 {
00108 public:
00109 typedef boost::shared_ptr<M const> MConstPtr;
00110 typedef ros::MessageEvent<M const> MEvent;
00111 typedef boost::function<void(const MConstPtr&, FilterFailureReason)> FailureCallback;
00112 typedef boost::signal<void(const MConstPtr&, FilterFailureReason)> FailureSignal;
00113
00114
00115 ROS_STATIC_ASSERT(ros::message_traits::HasHeader<M>::value);
00116
00126 MessageFilter(Transformer& tf, const std::string& target_frame, uint32_t queue_size, ros::NodeHandle nh = ros::NodeHandle(), ros::Duration max_rate = ros::Duration(0.01))
00127 : tf_(tf)
00128 , nh_(nh)
00129 , max_rate_(max_rate)
00130 , queue_size_(queue_size)
00131 {
00132 init();
00133
00134 setTargetFrame(target_frame);
00135 }
00136
00147 template<class F>
00148 MessageFilter(F& f, Transformer& tf, const std::string& target_frame, uint32_t queue_size, ros::NodeHandle nh = ros::NodeHandle(), ros::Duration max_rate = ros::Duration(0.01))
00149 : tf_(tf)
00150 , nh_(nh)
00151 , max_rate_(max_rate)
00152 , queue_size_(queue_size)
00153 {
00154 init();
00155
00156 setTargetFrame(target_frame);
00157
00158 connectInput(f);
00159 }
00160
00164 template<class F>
00165 void connectInput(F& f)
00166 {
00167 message_connection_.disconnect();
00168 message_connection_ = f.registerCallback(&MessageFilter::incomingMessage, this);
00169 }
00170
00174 ~MessageFilter()
00175 {
00176 message_connection_.disconnect();
00177 tf_.removeTransformsChangedListener(tf_connection_);
00178
00179 clear();
00180
00181 TF_MESSAGEFILTER_DEBUG("Successful Transforms: %llu, Failed Transforms: %llu, Discarded due to age: %llu, Transform messages received: %llu, Messages received: %llu, Total dropped: %llu",
00182 (long long unsigned int)successful_transform_count_, (long long unsigned int)failed_transform_count_,
00183 (long long unsigned int)failed_out_the_back_count_, (long long unsigned int)transform_message_count_,
00184 (long long unsigned int)incoming_message_count_, (long long unsigned int)dropped_message_count_);
00185
00186 }
00187
00191 void setTargetFrame(const std::string& target_frame)
00192 {
00193 std::vector<std::string> frames;
00194 frames.push_back(target_frame);
00195 setTargetFrames(frames);
00196 }
00197
00201 void setTargetFrames(const std::vector<std::string>& target_frames)
00202 {
00203 boost::mutex::scoped_lock list_lock(messages_mutex_);
00204 boost::mutex::scoped_lock string_lock(target_frames_string_mutex_);
00205
00206 target_frames_ = target_frames;
00207
00208 std::stringstream ss;
00209 for (std::vector<std::string>::iterator it = target_frames_.begin(); it != target_frames_.end(); ++it)
00210 {
00211 *it = tf::resolve(tf_.getTFPrefix(), *it);
00212 ss << *it << " ";
00213 }
00214 target_frames_string_ = ss.str();
00215 }
00219 std::string getTargetFramesString()
00220 {
00221 boost::mutex::scoped_lock lock(target_frames_string_mutex_);
00222 return target_frames_string_;
00223 };
00224
00228 void setTolerance(const ros::Duration& tolerance)
00229 {
00230 time_tolerance_ = tolerance;
00231 }
00232
00236 void clear()
00237 {
00238 boost::mutex::scoped_lock lock(messages_mutex_);
00239
00240 TF_MESSAGEFILTER_DEBUG("%s", "Cleared");
00241
00242 messages_.clear();
00243 message_count_ = 0;
00244
00245 warned_about_unresolved_name_ = false;
00246 warned_about_empty_frame_id_ = false;
00247 }
00248
00249 void add(const MEvent& evt)
00250 {
00251 boost::mutex::scoped_lock lock(messages_mutex_);
00252
00253 testMessages();
00254
00255 if (!testMessage(evt))
00256 {
00257
00258 if (queue_size_ != 0 && message_count_ + 1 > queue_size_)
00259 {
00260 ++dropped_message_count_;
00261 const MEvent& front = messages_.front();
00262 TF_MESSAGEFILTER_DEBUG("Removed oldest message because buffer is full, count now %d (frame_id=%s, stamp=%f)", message_count_, front.getMessage()->header.frame_id.c_str(), front.getMessage()->header.stamp.toSec());
00263 signalFailure(messages_.front(), filter_failure_reasons::Unknown);
00264
00265 messages_.pop_front();
00266 --message_count_;
00267 }
00268
00269
00270 messages_.push_back(evt);
00271 ++message_count_;
00272 }
00273
00274 TF_MESSAGEFILTER_DEBUG("Added message in frame %s at time %.3f, count now %d", evt.getMessage()->header.frame_id.c_str(), evt.getMessage()->header.stamp.toSec(), message_count_);
00275
00276 ++incoming_message_count_;
00277 }
00278
00284 void add(const MConstPtr& message)
00285 {
00286 boost::shared_ptr<std::map<std::string, std::string> > header(new std::map<std::string, std::string>);
00287 (*header)["callerid"] = "unknown";
00288 add(MEvent(message, header, ros::Time::now()));
00289 }
00290
00295 message_filters::Connection registerFailureCallback(const FailureCallback& callback)
00296 {
00297 boost::mutex::scoped_lock lock(failure_signal_mutex_);
00298 return message_filters::Connection(boost::bind(&MessageFilter::disconnectFailure, this, _1), failure_signal_.connect(callback));
00299 }
00300
00301 virtual void setQueueSize( uint32_t new_queue_size )
00302 {
00303 queue_size_ = new_queue_size;
00304 }
00305
00306 virtual uint32_t getQueueSize()
00307 {
00308 return queue_size_;
00309 }
00310
00311 private:
00312
00313 void init()
00314 {
00315 message_count_ = 0;
00316 new_transforms_ = false;
00317 successful_transform_count_ = 0;
00318 failed_transform_count_ = 0;
00319 failed_out_the_back_count_ = 0;
00320 transform_message_count_ = 0;
00321 incoming_message_count_ = 0;
00322 dropped_message_count_ = 0;
00323 time_tolerance_ = ros::Duration(0.0);
00324 warned_about_unresolved_name_ = false;
00325 warned_about_empty_frame_id_ = false;
00326
00327 tf_connection_ = tf_.addTransformsChangedListener(boost::bind(&MessageFilter::transformsChanged, this));
00328
00329 max_rate_timer_ = nh_.createTimer(max_rate_, &MessageFilter::maxRateTimerCallback, this);
00330 }
00331
00332 typedef std::list<MEvent> L_Event;
00333
00334 bool testMessage(const MEvent& evt)
00335 {
00336 const MConstPtr& message = evt.getMessage();
00337 std::string callerid = evt.getPublisherName();
00338 std::string frame_id = ros::message_traits::FrameId<M>::value(*message);
00339 ros::Time stamp = ros::message_traits::TimeStamp<M>::value(*message);
00340
00341
00342 if (frame_id.empty())
00343 {
00344 if (!warned_about_empty_frame_id_)
00345 {
00346 warned_about_empty_frame_id_ = true;
00347 TF_MESSAGEFILTER_WARN("Discarding message from [%s] due to empty frame_id. This message will only print once.", callerid.c_str());
00348 }
00349 signalFailure(evt, filter_failure_reasons::EmptyFrameID);
00350 return true;
00351 }
00352
00353 if (frame_id[0] != '/')
00354 {
00355 std::string unresolved = frame_id;
00356 frame_id = tf::resolve(tf_.getTFPrefix(), frame_id);
00357
00358 if (!warned_about_unresolved_name_)
00359 {
00360 warned_about_unresolved_name_ = true;
00361 ROS_WARN("Message from [%s] has a non-fully-qualified frame_id [%s]. Resolved locally to [%s]. This is will likely not work in multi-robot systems. This message will only print once.", callerid.c_str(), unresolved.c_str(), frame_id.c_str());
00362 }
00363 }
00364
00365
00367 for (std::vector<std::string>::iterator target_it = target_frames_.begin(); target_it != target_frames_.end(); ++target_it)
00368 {
00369 const std::string& target_frame = *target_it;
00370
00371 if (target_frame != frame_id && stamp != ros::Time(0))
00372 {
00373 ros::Time latest_transform_time ;
00374
00375 tf_.getLatestCommonTime(frame_id, target_frame, latest_transform_time, 0) ;
00376 if (stamp + tf_.getCacheLength() < latest_transform_time)
00377 {
00378 ++failed_out_the_back_count_;
00379 ++dropped_message_count_;
00380 TF_MESSAGEFILTER_DEBUG("Discarding Message, in frame %s, Out of the back of Cache Time(stamp: %.3f + cache_length: %.3f < latest_transform_time %.3f. Message Count now: %d", message->header.frame_id.c_str(), message->header.stamp.toSec(), tf_.getCacheLength().toSec(), latest_transform_time.toSec(), message_count_);
00381
00382 last_out_the_back_stamp_ = stamp;
00383 last_out_the_back_frame_ = frame_id;
00384
00385 signalFailure(evt, filter_failure_reasons::OutTheBack);
00386 return true;
00387 }
00388 }
00389
00390 }
00391
00392 bool ready = !target_frames_.empty();
00393 for (std::vector<std::string>::iterator target_it = target_frames_.begin(); ready && target_it != target_frames_.end(); ++target_it)
00394 {
00395 std::string& target_frame = *target_it;
00396 if (time_tolerance_ != ros::Duration(0.0))
00397 {
00398 ready = ready && (tf_.canTransform(target_frame, frame_id, stamp) &&
00399 tf_.canTransform(target_frame, frame_id, stamp + time_tolerance_) );
00400 }
00401 else
00402 {
00403 ready = ready && tf_.canTransform(target_frame, frame_id, stamp);
00404 }
00405 }
00406
00407 if (ready)
00408 {
00409 TF_MESSAGEFILTER_DEBUG("Message ready in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_);
00410
00411 ++successful_transform_count_;
00412
00413 this->signalMessage(evt);
00414 }
00415 else
00416 {
00417 ++failed_transform_count_;
00418 }
00419
00420 return ready;
00421 }
00422
00423 void testMessages()
00424 {
00425 if (!messages_.empty() && getTargetFramesString() == " ")
00426 {
00427 ROS_WARN_NAMED("message_notifier", "MessageFilter [target=%s]: empty target frame", getTargetFramesString().c_str());
00428 }
00429
00430 int i = 0;
00431
00432 typename L_Event::iterator it = messages_.begin();
00433 for (; it != messages_.end(); ++i)
00434 {
00435 MEvent& evt = *it;
00436
00437 if (testMessage(evt))
00438 {
00439 --message_count_;
00440 it = messages_.erase(it);
00441 }
00442 else
00443 {
00444 ++it;
00445 }
00446 }
00447 }
00448
00449 void maxRateTimerCallback(const ros::TimerEvent&)
00450 {
00451 boost::mutex::scoped_lock list_lock(messages_mutex_);
00452 if (new_transforms_)
00453 {
00454 testMessages();
00455 new_transforms_ = false;
00456 }
00457
00458 checkFailures();
00459 }
00460
00464 void incomingMessage(const ros::MessageEvent<M const>& evt)
00465 {
00466 add(evt);
00467 }
00468
00469 void transformsChanged()
00470 {
00471 new_transforms_ = true;
00472
00473 ++transform_message_count_;
00474 }
00475
00476 void checkFailures()
00477 {
00478 if (next_failure_warning_.isZero())
00479 {
00480 next_failure_warning_ = ros::Time::now() + ros::Duration(15);
00481 }
00482
00483 if (ros::Time::now() >= next_failure_warning_)
00484 {
00485 if (incoming_message_count_ - message_count_ == 0)
00486 {
00487 return;
00488 }
00489
00490 double dropped_pct = (double)dropped_message_count_ / (double)(incoming_message_count_ - message_count_);
00491 if (dropped_pct > 0.95)
00492 {
00493 TF_MESSAGEFILTER_WARN("Dropped %.2f%% of messages so far. Please turn the [%s.message_notifier] rosconsole logger to DEBUG for more information.", dropped_pct*100, ROSCONSOLE_DEFAULT_NAME);
00494 next_failure_warning_ = ros::Time::now() + ros::Duration(60);
00495
00496 if ((double)failed_out_the_back_count_ / (double)dropped_message_count_ > 0.5)
00497 {
00498 TF_MESSAGEFILTER_WARN(" The majority of dropped messages were due to messages growing older than the TF cache time. The last message's timestamp was: %f, and the last frame_id was: %s", last_out_the_back_stamp_.toSec(), last_out_the_back_frame_.c_str());
00499 }
00500 }
00501 }
00502 }
00503
00504 void disconnectFailure(const message_filters::Connection& c)
00505 {
00506 boost::mutex::scoped_lock lock(failure_signal_mutex_);
00507 c.getBoostConnection().disconnect();
00508 }
00509
00510 void signalFailure(const MEvent& evt, FilterFailureReason reason)
00511 {
00512 boost::mutex::scoped_lock lock(failure_signal_mutex_);
00513 failure_signal_(evt.getMessage(), reason);
00514 }
00515
00516 Transformer& tf_;
00517 ros::NodeHandle nh_;
00518 ros::Duration max_rate_;
00519 ros::Timer max_rate_timer_;
00520 std::vector<std::string> target_frames_;
00521 std::string target_frames_string_;
00522 boost::mutex target_frames_string_mutex_;
00523 uint32_t queue_size_;
00524
00525 L_Event messages_;
00526 uint32_t message_count_;
00527 boost::mutex messages_mutex_;
00528
00529 bool new_messages_;
00530 volatile bool new_transforms_;
00531
00532 bool warned_about_unresolved_name_;
00533 bool warned_about_empty_frame_id_;
00534
00535 uint64_t successful_transform_count_;
00536 uint64_t failed_transform_count_;
00537 uint64_t failed_out_the_back_count_;
00538 uint64_t transform_message_count_;
00539 uint64_t incoming_message_count_;
00540 uint64_t dropped_message_count_;
00541
00542 ros::Time last_out_the_back_stamp_;
00543 std::string last_out_the_back_frame_;
00544
00545 ros::Time next_failure_warning_;
00546
00547 ros::Duration time_tolerance_;
00548
00549 boost::signals::connection tf_connection_;
00550 message_filters::Connection message_connection_;
00551
00552 FailureSignal failure_signal_;
00553 boost::mutex failure_signal_mutex_;
00554 };
00555
00556 }
00557
00558 #endif
00559