00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00032 #ifndef TF2_ROS_MESSAGE_FILTER_H
00033 #define TF2_ROS_MESSAGE_FILTER_H
00034
00035 #include <tf2/buffer_core.h>
00036
00037 #include <string>
00038 #include <list>
00039 #include <vector>
00040 #include <boost/function.hpp>
00041 #include <boost/bind.hpp>
00042 #include <boost/shared_ptr.hpp>
00043 #include <boost/thread.hpp>
00044 #include <boost/signals.hpp>
00045
00046 #include <message_filters/connection.h>
00047 #include <message_filters/simple_filter.h>
00048
00049 #include <ros/node_handle.h>
00050 #include <ros/callback_queue_interface.h>
00051 #include <ros/init.h>
00052
00053 #define TF2_ROS_MESSAGEFILTER_DEBUG(fmt, ...) \
00054 ROS_DEBUG_NAMED("message_filter", "MessageFilter [target=%s]: "fmt, getTargetFramesString().c_str(), __VA_ARGS__)
00055
00056 #define TF2_ROS_MESSAGEFILTER_WARN(fmt, ...) \
00057 ROS_WARN_NAMED("message_filter", "MessageFilter [target=%s]: "fmt, getTargetFramesString().c_str(), __VA_ARGS__)
00058
00059 namespace tf2_ros
00060 {
00061
00062 namespace filter_failure_reasons
00063 {
00064 enum FilterFailureReason
00065 {
00067 Unknown,
00069 OutTheBack,
00071 EmptyFrameID,
00072 };
00073 }
00074 typedef filter_failure_reasons::FilterFailureReason FilterFailureReason;
00075
00076 class MessageFilterBase
00077 {
00078 public:
00079 typedef std::vector<std::string> V_string;
00080
00081 virtual ~MessageFilterBase(){}
00082 virtual void clear() = 0;
00083 virtual void setTargetFrame(const std::string& target_frame) = 0;
00084 virtual void setTargetFrames(const V_string& target_frames) = 0;
00085 virtual void setTolerance(const ros::Duration& tolerance) = 0;
00086 };
00087
00104 template<class M>
00105 class MessageFilter : public MessageFilterBase, public message_filters::SimpleFilter<M>
00106 {
00107 public:
00108 typedef boost::shared_ptr<M const> MConstPtr;
00109 typedef ros::MessageEvent<M const> MEvent;
00110 typedef boost::function<void(const MConstPtr&, FilterFailureReason)> FailureCallback;
00111 typedef boost::signal<void(const MConstPtr&, FilterFailureReason)> FailureSignal;
00112
00113
00114 ROS_STATIC_ASSERT(ros::message_traits::HasHeader<M>::value);
00115
00124 MessageFilter(tf2::BufferCore& bc, const std::string& target_frame, uint32_t queue_size, const ros::NodeHandle& nh)
00125 : bc_(bc)
00126 , queue_size_(queue_size)
00127 , callback_queue_(nh.getCallbackQueue())
00128 {
00129 init();
00130
00131 setTargetFrame(target_frame);
00132 }
00133
00143 template<class F>
00144 MessageFilter(F& f, tf2::BufferCore& bc, const std::string& target_frame, uint32_t queue_size, const ros::NodeHandle& nh)
00145 : bc_(bc)
00146 , queue_size_(queue_size)
00147 , callback_queue_(nh.getCallbackQueue())
00148 {
00149 init();
00150
00151 setTargetFrame(target_frame);
00152
00153 connectInput(f);
00154 }
00155
00166 MessageFilter(tf2::BufferCore& bc, const std::string& target_frame, uint32_t queue_size, ros::CallbackQueueInterface* cbqueue)
00167 : bc_(bc)
00168 , queue_size_(queue_size)
00169 , callback_queue_(cbqueue)
00170 {
00171 init();
00172
00173 setTargetFrame(target_frame);
00174 }
00175
00187 template<class F>
00188 MessageFilter(F& f, tf2::BufferCore& bc, const std::string& target_frame, uint32_t queue_size, ros::CallbackQueueInterface* cbqueue)
00189 : bc_(bc)
00190 , queue_size_(queue_size)
00191 , callback_queue_(cbqueue)
00192 {
00193 init();
00194
00195 setTargetFrame(target_frame);
00196
00197 connectInput(f);
00198 }
00199
00203 template<class F>
00204 void connectInput(F& f)
00205 {
00206 message_connection_.disconnect();
00207 message_connection_ = f.registerCallback(&MessageFilter::incomingMessage, this);
00208 }
00209
00213 ~MessageFilter()
00214 {
00215 message_connection_.disconnect();
00216
00217 clear();
00218
00219 TF2_ROS_MESSAGEFILTER_DEBUG("Successful Transforms: %llu, Discarded due to age: %llu, Transform messages received: %llu, Messages received: %llu, Total dropped: %llu",
00220 (long long unsigned int)successful_transform_count_,
00221 (long long unsigned int)failed_out_the_back_count_, (long long unsigned int)transform_message_count_,
00222 (long long unsigned int)incoming_message_count_, (long long unsigned int)dropped_message_count_);
00223
00224 }
00225
00229 void setTargetFrame(const std::string& target_frame)
00230 {
00231 V_string frames;
00232 frames.push_back(target_frame);
00233 setTargetFrames(frames);
00234 }
00235
00239 void setTargetFrames(const V_string& target_frames)
00240 {
00241 boost::mutex::scoped_lock list_lock(messages_mutex_);
00242 boost::mutex::scoped_lock string_lock(target_frames_string_mutex_);
00243
00244 target_frames_ = target_frames;
00245 expected_success_count_ = target_frames_.size() + (time_tolerance_.isZero() ? 0 : 1);
00246
00247 std::stringstream ss;
00248 for (V_string::iterator it = target_frames_.begin(); it != target_frames_.end(); ++it)
00249 {
00250 ss << *it << " ";
00251 }
00252 target_frames_string_ = ss.str();
00253 }
00257 std::string getTargetFramesString()
00258 {
00259 boost::mutex::scoped_lock lock(target_frames_string_mutex_);
00260 return target_frames_string_;
00261 };
00262
00266 void setTolerance(const ros::Duration& tolerance)
00267 {
00268 boost::mutex::scoped_lock list_lock(messages_mutex_);
00269 time_tolerance_ = tolerance;
00270 expected_success_count_ = target_frames_.size() + (time_tolerance_.isZero() ? 0 : 1);
00271 }
00272
00276 void clear()
00277 {
00278 boost::mutex::scoped_lock lock(messages_mutex_);
00279
00280 TF2_ROS_MESSAGEFILTER_DEBUG("%s", "Cleared");
00281
00282 bc_.removeTransformableCallback(callback_handle_);
00283 callback_handle_ = bc_.addTransformableCallback(boost::bind(&MessageFilter::transformable, this, _1, _2, _3, _4, _5));
00284
00285 messages_.clear();
00286 message_count_ = 0;
00287
00288 warned_about_empty_frame_id_ = false;
00289 }
00290
00291 void add(const MEvent& evt)
00292 {
00293 if (target_frames_.empty())
00294 {
00295 return;
00296 }
00297
00298 namespace mt = ros::message_traits;
00299 const MConstPtr& message = evt.getMessage();
00300 const std::string& frame_id = *mt::FrameId<M>::pointer(*message);
00301 ros::Time stamp = mt::TimeStamp<M>::value(*message);
00302
00303 if (frame_id.empty())
00304 {
00305 messageDropped(evt, filter_failure_reasons::EmptyFrameID);
00306 return;
00307 }
00308
00309 boost::mutex::scoped_lock lock(messages_mutex_);
00310
00311 MessageInfo info;
00312 info.handles.reserve(expected_success_count_);
00313 {
00314 V_string::iterator it = target_frames_.begin();
00315 V_string::iterator end = target_frames_.end();
00316 for (; it != end; ++it)
00317 {
00318 const std::string& target_frame = *it;
00319 tf2::TransformableRequestHandle handle = bc_.addTransformableRequest(callback_handle_, target_frame, frame_id, stamp);
00320 if (handle == 0xffffffffffffffffULL)
00321 {
00322 messageDropped(evt, filter_failure_reasons::OutTheBack);
00323 return;
00324 }
00325 else if (handle == 0)
00326 {
00327 ++info.success_count;
00328 }
00329 else
00330 {
00331 info.handles.push_back(handle);
00332 }
00333
00334 if (!time_tolerance_.isZero())
00335 {
00336 handle = bc_.addTransformableRequest(callback_handle_, target_frame, frame_id, stamp + time_tolerance_);
00337 if (handle == 0xffffffffffffffffULL)
00338 {
00339 messageDropped(evt, filter_failure_reasons::OutTheBack);
00340 return;
00341 }
00342 else if (handle == 0)
00343 {
00344 ++info.success_count;
00345 }
00346 else
00347 {
00348 info.handles.push_back(handle);
00349 }
00350 }
00351 }
00352 }
00353
00354
00355 if (info.success_count == expected_success_count_)
00356 {
00357 messageReady(evt);
00358 }
00359 else
00360 {
00361
00362 if (queue_size_ != 0 && message_count_ + 1 > queue_size_)
00363 {
00364 ++dropped_message_count_;
00365 const MessageInfo& front = messages_.front();
00366 TF2_ROS_MESSAGEFILTER_DEBUG("Removed oldest message because buffer is full, count now %d (frame_id=%s, stamp=%f)", message_count_,
00367 (*mt::FrameId<M>::pointer(*front.event.getMessage())).c_str(), mt::TimeStamp<M>::value(*front.event.getMessage()).toSec());
00368
00369 V_TransformableRequestHandle::const_iterator it = front.handles.begin();
00370 V_TransformableRequestHandle::const_iterator end = front.handles.end();
00371 for (; it != end; ++it)
00372 {
00373 bc_.cancelTransformableRequest(*it);
00374 }
00375
00376 messageDropped(front.event, filter_failure_reasons::Unknown);
00377
00378 messages_.pop_front();
00379 --message_count_;
00380 }
00381
00382
00383 info.event = evt;
00384 messages_.push_back(info);
00385 ++message_count_;
00386 }
00387
00388 TF2_ROS_MESSAGEFILTER_DEBUG("Added message in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_);
00389
00390 ++incoming_message_count_;
00391 }
00392
00398 void add(const MConstPtr& message)
00399 {
00400 boost::shared_ptr<std::map<std::string, std::string> > header(new std::map<std::string, std::string>);
00401 (*header)["callerid"] = "unknown";
00402 ros::WallTime n = ros::WallTime::now();
00403 ros::Time t(n.sec, n.nsec);
00404 add(MEvent(message, header, t));
00405 }
00406
00411 message_filters::Connection registerFailureCallback(const FailureCallback& callback)
00412 {
00413 boost::mutex::scoped_lock lock(failure_signal_mutex_);
00414 return message_filters::Connection(boost::bind(&MessageFilter::disconnectFailure, this, _1), failure_signal_.connect(callback));
00415 }
00416
00417 virtual void setQueueSize( uint32_t new_queue_size )
00418 {
00419 queue_size_ = new_queue_size;
00420 }
00421
00422 virtual uint32_t getQueueSize()
00423 {
00424 return queue_size_;
00425 }
00426
00427
00428 private:
00429
00430 void init()
00431 {
00432 message_count_ = 0;
00433 successful_transform_count_ = 0;
00434 failed_out_the_back_count_ = 0;
00435 transform_message_count_ = 0;
00436 incoming_message_count_ = 0;
00437 dropped_message_count_ = 0;
00438 time_tolerance_ = ros::Duration(0.0);
00439 warned_about_empty_frame_id_ = false;
00440 expected_success_count_ = 1;
00441
00442 callback_handle_ = bc_.addTransformableCallback(boost::bind(&MessageFilter::transformable, this, _1, _2, _3, _4, _5));
00443 }
00444
00445 void transformable(tf2::TransformableRequestHandle request_handle, const std::string& target_frame, const std::string& source_frame,
00446 ros::Time time, tf2::TransformableResult result)
00447 {
00448 namespace mt = ros::message_traits;
00449
00450 boost::mutex::scoped_lock lock(messages_mutex_);
00451
00452
00453 typename L_MessageInfo::iterator msg_it = messages_.begin();
00454 typename L_MessageInfo::iterator msg_end = messages_.end();
00455 for (; msg_it != msg_end; ++msg_it)
00456 {
00457 MessageInfo& info = *msg_it;
00458 V_TransformableRequestHandle::const_iterator handle_it = std::find(info.handles.begin(), info.handles.end(), request_handle);
00459 if (handle_it != info.handles.end())
00460 {
00461
00462 ++info.success_count;
00463 break;
00464 }
00465 }
00466
00467 if (msg_it == msg_end)
00468 {
00469 return;
00470 }
00471
00472 const MessageInfo& info = *msg_it;
00473 if (info.success_count < expected_success_count_)
00474 {
00475 return;
00476 }
00477
00478 bool can_transform = true;
00479 const MConstPtr& message = info.event.getMessage();
00480 const std::string& frame_id = *mt::FrameId<M>::pointer(*message);
00481 ros::Time stamp = mt::TimeStamp<M>::value(*message);
00482
00483 if (result == tf2::TransformAvailable)
00484 {
00485
00486 typename V_string::iterator it = target_frames_.begin();
00487 typename V_string::iterator end = target_frames_.end();
00488 for (; it != end; ++it)
00489 {
00490 const std::string& target = *it;
00491 if (!bc_.canTransform(target, frame_id, stamp))
00492 {
00493 can_transform = false;
00494 break;
00495 }
00496
00497 if (!time_tolerance_.isZero())
00498 {
00499 if (!bc_.canTransform(target, frame_id, stamp + time_tolerance_))
00500 {
00501 can_transform = false;
00502 break;
00503 }
00504 }
00505 }
00506 }
00507 else
00508 {
00509 can_transform = false;
00510 }
00511
00512 if (can_transform)
00513 {
00514 TF2_ROS_MESSAGEFILTER_DEBUG("Message ready in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_ - 1);
00515
00516 ++successful_transform_count_;
00517
00518 messageReady(info.event);
00519
00520 }
00521 else
00522 {
00523 ++dropped_message_count_;
00524
00525 TF2_ROS_MESSAGEFILTER_DEBUG("Discarding message in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_ - 1);
00526 messageDropped(info.event, filter_failure_reasons::Unknown);
00527 }
00528
00529 messages_.erase(msg_it);
00530 --message_count_;
00531 }
00532
00536 void incomingMessage(const ros::MessageEvent<M const>& evt)
00537 {
00538 add(evt);
00539 }
00540
00541 void checkFailures()
00542 {
00543 if (next_failure_warning_.isZero())
00544 {
00545 next_failure_warning_ = ros::WallTime::now() + ros::WallDuration(15);
00546 }
00547
00548 if (ros::WallTime::now() >= next_failure_warning_)
00549 {
00550 if (incoming_message_count_ - message_count_ == 0)
00551 {
00552 return;
00553 }
00554
00555 double dropped_pct = (double)dropped_message_count_ / (double)(incoming_message_count_ - message_count_);
00556 if (dropped_pct > 0.95)
00557 {
00558 TF2_ROS_MESSAGEFILTER_WARN("Dropped %.2f%% of messages so far. Please turn the [%s.message_notifier] rosconsole logger to DEBUG for more information.", dropped_pct*100, ROSCONSOLE_DEFAULT_NAME);
00559 next_failure_warning_ = ros::WallTime::now() + ros::WallDuration(60);
00560
00561 if ((double)failed_out_the_back_count_ / (double)dropped_message_count_ > 0.5)
00562 {
00563 TF2_ROS_MESSAGEFILTER_WARN(" The majority of dropped messages were due to messages growing older than the TF cache time. The last message's timestamp was: %f, and the last frame_id was: %s", last_out_the_back_stamp_.toSec(), last_out_the_back_frame_.c_str());
00564 }
00565 }
00566 }
00567 }
00568
00569 struct CBQueueCallback : public ros::CallbackInterface
00570 {
00571 CBQueueCallback(MessageFilter* filter, const MEvent& event, bool success, FilterFailureReason reason)
00572 : event_(event)
00573 , filter_(filter)
00574 , reason_(reason)
00575 , success_(success)
00576 {}
00577
00578
00579 virtual CallResult call()
00580 {
00581 if (success_)
00582 {
00583 filter_->signalMessage(event_);
00584 }
00585 else
00586 {
00587 filter_->signalFailure(event_, reason_);
00588 }
00589
00590 return Success;
00591 }
00592
00593 private:
00594 MEvent event_;
00595 MessageFilter* filter_;
00596 FilterFailureReason reason_;
00597 bool success_;
00598 };
00599
00600 void messageDropped(const MEvent& evt, FilterFailureReason reason)
00601 {
00602 if (callback_queue_)
00603 {
00604 ros::CallbackInterfacePtr cb(new CBQueueCallback(this, evt, false, reason));
00605 callback_queue_->addCallback(cb, (uint64_t)this);
00606 }
00607 else
00608 {
00609 signalFailure(evt, reason);
00610 }
00611 }
00612
00613 void messageReady(const MEvent& evt)
00614 {
00615 if (callback_queue_)
00616 {
00617 ros::CallbackInterfacePtr cb(new CBQueueCallback(this, evt, true, filter_failure_reasons::Unknown));
00618 callback_queue_->addCallback(cb, (uint64_t)this);
00619 }
00620 else
00621 {
00622 signalMessage(evt);
00623 }
00624 }
00625
00626 void disconnectFailure(const message_filters::Connection& c)
00627 {
00628 boost::mutex::scoped_lock lock(failure_signal_mutex_);
00629 c.getBoostConnection().disconnect();
00630 }
00631
00632 void signalFailure(const MEvent& evt, FilterFailureReason reason)
00633 {
00634 boost::mutex::scoped_lock lock(failure_signal_mutex_);
00635 failure_signal_(evt.getMessage(), reason);
00636 }
00637
00638 tf2::BufferCore& bc_;
00639 V_string target_frames_;
00640 std::string target_frames_string_;
00641 boost::mutex target_frames_string_mutex_;
00642 uint32_t queue_size_;
00643 tf2::TransformableCallbackHandle callback_handle_;
00644
00645 typedef std::vector<tf2::TransformableRequestHandle> V_TransformableRequestHandle;
00646 struct MessageInfo
00647 {
00648 MessageInfo()
00649 : success_count(0)
00650 {}
00651
00652 MEvent event;
00653 V_TransformableRequestHandle handles;
00654 uint32_t success_count;
00655 };
00656 typedef std::list<MessageInfo> L_MessageInfo;
00657 L_MessageInfo messages_;
00658 uint32_t message_count_;
00659 boost::mutex messages_mutex_;
00660 uint32_t expected_success_count_;
00661
00662 bool warned_about_empty_frame_id_;
00663
00664 uint64_t successful_transform_count_;
00665 uint64_t failed_out_the_back_count_;
00666 uint64_t transform_message_count_;
00667 uint64_t incoming_message_count_;
00668 uint64_t dropped_message_count_;
00669
00670 ros::Time last_out_the_back_stamp_;
00671 std::string last_out_the_back_frame_;
00672
00673 ros::WallTime next_failure_warning_;
00674
00675 ros::Duration time_tolerance_;
00676
00677 message_filters::Connection message_connection_;
00678
00679 FailureSignal failure_signal_;
00680 boost::mutex failure_signal_mutex_;
00681
00682 ros::CallbackQueueInterface* callback_queue_;
00683 };
00684
00685 }
00686
00687 #endif
00688