00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00032 #ifndef TF2_ROS_MESSAGE_FILTER_H
00033 #define TF2_ROS_MESSAGE_FILTER_H
00034
00035 #include <tf2/buffer_core.h>
00036
00037 #include <string>
00038 #include <list>
00039 #include <vector>
00040 #include <boost/function.hpp>
00041 #include <boost/bind.hpp>
00042 #include <boost/shared_ptr.hpp>
00043 #include <boost/thread.hpp>
00044
00045 #include <message_filters/connection.h>
00046 #include <message_filters/simple_filter.h>
00047
00048 #include <ros/node_handle.h>
00049 #include <ros/callback_queue_interface.h>
00050 #include <ros/init.h>
00051
00052 #define TF2_ROS_MESSAGEFILTER_DEBUG(fmt, ...) \
00053 ROS_DEBUG_NAMED("message_filter", std::string(std::string("MessageFilter [target=%s]: ") + std::string(fmt)).c_str(), getTargetFramesString().c_str(), __VA_ARGS__)
00054
00055 #define TF2_ROS_MESSAGEFILTER_WARN(fmt, ...) \
00056 ROS_WARN_NAMED("message_filter", std::string(std::string("MessageFilter [target=%s]: ") + std::string(fmt)).c_str(), getTargetFramesString().c_str(), __VA_ARGS__)
00057
00058 namespace tf2_ros
00059 {
00060
00061 namespace filter_failure_reasons
00062 {
00063 enum FilterFailureReason
00064 {
00066 Unknown,
00068 OutTheBack,
00070 EmptyFrameID,
00071 };
00072 }
00073 typedef filter_failure_reasons::FilterFailureReason FilterFailureReason;
00074
00075 class MessageFilterBase
00076 {
00077 public:
00078 typedef std::vector<std::string> V_string;
00079
00080 virtual ~MessageFilterBase(){}
00081 virtual void clear() = 0;
00082 virtual void setTargetFrame(const std::string& target_frame) = 0;
00083 virtual void setTargetFrames(const V_string& target_frames) = 0;
00084 virtual void setTolerance(const ros::Duration& tolerance) = 0;
00085 };
00086
00103 template<class M>
00104 class MessageFilter : public MessageFilterBase, public message_filters::SimpleFilter<M>
00105 {
00106 public:
00107 typedef boost::shared_ptr<M const> MConstPtr;
00108 typedef ros::MessageEvent<M const> MEvent;
00109 typedef boost::function<void(const MConstPtr&, FilterFailureReason)> FailureCallback;
00110 typedef boost::signals2::signal<void(const MConstPtr&, FilterFailureReason)> FailureSignal;
00111
00112
00113
00114
00115
00116
00117
00126 MessageFilter(tf2::BufferCore& bc, const std::string& target_frame, uint32_t queue_size, const ros::NodeHandle& nh)
00127 : bc_(bc)
00128 , queue_size_(queue_size)
00129 , callback_queue_(nh.getCallbackQueue())
00130 {
00131 init();
00132
00133 setTargetFrame(target_frame);
00134 }
00135
00145 template<class F>
00146 MessageFilter(F& f, tf2::BufferCore& bc, const std::string& target_frame, uint32_t queue_size, const ros::NodeHandle& nh)
00147 : bc_(bc)
00148 , queue_size_(queue_size)
00149 , callback_queue_(nh.getCallbackQueue())
00150 {
00151 init();
00152
00153 setTargetFrame(target_frame);
00154
00155 connectInput(f);
00156 }
00157
00168 MessageFilter(tf2::BufferCore& bc, const std::string& target_frame, uint32_t queue_size, ros::CallbackQueueInterface* cbqueue)
00169 : bc_(bc)
00170 , queue_size_(queue_size)
00171 , callback_queue_(cbqueue)
00172 {
00173 init();
00174
00175 setTargetFrame(target_frame);
00176 }
00177
00189 template<class F>
00190 MessageFilter(F& f, tf2::BufferCore& bc, const std::string& target_frame, uint32_t queue_size, ros::CallbackQueueInterface* cbqueue)
00191 : bc_(bc)
00192 , queue_size_(queue_size)
00193 , callback_queue_(cbqueue)
00194 {
00195 init();
00196
00197 setTargetFrame(target_frame);
00198
00199 connectInput(f);
00200 }
00201
00205 template<class F>
00206 void connectInput(F& f)
00207 {
00208 message_connection_.disconnect();
00209 message_connection_ = f.registerCallback(&MessageFilter::incomingMessage, this);
00210 }
00211
00215 ~MessageFilter()
00216 {
00217 message_connection_.disconnect();
00218
00219 clear();
00220
00221 TF2_ROS_MESSAGEFILTER_DEBUG("Successful Transforms: %llu, Discarded due to age: %llu, Transform messages received: %llu, Messages received: %llu, Total dropped: %llu",
00222 (long long unsigned int)successful_transform_count_,
00223 (long long unsigned int)failed_out_the_back_count_, (long long unsigned int)transform_message_count_,
00224 (long long unsigned int)incoming_message_count_, (long long unsigned int)dropped_message_count_);
00225
00226 }
00227
00231 void setTargetFrame(const std::string& target_frame)
00232 {
00233 V_string frames;
00234 frames.push_back(target_frame);
00235 setTargetFrames(frames);
00236 }
00237
00241 void setTargetFrames(const V_string& target_frames)
00242 {
00243 boost::mutex::scoped_lock frames_lock(target_frames_mutex_);
00244
00245 target_frames_.resize(target_frames.size());
00246 std::transform(target_frames.begin(), target_frames.end(), target_frames_.begin(), this->stripSlash);
00247 expected_success_count_ = target_frames_.size() * (time_tolerance_.isZero() ? 1 : 2);
00248
00249 std::stringstream ss;
00250 for (V_string::iterator it = target_frames_.begin(); it != target_frames_.end(); ++it)
00251 {
00252 ss << *it << " ";
00253 }
00254 target_frames_string_ = ss.str();
00255 }
00259 std::string getTargetFramesString()
00260 {
00261 boost::mutex::scoped_lock lock(target_frames_mutex_);
00262 return target_frames_string_;
00263 };
00264
00268 void setTolerance(const ros::Duration& tolerance)
00269 {
00270 boost::mutex::scoped_lock lock(target_frames_mutex_);
00271 time_tolerance_ = tolerance;
00272 expected_success_count_ = target_frames_.size() * (time_tolerance_.isZero() ? 1 : 2);
00273 }
00274
00278 void clear()
00279 {
00280 boost::unique_lock< boost::shared_mutex > unique_lock(messages_mutex_);
00281
00282 TF2_ROS_MESSAGEFILTER_DEBUG("%s", "Cleared");
00283
00284 bc_.removeTransformableCallback(callback_handle_);
00285 callback_handle_ = bc_.addTransformableCallback(boost::bind(&MessageFilter::transformable, this, _1, _2, _3, _4, _5));
00286
00287 messages_.clear();
00288 message_count_ = 0;
00289
00290 warned_about_empty_frame_id_ = false;
00291 }
00292
00293 void add(const MEvent& evt)
00294 {
00295 if (target_frames_.empty())
00296 {
00297 return;
00298 }
00299
00300 namespace mt = ros::message_traits;
00301 const MConstPtr& message = evt.getMessage();
00302 std::string frame_id = stripSlash(mt::FrameId<M>::value(*message));
00303 ros::Time stamp = mt::TimeStamp<M>::value(*message);
00304
00305 if (frame_id.empty())
00306 {
00307 messageDropped(evt, filter_failure_reasons::EmptyFrameID);
00308 return;
00309 }
00310
00311
00312 MessageInfo info;
00313 info.handles.reserve(expected_success_count_);
00314 {
00315 V_string target_frames_copy;
00316
00317 {
00318 boost::mutex::scoped_lock frames_lock(target_frames_mutex_);
00319 target_frames_copy = target_frames_;
00320 }
00321
00322 V_string::iterator it = target_frames_copy.begin();
00323 V_string::iterator end = target_frames_copy.end();
00324 for (; it != end; ++it)
00325 {
00326 const std::string& target_frame = *it;
00327 tf2::TransformableRequestHandle handle = bc_.addTransformableRequest(callback_handle_, target_frame, frame_id, stamp);
00328 if (handle == 0xffffffffffffffffULL)
00329 {
00330 messageDropped(evt, filter_failure_reasons::OutTheBack);
00331 return;
00332 }
00333 else if (handle == 0)
00334 {
00335 ++info.success_count;
00336 }
00337 else
00338 {
00339 info.handles.push_back(handle);
00340 }
00341
00342 if (!time_tolerance_.isZero())
00343 {
00344 handle = bc_.addTransformableRequest(callback_handle_, target_frame, frame_id, stamp + time_tolerance_);
00345 if (handle == 0xffffffffffffffffULL)
00346 {
00347 messageDropped(evt, filter_failure_reasons::OutTheBack);
00348 return;
00349 }
00350 else if (handle == 0)
00351 {
00352 ++info.success_count;
00353 }
00354 else
00355 {
00356 info.handles.push_back(handle);
00357 }
00358 }
00359 }
00360 }
00361
00362
00363
00364 if (info.success_count == expected_success_count_)
00365 {
00366 messageReady(evt);
00367 }
00368 else
00369 {
00370 boost::unique_lock< boost::shared_mutex > unique_lock(messages_mutex_);
00371
00372 if (queue_size_ != 0 && message_count_ + 1 > queue_size_)
00373 {
00374 ++dropped_message_count_;
00375 const MessageInfo& front = messages_.front();
00376 TF2_ROS_MESSAGEFILTER_DEBUG("Removed oldest message because buffer is full, count now %d (frame_id=%s, stamp=%f)", message_count_,
00377 (mt::FrameId<M>::value(*front.event.getMessage())).c_str(), mt::TimeStamp<M>::value(*front.event.getMessage()).toSec());
00378
00379 V_TransformableRequestHandle::const_iterator it = front.handles.begin();
00380 V_TransformableRequestHandle::const_iterator end = front.handles.end();
00381
00382 for (; it != end; ++it)
00383 {
00384 bc_.cancelTransformableRequest(*it);
00385 }
00386
00387 messageDropped(front.event, filter_failure_reasons::Unknown);
00388 messages_.pop_front();
00389 --message_count_;
00390 }
00391
00392
00393 info.event = evt;
00394 messages_.push_back(info);
00395 ++message_count_;
00396 }
00397
00398 TF2_ROS_MESSAGEFILTER_DEBUG("Added message in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_);
00399
00400 ++incoming_message_count_;
00401 }
00402
00408 void add(const MConstPtr& message)
00409 {
00410 boost::shared_ptr<std::map<std::string, std::string> > header(new std::map<std::string, std::string>);
00411 (*header)["callerid"] = "unknown";
00412 ros::WallTime n = ros::WallTime::now();
00413 ros::Time t(n.sec, n.nsec);
00414 add(MEvent(message, header, t));
00415 }
00416
00421 message_filters::Connection registerFailureCallback(const FailureCallback& callback)
00422 {
00423 boost::mutex::scoped_lock lock(failure_signal_mutex_);
00424 return message_filters::Connection(boost::bind(&MessageFilter::disconnectFailure, this, _1), failure_signal_.connect(callback));
00425 }
00426
00427 virtual void setQueueSize( uint32_t new_queue_size )
00428 {
00429 queue_size_ = new_queue_size;
00430 }
00431
00432 virtual uint32_t getQueueSize()
00433 {
00434 return queue_size_;
00435 }
00436
00437
00438 private:
00439
00440 void init()
00441 {
00442 message_count_ = 0;
00443 successful_transform_count_ = 0;
00444 failed_out_the_back_count_ = 0;
00445 transform_message_count_ = 0;
00446 incoming_message_count_ = 0;
00447 dropped_message_count_ = 0;
00448 time_tolerance_ = ros::Duration(0.0);
00449 warned_about_empty_frame_id_ = false;
00450 expected_success_count_ = 1;
00451
00452 callback_handle_ = bc_.addTransformableCallback(boost::bind(&MessageFilter::transformable, this, _1, _2, _3, _4, _5));
00453 }
00454
00455 void transformable(tf2::TransformableRequestHandle request_handle, const std::string& target_frame, const std::string& source_frame,
00456 ros::Time time, tf2::TransformableResult result)
00457 {
00458 namespace mt = ros::message_traits;
00459
00460 boost::upgrade_lock< boost::shared_mutex > lock(messages_mutex_);
00461
00462
00463 typename L_MessageInfo::iterator msg_it = messages_.begin();
00464 typename L_MessageInfo::iterator msg_end = messages_.end();
00465 for (; msg_it != msg_end; ++msg_it)
00466 {
00467 MessageInfo& info = *msg_it;
00468 V_TransformableRequestHandle::const_iterator handle_it = std::find(info.handles.begin(), info.handles.end(), request_handle);
00469 if (handle_it != info.handles.end())
00470 {
00471
00472 ++info.success_count;
00473 break;
00474 }
00475 }
00476
00477 if (msg_it == msg_end)
00478 {
00479 return;
00480 }
00481
00482 const MessageInfo& info = *msg_it;
00483 if (info.success_count < expected_success_count_)
00484 {
00485 return;
00486 }
00487
00488 bool can_transform = true;
00489 const MConstPtr& message = info.event.getMessage();
00490 std::string frame_id = stripSlash(mt::FrameId<M>::value(*message));
00491 ros::Time stamp = mt::TimeStamp<M>::value(*message);
00492
00493 if (result == tf2::TransformAvailable)
00494 {
00495 boost::mutex::scoped_lock frames_lock(target_frames_mutex_);
00496
00497 typename V_string::iterator it = target_frames_.begin();
00498 typename V_string::iterator end = target_frames_.end();
00499 for (; it != end; ++it)
00500 {
00501 const std::string& target = *it;
00502 if (!bc_.canTransform(target, frame_id, stamp))
00503 {
00504 can_transform = false;
00505 break;
00506 }
00507
00508 if (!time_tolerance_.isZero())
00509 {
00510 if (!bc_.canTransform(target, frame_id, stamp + time_tolerance_))
00511 {
00512 can_transform = false;
00513 break;
00514 }
00515 }
00516 }
00517 }
00518 else
00519 {
00520 can_transform = false;
00521 }
00522
00523
00524 boost::upgrade_to_unique_lock< boost::shared_mutex > uniqueLock(lock);
00525 if (can_transform)
00526 {
00527 TF2_ROS_MESSAGEFILTER_DEBUG("Message ready in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_ - 1);
00528
00529 ++successful_transform_count_;
00530
00531 messageReady(info.event);
00532
00533 }
00534 else
00535 {
00536 ++dropped_message_count_;
00537
00538 TF2_ROS_MESSAGEFILTER_DEBUG("Discarding message in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_ - 1);
00539 messageDropped(info.event, filter_failure_reasons::Unknown);
00540 }
00541
00542 messages_.erase(msg_it);
00543 --message_count_;
00544 }
00545
00549 void incomingMessage(const ros::MessageEvent<M const>& evt)
00550 {
00551 add(evt);
00552 }
00553
00554 void checkFailures()
00555 {
00556 if (next_failure_warning_.isZero())
00557 {
00558 next_failure_warning_ = ros::WallTime::now() + ros::WallDuration(15);
00559 }
00560
00561 if (ros::WallTime::now() >= next_failure_warning_)
00562 {
00563 if (incoming_message_count_ - message_count_ == 0)
00564 {
00565 return;
00566 }
00567
00568 double dropped_pct = (double)dropped_message_count_ / (double)(incoming_message_count_ - message_count_);
00569 if (dropped_pct > 0.95)
00570 {
00571 TF2_ROS_MESSAGEFILTER_WARN("Dropped %.2f%% of messages so far. Please turn the [%s.message_notifier] rosconsole logger to DEBUG for more information.", dropped_pct*100, ROSCONSOLE_DEFAULT_NAME);
00572 next_failure_warning_ = ros::WallTime::now() + ros::WallDuration(60);
00573
00574 if ((double)failed_out_the_back_count_ / (double)dropped_message_count_ > 0.5)
00575 {
00576 TF2_ROS_MESSAGEFILTER_WARN(" The majority of dropped messages were due to messages growing older than the TF cache time. The last message's timestamp was: %f, and the last frame_id was: %s", last_out_the_back_stamp_.toSec(), last_out_the_back_frame_.c_str());
00577 }
00578 }
00579 }
00580 }
00581
00582 struct CBQueueCallback : public ros::CallbackInterface
00583 {
00584 CBQueueCallback(MessageFilter* filter, const MEvent& event, bool success, FilterFailureReason reason)
00585 : event_(event)
00586 , filter_(filter)
00587 , reason_(reason)
00588 , success_(success)
00589 {}
00590
00591
00592 virtual CallResult call()
00593 {
00594 if (success_)
00595 {
00596 filter_->signalMessage(event_);
00597 }
00598 else
00599 {
00600 filter_->signalFailure(event_, reason_);
00601 }
00602
00603 return Success;
00604 }
00605
00606 private:
00607 MEvent event_;
00608 MessageFilter* filter_;
00609 FilterFailureReason reason_;
00610 bool success_;
00611 };
00612
00613 void messageDropped(const MEvent& evt, FilterFailureReason reason)
00614 {
00615 if (callback_queue_)
00616 {
00617 ros::CallbackInterfacePtr cb(new CBQueueCallback(this, evt, false, reason));
00618 callback_queue_->addCallback(cb, (uint64_t)this);
00619 }
00620 else
00621 {
00622 signalFailure(evt, reason);
00623 }
00624 }
00625
00626 void messageReady(const MEvent& evt)
00627 {
00628 if (callback_queue_)
00629 {
00630 ros::CallbackInterfacePtr cb(new CBQueueCallback(this, evt, true, filter_failure_reasons::Unknown));
00631 callback_queue_->addCallback(cb, (uint64_t)this);
00632 }
00633 else
00634 {
00635 this->signalMessage(evt);
00636 }
00637 }
00638
00639 void disconnectFailure(const message_filters::Connection& c)
00640 {
00641 boost::mutex::scoped_lock lock(failure_signal_mutex_);
00642 c.getBoostConnection().disconnect();
00643 }
00644
00645 void signalFailure(const MEvent& evt, FilterFailureReason reason)
00646 {
00647 boost::mutex::scoped_lock lock(failure_signal_mutex_);
00648 failure_signal_(evt.getMessage(), reason);
00649 }
00650
00651 static
00652 std::string stripSlash(const std::string& in)
00653 {
00654 if ( !in.empty() && (in[0] == '/'))
00655 {
00656 std::string out = in;
00657 out.erase(0, 1);
00658 return out;
00659 }
00660 return in;
00661 }
00662
00663 tf2::BufferCore& bc_;
00664 V_string target_frames_;
00665 std::string target_frames_string_;
00666 boost::mutex target_frames_mutex_;
00667 uint32_t queue_size_;
00668 tf2::TransformableCallbackHandle callback_handle_;
00669
00670 typedef std::vector<tf2::TransformableRequestHandle> V_TransformableRequestHandle;
00671 struct MessageInfo
00672 {
00673 MessageInfo()
00674 : success_count(0)
00675 {}
00676
00677 MEvent event;
00678 V_TransformableRequestHandle handles;
00679 uint32_t success_count;
00680 };
00681 typedef std::list<MessageInfo> L_MessageInfo;
00682 L_MessageInfo messages_;
00683 uint32_t message_count_;
00684 boost::shared_mutex messages_mutex_;
00685 uint32_t expected_success_count_;
00686
00687 bool warned_about_empty_frame_id_;
00688
00689 uint64_t successful_transform_count_;
00690 uint64_t failed_out_the_back_count_;
00691 uint64_t transform_message_count_;
00692 uint64_t incoming_message_count_;
00693 uint64_t dropped_message_count_;
00694
00695 ros::Time last_out_the_back_stamp_;
00696 std::string last_out_the_back_frame_;
00697
00698 ros::WallTime next_failure_warning_;
00699
00700 ros::Duration time_tolerance_;
00701
00702 message_filters::Connection message_connection_;
00703
00704 FailureSignal failure_signal_;
00705 boost::mutex failure_signal_mutex_;
00706
00707 ros::CallbackQueueInterface* callback_queue_;
00708 };
00709
00710 }
00711
00712 #endif