00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00032 #ifndef TF2_ROS_MESSAGE_FILTER_H
00033 #define TF2_ROS_MESSAGE_FILTER_H
00034
00035 #include <tf2/buffer_core.h>
00036
00037 #include <string>
00038 #include <list>
00039 #include <vector>
00040 #include <boost/function.hpp>
00041 #include <boost/bind.hpp>
00042 #include <boost/shared_ptr.hpp>
00043 #include <boost/thread.hpp>
00044 #include <boost/signals.hpp>
00045
00046 #include <message_filters/connection.h>
00047 #include <message_filters/simple_filter.h>
00048
00049 #include <ros/node_handle.h>
00050 #include <ros/callback_queue_interface.h>
00051 #include <ros/init.h>
00052
00053 #define TF2_ROS_MESSAGEFILTER_DEBUG(fmt, ...) \
00054 ROS_DEBUG_NAMED("message_filter", "MessageFilter [target=%s]: "fmt, getTargetFramesString().c_str(), __VA_ARGS__)
00055
00056 #define TF2_ROS_MESSAGEFILTER_WARN(fmt, ...) \
00057 ROS_WARN_NAMED("message_filter", "MessageFilter [target=%s]: "fmt, getTargetFramesString().c_str(), __VA_ARGS__)
00058
00059 namespace tf2_ros
00060 {
00061
00062 namespace filter_failure_reasons
00063 {
00064 enum FilterFailureReason
00065 {
00067 Unknown,
00069 OutTheBack,
00071 EmptyFrameID,
00072 };
00073 }
00074 typedef filter_failure_reasons::FilterFailureReason FilterFailureReason;
00075
00076 class MessageFilterBase
00077 {
00078 public:
00079 typedef std::vector<std::string> V_string;
00080
00081 virtual ~MessageFilterBase(){}
00082 virtual void clear() = 0;
00083 virtual void setTargetFrame(const std::string& target_frame) = 0;
00084 virtual void setTargetFrames(const V_string& target_frames) = 0;
00085 virtual void setTolerance(const ros::Duration& tolerance) = 0;
00086 };
00087
00104 template<class M>
00105 class MessageFilter : public MessageFilterBase, public message_filters::SimpleFilter<M>
00106 {
00107 public:
00108 typedef boost::shared_ptr<M const> MConstPtr;
00109 typedef ros::MessageEvent<M const> MEvent;
00110 typedef boost::function<void(const MConstPtr&, FilterFailureReason)> FailureCallback;
00111 typedef boost::signal<void(const MConstPtr&, FilterFailureReason)> FailureSignal;
00112
00113
00114 ROS_STATIC_ASSERT(ros::message_traits::HasHeader<M>::value);
00115
00124 MessageFilter(tf2::BufferCore& bc, const std::string& target_frame, uint32_t queue_size, const ros::NodeHandle& nh)
00125 : bc_(bc)
00126 , queue_size_(queue_size)
00127 , callback_queue_(nh.getCallbackQueue())
00128 {
00129 init();
00130
00131 setTargetFrame(target_frame);
00132 }
00133
00143 template<class F>
00144 MessageFilter(F& f, tf2::BufferCore& bc, const std::string& target_frame, uint32_t queue_size, const ros::NodeHandle& nh)
00145 : bc_(bc)
00146 , queue_size_(queue_size)
00147 , callback_queue_(nh.getCallbackQueue())
00148 {
00149 init();
00150
00151 setTargetFrame(target_frame);
00152
00153 connectInput(f);
00154 }
00155
00166 MessageFilter(tf2::BufferCore& bc, const std::string& target_frame, uint32_t queue_size, ros::CallbackQueueInterface* cbqueue)
00167 : bc_(bc)
00168 , queue_size_(queue_size)
00169 , callback_queue_(cbqueue)
00170 {
00171 init();
00172
00173 setTargetFrame(target_frame);
00174 }
00175
00187 template<class F>
00188 MessageFilter(F& f, tf2::BufferCore& bc, const std::string& target_frame, uint32_t queue_size, ros::CallbackQueueInterface* cbqueue)
00189 : bc_(bc)
00190 , queue_size_(queue_size)
00191 , callback_queue_(cbqueue)
00192 {
00193 init();
00194
00195 setTargetFrame(target_frame);
00196
00197 connectInput(f);
00198 }
00199
00203 template<class F>
00204 void connectInput(F& f)
00205 {
00206 message_connection_.disconnect();
00207 message_connection_ = f.registerCallback(&MessageFilter::incomingMessage, this);
00208 }
00209
00213 ~MessageFilter()
00214 {
00215 message_connection_.disconnect();
00216
00217 clear();
00218
00219 TF2_ROS_MESSAGEFILTER_DEBUG("Successful Transforms: %llu, Discarded due to age: %llu, Transform messages received: %llu, Messages received: %llu, Total dropped: %llu",
00220 (long long unsigned int)successful_transform_count_,
00221 (long long unsigned int)failed_out_the_back_count_, (long long unsigned int)transform_message_count_,
00222 (long long unsigned int)incoming_message_count_, (long long unsigned int)dropped_message_count_);
00223
00224 }
00225
00229 void setTargetFrame(const std::string& target_frame)
00230 {
00231 V_string frames;
00232 frames.push_back(target_frame);
00233 setTargetFrames(frames);
00234 }
00235
00239 void setTargetFrames(const V_string& target_frames)
00240 {
00241 boost::mutex::scoped_lock list_lock(messages_mutex_);
00242 boost::mutex::scoped_lock string_lock(target_frames_string_mutex_);
00243
00244 target_frames_ = target_frames;
00245 expected_success_count_ = target_frames_.size() + (time_tolerance_.isZero() ? 0 : 1);
00246
00247 std::stringstream ss;
00248 for (V_string::iterator it = target_frames_.begin(); it != target_frames_.end(); ++it)
00249 {
00250 ss << *it << " ";
00251 }
00252 target_frames_string_ = ss.str();
00253 }
00257 std::string getTargetFramesString()
00258 {
00259 boost::mutex::scoped_lock lock(target_frames_string_mutex_);
00260 return target_frames_string_;
00261 };
00262
00266 void setTolerance(const ros::Duration& tolerance)
00267 {
00268 boost::mutex::scoped_lock list_lock(messages_mutex_);
00269 time_tolerance_ = tolerance;
00270 expected_success_count_ = target_frames_.size() + (time_tolerance_.isZero() ? 0 : 1);
00271 }
00272
00276 void clear()
00277 {
00278 boost::mutex::scoped_lock lock(messages_mutex_);
00279
00280 TF2_ROS_MESSAGEFILTER_DEBUG("%s", "Cleared");
00281
00282 bc_.removeTransformableCallback(callback_handle_);
00283 callback_handle_ = bc_.addTransformableCallback(boost::bind(&MessageFilter::transformable, this, _1, _2, _3, _4, _5));
00284
00285 messages_.clear();
00286 message_count_ = 0;
00287
00288 warned_about_empty_frame_id_ = false;
00289 }
00290
00291 void add(const MEvent& evt)
00292 {
00293 if (target_frames_.empty())
00294 {
00295 return;
00296 }
00297
00298 namespace mt = ros::message_traits;
00299 const MConstPtr& message = evt.getMessage();
00300 const std::string& frame_id = *mt::FrameId<M>::pointer(*message);
00301 ros::Time stamp = mt::TimeStamp<M>::value(*message);
00302
00303 if (frame_id.empty())
00304 {
00305 messageDropped(evt, filter_failure_reasons::EmptyFrameID);
00306 return;
00307 }
00308
00309 boost::mutex::scoped_lock lock(messages_mutex_);
00310
00311 MessageInfo info;
00312 info.handles.reserve(expected_success_count_);
00313 {
00314 V_string::iterator it = target_frames_.begin();
00315 V_string::iterator end = target_frames_.end();
00316 for (; it != end; ++it)
00317 {
00318 const std::string& target_frame = *it;
00319 tf2::TransformableRequestHandle handle = bc_.addTransformableRequest(callback_handle_, target_frame, frame_id, stamp);
00320 if (handle == 0xffffffffffffffffULL)
00321 {
00322 messageDropped(evt, filter_failure_reasons::OutTheBack);
00323 return;
00324 }
00325 else if (handle == 0)
00326 {
00327 ++info.success_count;
00328 }
00329 else
00330 {
00331 info.handles.push_back(handle);
00332 }
00333
00334 if (!time_tolerance_.isZero())
00335 {
00336 handle = bc_.addTransformableRequest(callback_handle_, target_frame, frame_id, stamp + time_tolerance_);
00337 if (handle == 0xffffffffffffffffULL)
00338 {
00339 messageDropped(evt, filter_failure_reasons::OutTheBack);
00340 return;
00341 }
00342 else if (handle == 0)
00343 {
00344 ++info.success_count;
00345 }
00346 else
00347 {
00348 info.handles.push_back(handle);
00349 }
00350 }
00351 }
00352 }
00353
00354
00355 if (info.success_count == expected_success_count_)
00356 {
00357 messageReady(evt);
00358 }
00359 else
00360 {
00361
00362 if (queue_size_ != 0 && message_count_ + 1 > queue_size_)
00363 {
00364 ++dropped_message_count_;
00365 const MessageInfo& front = messages_.front();
00366 TF2_ROS_MESSAGEFILTER_DEBUG("Removed oldest message because buffer is full, count now %d (frame_id=%s, stamp=%f)", message_count_,
00367 (*mt::FrameId<M>::pointer(*front.event.getMessage())).c_str(), mt::TimeStamp<M>::value(*front.event.getMessage()).toSec());
00368
00369 V_TransformableRequestHandle::const_iterator it = front.handles.begin();
00370 V_TransformableRequestHandle::const_iterator end = front.handles.end();
00371 for (; it != end; ++it)
00372 {
00373 bc_.cancelTransformableRequest(*it);
00374 }
00375
00376 messageDropped(front.event, filter_failure_reasons::Unknown);
00377
00378 messages_.pop_front();
00379 --message_count_;
00380 }
00381
00382
00383 info.event = evt;
00384 messages_.push_back(info);
00385 ++message_count_;
00386 }
00387
00388 TF2_ROS_MESSAGEFILTER_DEBUG("Added message in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_);
00389
00390 ++incoming_message_count_;
00391 }
00392
00398 void add(const MConstPtr& message)
00399 {
00400 boost::shared_ptr<std::map<std::string, std::string> > header(new std::map<std::string, std::string>);
00401 (*header)["callerid"] = "unknown";
00402 ros::WallTime n = ros::WallTime::now();
00403 ros::Time t(n.sec, n.nsec);
00404 add(MEvent(message, header, t));
00405 }
00406
00411 message_filters::Connection registerFailureCallback(const FailureCallback& callback)
00412 {
00413 boost::mutex::scoped_lock lock(failure_signal_mutex_);
00414 return message_filters::Connection(boost::bind(&MessageFilter::disconnectFailure, this, _1), failure_signal_.connect(callback));
00415 }
00416
00417 private:
00418
00419 void init()
00420 {
00421 message_count_ = 0;
00422 successful_transform_count_ = 0;
00423 failed_out_the_back_count_ = 0;
00424 transform_message_count_ = 0;
00425 incoming_message_count_ = 0;
00426 dropped_message_count_ = 0;
00427 time_tolerance_ = ros::Duration(0.0);
00428 warned_about_empty_frame_id_ = false;
00429 expected_success_count_ = 1;
00430
00431 callback_handle_ = bc_.addTransformableCallback(boost::bind(&MessageFilter::transformable, this, _1, _2, _3, _4, _5));
00432 }
00433
00434 void transformable(tf2::TransformableRequestHandle request_handle, const std::string& target_frame, const std::string& source_frame,
00435 ros::Time time, tf2::TransformableResult result)
00436 {
00437 namespace mt = ros::message_traits;
00438
00439 boost::mutex::scoped_lock lock(messages_mutex_);
00440
00441
00442 typename L_MessageInfo::iterator msg_it = messages_.begin();
00443 typename L_MessageInfo::iterator msg_end = messages_.end();
00444 for (; msg_it != msg_end; ++msg_it)
00445 {
00446 MessageInfo& info = *msg_it;
00447 V_TransformableRequestHandle::const_iterator handle_it = std::find(info.handles.begin(), info.handles.end(), request_handle);
00448 if (handle_it != info.handles.end())
00449 {
00450
00451 ++info.success_count;
00452 break;
00453 }
00454 }
00455
00456 if (msg_it == msg_end)
00457 {
00458 return;
00459 }
00460
00461 const MessageInfo& info = *msg_it;
00462 if (info.success_count < expected_success_count_)
00463 {
00464 return;
00465 }
00466
00467 bool can_transform = true;
00468 const MConstPtr& message = info.event.getMessage();
00469 const std::string& frame_id = *mt::FrameId<M>::pointer(*message);
00470 ros::Time stamp = mt::TimeStamp<M>::value(*message);
00471
00472 if (result == tf2::TransformAvailable)
00473 {
00474
00475 typename V_string::iterator it = target_frames_.begin();
00476 typename V_string::iterator end = target_frames_.end();
00477 for (; it != end; ++it)
00478 {
00479 const std::string& target = *it;
00480 if (!bc_.canTransform(target, frame_id, stamp))
00481 {
00482 can_transform = false;
00483 break;
00484 }
00485
00486 if (!time_tolerance_.isZero())
00487 {
00488 if (!bc_.canTransform(target, frame_id, stamp + time_tolerance_))
00489 {
00490 can_transform = false;
00491 break;
00492 }
00493 }
00494 }
00495 }
00496 else
00497 {
00498 can_transform = false;
00499 }
00500
00501 if (can_transform)
00502 {
00503 TF2_ROS_MESSAGEFILTER_DEBUG("Message ready in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_ - 1);
00504
00505 ++successful_transform_count_;
00506
00507 messageReady(info.event);
00508
00509 }
00510 else
00511 {
00512 ++dropped_message_count_;
00513
00514 TF2_ROS_MESSAGEFILTER_DEBUG("Discarding message in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_ - 1);
00515 messageDropped(info.event, filter_failure_reasons::Unknown);
00516 }
00517
00518 messages_.erase(msg_it);
00519 --message_count_;
00520 }
00521
00525 void incomingMessage(const ros::MessageEvent<M const>& evt)
00526 {
00527 add(evt);
00528 }
00529
00530 void checkFailures()
00531 {
00532 if (next_failure_warning_.isZero())
00533 {
00534 next_failure_warning_ = ros::WallTime::now() + ros::WallDuration(15);
00535 }
00536
00537 if (ros::WallTime::now() >= next_failure_warning_)
00538 {
00539 if (incoming_message_count_ - message_count_ == 0)
00540 {
00541 return;
00542 }
00543
00544 double dropped_pct = (double)dropped_message_count_ / (double)(incoming_message_count_ - message_count_);
00545 if (dropped_pct > 0.95)
00546 {
00547 TF2_ROS_MESSAGEFILTER_WARN("Dropped %.2f%% of messages so far. Please turn the [%s.message_notifier] rosconsole logger to DEBUG for more information.", dropped_pct*100, ROSCONSOLE_DEFAULT_NAME);
00548 next_failure_warning_ = ros::WallTime::now() + ros::WallDuration(60);
00549
00550 if ((double)failed_out_the_back_count_ / (double)dropped_message_count_ > 0.5)
00551 {
00552 TF2_ROS_MESSAGEFILTER_WARN(" The majority of dropped messages were due to messages growing older than the TF cache time. The last message's timestamp was: %f, and the last frame_id was: %s", last_out_the_back_stamp_.toSec(), last_out_the_back_frame_.c_str());
00553 }
00554 }
00555 }
00556 }
00557
00558 struct CBQueueCallback : public ros::CallbackInterface
00559 {
00560 CBQueueCallback(MessageFilter* filter, const MEvent& event, bool success, FilterFailureReason reason)
00561 : event_(event)
00562 , filter_(filter)
00563 , reason_(reason)
00564 , success_(success)
00565 {}
00566
00567
00568 virtual CallResult call()
00569 {
00570 if (success_)
00571 {
00572 filter_->signalMessage(event_);
00573 }
00574 else
00575 {
00576 filter_->signalFailure(event_, reason_);
00577 }
00578
00579 return Success;
00580 }
00581
00582 private:
00583 MEvent event_;
00584 MessageFilter* filter_;
00585 FilterFailureReason reason_;
00586 bool success_;
00587 };
00588
00589 void messageDropped(const MEvent& evt, FilterFailureReason reason)
00590 {
00591 if (callback_queue_)
00592 {
00593 ros::CallbackInterfacePtr cb(new CBQueueCallback(this, evt, false, reason));
00594 callback_queue_->addCallback(cb, (uint64_t)this);
00595 }
00596 else
00597 {
00598 signalFailure(evt, reason);
00599 }
00600 }
00601
00602 void messageReady(const MEvent& evt)
00603 {
00604 if (callback_queue_)
00605 {
00606 ros::CallbackInterfacePtr cb(new CBQueueCallback(this, evt, true, filter_failure_reasons::Unknown));
00607 callback_queue_->addCallback(cb, (uint64_t)this);
00608 }
00609 else
00610 {
00611 signalMessage(evt);
00612 }
00613 }
00614
00615 void disconnectFailure(const message_filters::Connection& c)
00616 {
00617 boost::mutex::scoped_lock lock(failure_signal_mutex_);
00618 c.getBoostConnection().disconnect();
00619 }
00620
00621 void signalFailure(const MEvent& evt, FilterFailureReason reason)
00622 {
00623 boost::mutex::scoped_lock lock(failure_signal_mutex_);
00624 failure_signal_(evt.getMessage(), reason);
00625 }
00626
00627 tf2::BufferCore& bc_;
00628 V_string target_frames_;
00629 std::string target_frames_string_;
00630 boost::mutex target_frames_string_mutex_;
00631 uint32_t queue_size_;
00632 tf2::TransformableCallbackHandle callback_handle_;
00633
00634 typedef std::vector<tf2::TransformableRequestHandle> V_TransformableRequestHandle;
00635 struct MessageInfo
00636 {
00637 MessageInfo()
00638 : success_count(0)
00639 {}
00640
00641 MEvent event;
00642 V_TransformableRequestHandle handles;
00643 uint32_t success_count;
00644 };
00645 typedef std::list<MessageInfo> L_MessageInfo;
00646 L_MessageInfo messages_;
00647 uint32_t message_count_;
00648 boost::mutex messages_mutex_;
00649 uint32_t expected_success_count_;
00650
00651 bool warned_about_empty_frame_id_;
00652
00653 uint64_t successful_transform_count_;
00654 uint64_t failed_out_the_back_count_;
00655 uint64_t transform_message_count_;
00656 uint64_t incoming_message_count_;
00657 uint64_t dropped_message_count_;
00658
00659 ros::Time last_out_the_back_stamp_;
00660 std::string last_out_the_back_frame_;
00661
00662 ros::WallTime next_failure_warning_;
00663
00664 ros::Duration time_tolerance_;
00665
00666 message_filters::Connection message_connection_;
00667
00668 FailureSignal failure_signal_;
00669 boost::mutex failure_signal_mutex_;
00670
00671 ros::CallbackQueueInterface* callback_queue_;
00672 };
00673
00674 }
00675
00676 #endif
00677