00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00032 #ifndef TF_MESSAGE_FILTER_H
00033 #define TF_MESSAGE_FILTER_H
00034
00035 #include <ros/ros.h>
00036 #include <tf/transform_listener.h>
00037 #include <tf/tfMessage.h>
00038
00039 #include <string>
00040 #include <list>
00041 #include <vector>
00042 #include <boost/function.hpp>
00043 #include <boost/bind.hpp>
00044 #include <boost/shared_ptr.hpp>
00045 #include <boost/weak_ptr.hpp>
00046 #include <boost/thread.hpp>
00047 #include <boost/signals.hpp>
00048
00049 #include <ros/callback_queue.h>
00050
00051 #include <message_filters/connection.h>
00052 #include <message_filters/simple_filter.h>
00053
00054 #define TF_MESSAGEFILTER_DEBUG(fmt, ...) \
00055 ROS_DEBUG_NAMED("message_filter", "MessageFilter [target=%s]: "fmt, getTargetFramesString().c_str(), __VA_ARGS__)
00056
00057 #define TF_MESSAGEFILTER_WARN(fmt, ...) \
00058 ROS_WARN_NAMED("message_filter", "MessageFilter [target=%s]: "fmt, getTargetFramesString().c_str(), __VA_ARGS__)
00059
00060 namespace tf
00061 {
00062
00063 namespace filter_failure_reasons
00064 {
00065 enum FilterFailureReason
00066 {
00068 Unknown,
00070 OutTheBack,
00072 EmptyFrameID,
00073 };
00074 }
00075 typedef filter_failure_reasons::FilterFailureReason FilterFailureReason;
00076
00077 class MessageFilterBase
00078 {
00079 public:
00080 virtual ~MessageFilterBase(){}
00081 virtual void clear() = 0;
00082 virtual void setTargetFrame(const std::string& target_frame) = 0;
00083 virtual void setTargetFrames(const std::vector<std::string>& target_frames) = 0;
00084 virtual void setTolerance(const ros::Duration& tolerance) = 0;
00085 };
00086
00103 template<class M>
00104 class MessageFilter : public MessageFilterBase, public message_filters::SimpleFilter<M>
00105 {
00106 public:
00107 typedef boost::shared_ptr<M const> MConstPtr;
00108 typedef ros::MessageEvent<M const> MEvent;
00109 typedef boost::function<void(const MConstPtr&, FilterFailureReason)> FailureCallback;
00110 typedef boost::signal<void(const MConstPtr&, FilterFailureReason)> FailureSignal;
00111
00112
00113 ROS_STATIC_ASSERT(ros::message_traits::HasHeader<M>::value);
00114
00124 MessageFilter(Transformer& tf, const std::string& target_frame, uint32_t queue_size, ros::NodeHandle nh = ros::NodeHandle(), ros::Duration max_rate = ros::Duration(0.01))
00125 : tf_(tf)
00126 , nh_(nh)
00127 , max_rate_(max_rate)
00128 , queue_size_(queue_size)
00129 {
00130 init();
00131
00132 setTargetFrame(target_frame);
00133 }
00134
00145 template<class F>
00146 MessageFilter(F& f, Transformer& tf, const std::string& target_frame, uint32_t queue_size, ros::NodeHandle nh = ros::NodeHandle(), ros::Duration max_rate = ros::Duration(0.01))
00147 : tf_(tf)
00148 , nh_(nh)
00149 , max_rate_(max_rate)
00150 , queue_size_(queue_size)
00151 {
00152 init();
00153
00154 setTargetFrame(target_frame);
00155
00156 connectInput(f);
00157 }
00158
00162 template<class F>
00163 void connectInput(F& f)
00164 {
00165 message_connection_.disconnect();
00166 message_connection_ = f.registerCallback(&MessageFilter::incomingMessage, this);
00167 }
00168
00172 ~MessageFilter()
00173 {
00174 message_connection_.disconnect();
00175 tf_.removeTransformsChangedListener(tf_connection_);
00176
00177 clear();
00178
00179 TF_MESSAGEFILTER_DEBUG("Successful Transforms: %llu, Failed Transforms: %llu, Discarded due to age: %llu, Transform messages received: %llu, Messages received: %llu, Total dropped: %llu",
00180 (long long unsigned int)successful_transform_count_, (long long unsigned int)failed_transform_count_,
00181 (long long unsigned int)failed_out_the_back_count_, (long long unsigned int)transform_message_count_,
00182 (long long unsigned int)incoming_message_count_, (long long unsigned int)dropped_message_count_);
00183
00184 }
00185
00189 void setTargetFrame(const std::string& target_frame)
00190 {
00191 std::vector<std::string> frames;
00192 frames.push_back(target_frame);
00193 setTargetFrames(frames);
00194 }
00195
00199 void setTargetFrames(const std::vector<std::string>& target_frames)
00200 {
00201 boost::mutex::scoped_lock list_lock(messages_mutex_);
00202 boost::mutex::scoped_lock string_lock(target_frames_string_mutex_);
00203
00204 target_frames_ = target_frames;
00205
00206 std::stringstream ss;
00207 for (std::vector<std::string>::iterator it = target_frames_.begin(); it != target_frames_.end(); ++it)
00208 {
00209 *it = tf::resolve(tf_.getTFPrefix(), *it);
00210 ss << *it << " ";
00211 }
00212 target_frames_string_ = ss.str();
00213 }
00217 std::string getTargetFramesString()
00218 {
00219 boost::mutex::scoped_lock lock(target_frames_string_mutex_);
00220 return target_frames_string_;
00221 };
00222
00226 void setTolerance(const ros::Duration& tolerance)
00227 {
00228 time_tolerance_ = tolerance;
00229 }
00230
00234 void clear()
00235 {
00236 boost::mutex::scoped_lock lock(messages_mutex_);
00237
00238 TF_MESSAGEFILTER_DEBUG("%s", "Cleared");
00239
00240 messages_.clear();
00241 message_count_ = 0;
00242
00243 warned_about_unresolved_name_ = false;
00244 warned_about_empty_frame_id_ = false;
00245 }
00246
00247 void add(const MEvent& evt)
00248 {
00249 boost::mutex::scoped_lock lock(messages_mutex_);
00250
00251 testMessages();
00252
00253 if (!testMessage(evt))
00254 {
00255
00256 if (queue_size_ != 0 && message_count_ + 1 > queue_size_)
00257 {
00258 ++dropped_message_count_;
00259 const MEvent& front = messages_.front();
00260 TF_MESSAGEFILTER_DEBUG("Removed oldest message because buffer is full, count now %d (frame_id=%s, stamp=%f)", message_count_, front.getMessage()->header.frame_id.c_str(), front.getMessage()->header.stamp.toSec());
00261 signalFailure(messages_.front(), filter_failure_reasons::Unknown);
00262
00263 messages_.pop_front();
00264 --message_count_;
00265 }
00266
00267
00268 messages_.push_back(evt);
00269 ++message_count_;
00270 }
00271
00272 TF_MESSAGEFILTER_DEBUG("Added message in frame %s at time %.3f, count now %d", evt.getMessage()->header.frame_id.c_str(), evt.getMessage()->header.stamp.toSec(), message_count_);
00273
00274 ++incoming_message_count_;
00275 }
00276
00282 void add(const MConstPtr& message)
00283 {
00284 boost::shared_ptr<std::map<std::string, std::string> > header(new std::map<std::string, std::string>);
00285 (*header)["callerid"] = "unknown";
00286 add(MEvent(message, header, ros::Time::now()));
00287 }
00288
00293 message_filters::Connection registerFailureCallback(const FailureCallback& callback)
00294 {
00295 boost::mutex::scoped_lock lock(failure_signal_mutex_);
00296 return message_filters::Connection(boost::bind(&MessageFilter::disconnectFailure, this, _1), failure_signal_.connect(callback));
00297 }
00298
00299 private:
00300
00301 void init()
00302 {
00303 message_count_ = 0;
00304 new_transforms_ = false;
00305 successful_transform_count_ = 0;
00306 failed_transform_count_ = 0;
00307 failed_out_the_back_count_ = 0;
00308 transform_message_count_ = 0;
00309 incoming_message_count_ = 0;
00310 dropped_message_count_ = 0;
00311 time_tolerance_ = ros::Duration(0.0);
00312 warned_about_unresolved_name_ = false;
00313 warned_about_empty_frame_id_ = false;
00314
00315 tf_connection_ = tf_.addTransformsChangedListener(boost::bind(&MessageFilter::transformsChanged, this));
00316
00317 max_rate_timer_ = nh_.createTimer(max_rate_, &MessageFilter::maxRateTimerCallback, this);
00318 }
00319
00320 typedef std::list<MEvent> L_Event;
00321
00322 bool testMessage(const MEvent& evt)
00323 {
00324 const MConstPtr& message = evt.getMessage();
00325 std::string callerid = evt.getPublisherName();
00326 std::string frame_id = ros::message_traits::FrameId<M>::value(*message);
00327 ros::Time stamp = ros::message_traits::TimeStamp<M>::value(*message);
00328
00329
00330 if (frame_id.empty())
00331 {
00332 if (!warned_about_empty_frame_id_)
00333 {
00334 warned_about_empty_frame_id_ = true;
00335 TF_MESSAGEFILTER_WARN("Discarding message from [%s] due to empty frame_id. This message will only print once.", callerid.c_str());
00336 }
00337 signalFailure(evt, filter_failure_reasons::EmptyFrameID);
00338 return true;
00339 }
00340
00341 if (frame_id[0] != '/')
00342 {
00343 std::string unresolved = frame_id;
00344 frame_id = tf::resolve(tf_.getTFPrefix(), frame_id);
00345
00346 if (!warned_about_unresolved_name_)
00347 {
00348 warned_about_unresolved_name_ = true;
00349 ROS_WARN("Message from [%s] has a non-fully-qualified frame_id [%s]. Resolved locally to [%s]. This is will likely not work in multi-robot systems. This message will only print once.", callerid.c_str(), unresolved.c_str(), frame_id.c_str());
00350 }
00351 }
00352
00353
00355 for (std::vector<std::string>::iterator target_it = target_frames_.begin(); target_it != target_frames_.end(); ++target_it)
00356 {
00357 const std::string& target_frame = *target_it;
00358
00359 if (target_frame != frame_id && stamp != ros::Time(0))
00360 {
00361 ros::Time latest_transform_time ;
00362
00363 tf_.getLatestCommonTime(frame_id, target_frame, latest_transform_time, 0) ;
00364 if (stamp + tf_.getCacheLength() < latest_transform_time)
00365 {
00366 ++failed_out_the_back_count_;
00367 ++dropped_message_count_;
00368 TF_MESSAGEFILTER_DEBUG("Discarding Message, in frame %s, Out of the back of Cache Time(stamp: %.3f + cache_length: %.3f < latest_transform_time %.3f. Message Count now: %d", message->header.frame_id.c_str(), message->header.stamp.toSec(), tf_.getCacheLength().toSec(), latest_transform_time.toSec(), message_count_);
00369
00370 last_out_the_back_stamp_ = stamp;
00371 last_out_the_back_frame_ = frame_id;
00372
00373 signalFailure(evt, filter_failure_reasons::OutTheBack);
00374 return true;
00375 }
00376 }
00377
00378 }
00379
00380 bool ready = !target_frames_.empty();
00381 for (std::vector<std::string>::iterator target_it = target_frames_.begin(); ready && target_it != target_frames_.end(); ++target_it)
00382 {
00383 std::string& target_frame = *target_it;
00384 if (time_tolerance_ != ros::Duration(0.0))
00385 {
00386 ready = ready && (tf_.canTransform(target_frame, frame_id, stamp) &&
00387 tf_.canTransform(target_frame, frame_id, stamp + time_tolerance_) );
00388 }
00389 else
00390 {
00391 ready = ready && tf_.canTransform(target_frame, frame_id, stamp);
00392 }
00393 }
00394
00395 if (ready)
00396 {
00397 TF_MESSAGEFILTER_DEBUG("Message ready in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_);
00398
00399 ++successful_transform_count_;
00400
00401 signalMessage(evt);
00402 }
00403 else
00404 {
00405 ++failed_transform_count_;
00406 }
00407
00408 return ready;
00409 }
00410
00411 void testMessages()
00412 {
00413 if (!messages_.empty() && getTargetFramesString() == " ")
00414 {
00415 ROS_WARN_NAMED("message_notifier", "MessageFilter [target=%s]: empty target frame", getTargetFramesString().c_str());
00416 }
00417
00418 int i = 0;
00419
00420 typename L_Event::iterator it = messages_.begin();
00421 for (; it != messages_.end(); ++i)
00422 {
00423 MEvent& evt = *it;
00424
00425 if (testMessage(evt))
00426 {
00427 --message_count_;
00428 it = messages_.erase(it);
00429 }
00430 else
00431 {
00432 ++it;
00433 }
00434 }
00435 }
00436
00437 void maxRateTimerCallback(const ros::TimerEvent&)
00438 {
00439 boost::mutex::scoped_lock list_lock(messages_mutex_);
00440 if (new_transforms_)
00441 {
00442 testMessages();
00443 new_transforms_ = false;
00444 }
00445
00446 checkFailures();
00447 }
00448
00452 void incomingMessage(const ros::MessageEvent<M const>& evt)
00453 {
00454 add(evt);
00455 }
00456
00457 void transformsChanged()
00458 {
00459 new_transforms_ = true;
00460
00461 ++transform_message_count_;
00462 }
00463
00464 void checkFailures()
00465 {
00466 if (next_failure_warning_.isZero())
00467 {
00468 next_failure_warning_ = ros::Time::now() + ros::Duration(15);
00469 }
00470
00471 if (ros::Time::now() >= next_failure_warning_)
00472 {
00473 if (incoming_message_count_ - message_count_ == 0)
00474 {
00475 return;
00476 }
00477
00478 double dropped_pct = (double)dropped_message_count_ / (double)(incoming_message_count_ - message_count_);
00479 if (dropped_pct > 0.95)
00480 {
00481 TF_MESSAGEFILTER_WARN("Dropped %.2f%% of messages so far. Please turn the [%s.message_notifier] rosconsole logger to DEBUG for more information.", dropped_pct*100, ROSCONSOLE_DEFAULT_NAME);
00482 next_failure_warning_ = ros::Time::now() + ros::Duration(60);
00483
00484 if ((double)failed_out_the_back_count_ / (double)dropped_message_count_ > 0.5)
00485 {
00486 TF_MESSAGEFILTER_WARN(" The majority of dropped messages were due to messages growing older than the TF cache time. The last message's timestamp was: %f, and the last frame_id was: %s", last_out_the_back_stamp_.toSec(), last_out_the_back_frame_.c_str());
00487 }
00488 }
00489 }
00490 }
00491
00492 void disconnectFailure(const message_filters::Connection& c)
00493 {
00494 boost::mutex::scoped_lock lock(failure_signal_mutex_);
00495 c.getBoostConnection().disconnect();
00496 }
00497
00498 void signalFailure(const MEvent& evt, FilterFailureReason reason)
00499 {
00500 boost::mutex::scoped_lock lock(failure_signal_mutex_);
00501 failure_signal_(evt.getMessage(), reason);
00502 }
00503
00504 Transformer& tf_;
00505 ros::NodeHandle nh_;
00506 ros::Duration max_rate_;
00507 ros::Timer max_rate_timer_;
00508 std::vector<std::string> target_frames_;
00509 std::string target_frames_string_;
00510 boost::mutex target_frames_string_mutex_;
00511 uint32_t queue_size_;
00512
00513 L_Event messages_;
00514 uint32_t message_count_;
00515 boost::mutex messages_mutex_;
00516
00517 bool new_messages_;
00518 volatile bool new_transforms_;
00519
00520 bool warned_about_unresolved_name_;
00521 bool warned_about_empty_frame_id_;
00522
00523 uint64_t successful_transform_count_;
00524 uint64_t failed_transform_count_;
00525 uint64_t failed_out_the_back_count_;
00526 uint64_t transform_message_count_;
00527 uint64_t incoming_message_count_;
00528 uint64_t dropped_message_count_;
00529
00530 ros::Time last_out_the_back_stamp_;
00531 std::string last_out_the_back_frame_;
00532
00533 ros::Time next_failure_warning_;
00534
00535 ros::Duration time_tolerance_;
00536
00537 boost::signals::connection tf_connection_;
00538 message_filters::Connection message_connection_;
00539
00540 FailureSignal failure_signal_;
00541 boost::mutex failure_signal_mutex_;
00542 };
00543
00544 }
00545
00546 #endif
00547