00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00032 #ifndef TF_MESSAGE_FILTER_H
00033 #define TF_MESSAGE_FILTER_H
00034
00035 #include <ros/ros.h>
00036 #include <tf/transform_listener.h>
00037 #include <tf/tfMessage.h>
00038
00039 #include <string>
00040 #include <list>
00041 #include <vector>
00042 #include <boost/function.hpp>
00043 #include <boost/bind.hpp>
00044 #include <boost/shared_ptr.hpp>
00045 #include <boost/weak_ptr.hpp>
00046 #include <boost/thread.hpp>
00047 #include <boost/signals.hpp>
00048
00049 #include <ros/callback_queue.h>
00050
00051 #include <message_filters/connection.h>
00052 #include <message_filters/simple_filter.h>
00053
00054 #define TF_MESSAGEFILTER_DEBUG(fmt, ...) \
00055 ROS_DEBUG_NAMED("message_filter", "MessageFilter [target=%s]: " fmt, getTargetFramesString().c_str(), __VA_ARGS__)
00056
00057 #define TF_MESSAGEFILTER_WARN(fmt, ...) \
00058 ROS_WARN_NAMED("message_filter", "MessageFilter [target=%s]: " fmt, getTargetFramesString().c_str(), __VA_ARGS__)
00059
00060 namespace tf
00061 {
00062
00063 namespace filter_failure_reasons
00064 {
00065 enum FilterFailureReason
00066 {
00068 Unknown,
00070 OutTheBack,
00072 EmptyFrameID,
00073 };
00074 }
00075 typedef filter_failure_reasons::FilterFailureReason FilterFailureReason;
00076
00077 class MessageFilterBase
00078 {
00079 public:
00080 virtual ~MessageFilterBase(){}
00081 virtual void clear() = 0;
00082 virtual void setTargetFrame(const std::string& target_frame) = 0;
00083 virtual void setTargetFrames(const std::vector<std::string>& target_frames) = 0;
00084 virtual void setTolerance(const ros::Duration& tolerance) = 0;
00085 virtual void setQueueSize( uint32_t new_queue_size ) = 0;
00086 virtual uint32_t getQueueSize() = 0;
00087 };
00088
00105 template<class M>
00106 class MessageFilter : public MessageFilterBase, public message_filters::SimpleFilter<M>
00107 {
00108 public:
00109 typedef boost::shared_ptr<M const> MConstPtr;
00110 typedef ros::MessageEvent<M const> MEvent;
00111 typedef boost::function<void(const MConstPtr&, FilterFailureReason)> FailureCallback;
00112 typedef boost::signal<void(const MConstPtr&, FilterFailureReason)> FailureSignal;
00113
00114
00115 ROS_STATIC_ASSERT(ros::message_traits::HasHeader<M>::value);
00116
00126 MessageFilter(Transformer& tf, const std::string& target_frame, uint32_t queue_size, ros::NodeHandle nh = ros::NodeHandle(), ros::Duration max_rate = ros::Duration(0.01))
00127 : tf_(tf)
00128 , nh_(nh)
00129 , max_rate_(max_rate)
00130 , queue_size_(queue_size)
00131 {
00132 init();
00133
00134 setTargetFrame(target_frame);
00135 }
00136
00147 template<class F>
00148 MessageFilter(F& f, Transformer& tf, const std::string& target_frame, uint32_t queue_size, ros::NodeHandle nh = ros::NodeHandle(), ros::Duration max_rate = ros::Duration(0.01))
00149 : tf_(tf)
00150 , nh_(nh)
00151 , max_rate_(max_rate)
00152 , queue_size_(queue_size)
00153 {
00154 init();
00155
00156 setTargetFrame(target_frame);
00157
00158 connectInput(f);
00159 }
00160
00164 template<class F>
00165 void connectInput(F& f)
00166 {
00167 message_connection_.disconnect();
00168 message_connection_ = f.registerCallback(&MessageFilter::incomingMessage, this);
00169 }
00170
00174 ~MessageFilter()
00175 {
00176
00177 max_rate_timer_.stop();
00178 message_connection_.disconnect();
00179 tf_.removeTransformsChangedListener(tf_connection_);
00180
00181 clear();
00182
00183 TF_MESSAGEFILTER_DEBUG("Successful Transforms: %llu, Failed Transforms: %llu, Discarded due to age: %llu, Transform messages received: %llu, Messages received: %llu, Total dropped: %llu",
00184 (long long unsigned int)successful_transform_count_, (long long unsigned int)failed_transform_count_,
00185 (long long unsigned int)failed_out_the_back_count_, (long long unsigned int)transform_message_count_,
00186 (long long unsigned int)incoming_message_count_, (long long unsigned int)dropped_message_count_);
00187
00188 }
00189
00193 void setTargetFrame(const std::string& target_frame)
00194 {
00195 std::vector<std::string> frames;
00196 frames.push_back(target_frame);
00197 setTargetFrames(frames);
00198 }
00199
00203 void setTargetFrames(const std::vector<std::string>& target_frames)
00204 {
00205 boost::mutex::scoped_lock list_lock(messages_mutex_);
00206 boost::mutex::scoped_lock string_lock(target_frames_string_mutex_);
00207
00208 target_frames_ = target_frames;
00209
00210 std::stringstream ss;
00211 for (std::vector<std::string>::iterator it = target_frames_.begin(); it != target_frames_.end(); ++it)
00212 {
00213 ss << *it << " ";
00214 }
00215 target_frames_string_ = ss.str();
00216 }
00220 std::string getTargetFramesString()
00221 {
00222 boost::mutex::scoped_lock lock(target_frames_string_mutex_);
00223 return target_frames_string_;
00224 };
00225
00229 void setTolerance(const ros::Duration& tolerance)
00230 {
00231 time_tolerance_ = tolerance;
00232 }
00233
00237 void clear()
00238 {
00239 boost::mutex::scoped_lock lock(messages_mutex_);
00240
00241 TF_MESSAGEFILTER_DEBUG("%s", "Cleared");
00242
00243 messages_.clear();
00244 message_count_ = 0;
00245
00246 warned_about_unresolved_name_ = false;
00247 warned_about_empty_frame_id_ = false;
00248 }
00249
00250 void add(const MEvent& evt)
00251 {
00252 boost::mutex::scoped_lock lock(messages_mutex_);
00253
00254 testMessages();
00255
00256 if (!testMessage(evt))
00257 {
00258
00259 if (queue_size_ != 0 && message_count_ + 1 > queue_size_)
00260 {
00261 ++dropped_message_count_;
00262 const MEvent& front = messages_.front();
00263 TF_MESSAGEFILTER_DEBUG("Removed oldest message because buffer is full, count now %d (frame_id=%s, stamp=%f)", message_count_, front.getMessage()->header.frame_id.c_str(), front.getMessage()->header.stamp.toSec());
00264 signalFailure(messages_.front(), filter_failure_reasons::Unknown);
00265
00266 messages_.pop_front();
00267 --message_count_;
00268 }
00269
00270
00271 messages_.push_back(evt);
00272 ++message_count_;
00273 }
00274
00275 TF_MESSAGEFILTER_DEBUG("Added message in frame %s at time %.3f, count now %d", evt.getMessage()->header.frame_id.c_str(), evt.getMessage()->header.stamp.toSec(), message_count_);
00276
00277 ++incoming_message_count_;
00278 }
00279
00285 void add(const MConstPtr& message)
00286 {
00287
00288 boost::shared_ptr<std::map<std::string, std::string> > header(new std::map<std::string, std::string>);
00289 (*header)["callerid"] = "unknown";
00290 add(MEvent(message, header, ros::Time::now()));
00291 }
00292
00297 message_filters::Connection registerFailureCallback(const FailureCallback& callback)
00298 {
00299 boost::mutex::scoped_lock lock(failure_signal_mutex_);
00300 return message_filters::Connection(boost::bind(&MessageFilter::disconnectFailure, this, _1), failure_signal_.connect(callback));
00301 }
00302
00303 virtual void setQueueSize( uint32_t new_queue_size )
00304 {
00305 queue_size_ = new_queue_size;
00306 }
00307
00308 virtual uint32_t getQueueSize()
00309 {
00310 return queue_size_;
00311 }
00312
00313 private:
00314
00315 void init()
00316 {
00317 message_count_ = 0;
00318 new_transforms_ = false;
00319 successful_transform_count_ = 0;
00320 failed_transform_count_ = 0;
00321 failed_out_the_back_count_ = 0;
00322 transform_message_count_ = 0;
00323 incoming_message_count_ = 0;
00324 dropped_message_count_ = 0;
00325 time_tolerance_ = ros::Duration(0.0);
00326 warned_about_unresolved_name_ = false;
00327 warned_about_empty_frame_id_ = false;
00328
00329 tf_connection_ = tf_.addTransformsChangedListener(boost::bind(&MessageFilter::transformsChanged, this));
00330
00331 max_rate_timer_ = nh_.createTimer(max_rate_, &MessageFilter::maxRateTimerCallback, this);
00332 }
00333
00334 typedef std::list<MEvent> L_Event;
00335
00336 bool testMessage(const MEvent& evt)
00337 {
00338 const MConstPtr& message = evt.getMessage();
00339 std::string callerid = evt.getPublisherName();
00340 std::string frame_id = ros::message_traits::FrameId<M>::value(*message);
00341 ros::Time stamp = ros::message_traits::TimeStamp<M>::value(*message);
00342
00343
00344 if (frame_id.empty())
00345 {
00346 if (!warned_about_empty_frame_id_)
00347 {
00348 warned_about_empty_frame_id_ = true;
00349 TF_MESSAGEFILTER_WARN("Discarding message from [%s] due to empty frame_id. This message will only print once.", callerid.c_str());
00350 }
00351 signalFailure(evt, filter_failure_reasons::EmptyFrameID);
00352 return true;
00353 }
00354
00355
00356
00358 for (std::vector<std::string>::iterator target_it = target_frames_.begin(); target_it != target_frames_.end(); ++target_it)
00359 {
00360 const std::string& target_frame = *target_it;
00361
00362 if (target_frame != frame_id && stamp != ros::Time(0))
00363 {
00364 ros::Time latest_transform_time ;
00365
00366 tf_.getLatestCommonTime(frame_id, target_frame, latest_transform_time, 0) ;
00367
00368 if (stamp + tf_.getCacheLength() < latest_transform_time)
00369 {
00370 ++failed_out_the_back_count_;
00371 ++dropped_message_count_;
00372 TF_MESSAGEFILTER_DEBUG("Discarding Message, in frame %s, Out of the back of Cache Time(stamp: %.3f + cache_length: %.3f < latest_transform_time %.3f. Message Count now: %d", message->header.frame_id.c_str(), message->header.stamp.toSec(), tf_.getCacheLength().toSec(), latest_transform_time.toSec(), message_count_);
00373
00374 last_out_the_back_stamp_ = stamp;
00375 last_out_the_back_frame_ = frame_id;
00376
00377 signalFailure(evt, filter_failure_reasons::OutTheBack);
00378 return true;
00379 }
00380 }
00381
00382 }
00383
00384 bool ready = !target_frames_.empty();
00385 for (std::vector<std::string>::iterator target_it = target_frames_.begin(); ready && target_it != target_frames_.end(); ++target_it)
00386 {
00387 std::string& target_frame = *target_it;
00388 if (time_tolerance_ != ros::Duration(0.0))
00389 {
00390 ready = ready && (tf_.canTransform(target_frame, frame_id, stamp) &&
00391 tf_.canTransform(target_frame, frame_id, stamp + time_tolerance_) );
00392 }
00393 else
00394 {
00395 ready = ready && tf_.canTransform(target_frame, frame_id, stamp);
00396 }
00397 }
00398
00399 if (ready)
00400 {
00401 TF_MESSAGEFILTER_DEBUG("Message ready in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_);
00402
00403 ++successful_transform_count_;
00404
00405 this->signalMessage(evt);
00406 }
00407 else
00408 {
00409 ++failed_transform_count_;
00410 }
00411
00412 return ready;
00413 }
00414
00415 void testMessages()
00416 {
00417 if (!messages_.empty() && getTargetFramesString() == " ")
00418 {
00419 ROS_WARN_NAMED("message_notifier", "MessageFilter [target=%s]: empty target frame", getTargetFramesString().c_str());
00420 }
00421
00422 int i = 0;
00423
00424 typename L_Event::iterator it = messages_.begin();
00425 for (; it != messages_.end(); ++i)
00426 {
00427 MEvent& evt = *it;
00428
00429 if (testMessage(evt))
00430 {
00431 --message_count_;
00432 it = messages_.erase(it);
00433 }
00434 else
00435 {
00436 ++it;
00437 }
00438 }
00439 }
00440
00441 void maxRateTimerCallback(const ros::TimerEvent&)
00442 {
00443 boost::mutex::scoped_lock list_lock(messages_mutex_);
00444 if (new_transforms_)
00445 {
00446 testMessages();
00447 new_transforms_ = false;
00448 }
00449
00450 checkFailures();
00451 }
00452
00456 void incomingMessage(const ros::MessageEvent<M const>& evt)
00457 {
00458 add(evt);
00459 }
00460
00461 void transformsChanged()
00462 {
00463 new_transforms_ = true;
00464
00465 ++transform_message_count_;
00466 }
00467
00468 void checkFailures()
00469 {
00470 if (next_failure_warning_.isZero())
00471 {
00472 next_failure_warning_ = ros::Time::now() + ros::Duration(15);
00473 }
00474
00475 if (ros::Time::now() >= next_failure_warning_)
00476 {
00477 if (incoming_message_count_ - message_count_ == 0)
00478 {
00479 return;
00480 }
00481
00482 double dropped_pct = (double)dropped_message_count_ / (double)(incoming_message_count_ - message_count_);
00483 if (dropped_pct > 0.95)
00484 {
00485 TF_MESSAGEFILTER_WARN("Dropped %.2f%% of messages so far. Please turn the [%s.message_notifier] rosconsole logger to DEBUG for more information.", dropped_pct*100, ROSCONSOLE_DEFAULT_NAME);
00486 next_failure_warning_ = ros::Time::now() + ros::Duration(60);
00487
00488 if ((double)failed_out_the_back_count_ / (double)dropped_message_count_ > 0.5)
00489 {
00490 TF_MESSAGEFILTER_WARN(" The majority of dropped messages were due to messages growing older than the TF cache time. The last message's timestamp was: %f, and the last frame_id was: %s", last_out_the_back_stamp_.toSec(), last_out_the_back_frame_.c_str());
00491 }
00492 }
00493 }
00494 }
00495
00496 void disconnectFailure(const message_filters::Connection& c)
00497 {
00498 boost::mutex::scoped_lock lock(failure_signal_mutex_);
00499 c.getBoostConnection().disconnect();
00500 }
00501
00502 void signalFailure(const MEvent& evt, FilterFailureReason reason)
00503 {
00504 boost::mutex::scoped_lock lock(failure_signal_mutex_);
00505 failure_signal_(evt.getMessage(), reason);
00506 }
00507
00508 Transformer& tf_;
00509 ros::NodeHandle nh_;
00510 ros::Duration max_rate_;
00511 ros::Timer max_rate_timer_;
00512 std::vector<std::string> target_frames_;
00513 std::string target_frames_string_;
00514 boost::mutex target_frames_string_mutex_;
00515 uint32_t queue_size_;
00516
00517 L_Event messages_;
00518 uint32_t message_count_;
00519 boost::mutex messages_mutex_;
00520
00521 bool new_messages_;
00522 volatile bool new_transforms_;
00523
00524 bool warned_about_unresolved_name_;
00525 bool warned_about_empty_frame_id_;
00526
00527 uint64_t successful_transform_count_;
00528 uint64_t failed_transform_count_;
00529 uint64_t failed_out_the_back_count_;
00530 uint64_t transform_message_count_;
00531 uint64_t incoming_message_count_;
00532 uint64_t dropped_message_count_;
00533
00534 ros::Time last_out_the_back_stamp_;
00535 std::string last_out_the_back_frame_;
00536
00537 ros::Time next_failure_warning_;
00538
00539 ros::Duration time_tolerance_;
00540
00541 boost::signals::connection tf_connection_;
00542 message_filters::Connection message_connection_;
00543
00544 FailureSignal failure_signal_;
00545 boost::mutex failure_signal_mutex_;
00546 };
00547
00548
00549 }
00550
00551 #endif
00552