message_filter.h
Go to the documentation of this file.
00001 /*
00002  * Copyright (c) 2008, Willow Garage, Inc.
00003  * All rights reserved.
00004  *
00005  * Redistribution and use in source and binary forms, with or without
00006  * modification, are permitted provided that the following conditions are met:
00007  *
00008  *     * Redistributions of source code must retain the above copyright
00009  *       notice, this list of conditions and the following disclaimer.
00010  *     * Redistributions in binary form must reproduce the above copyright
00011  *       notice, this list of conditions and the following disclaimer in the
00012  *       documentation and/or other materials provided with the distribution.
00013  *     * Neither the name of the Willow Garage, Inc. nor the names of its
00014  *       contributors may be used to endorse or promote products derived from
00015  *       this software without specific prior written permission.
00016  *
00017  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00018  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00019  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00020  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00021  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00022  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00023  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00024  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00025  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00026  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00027  * POSSIBILITY OF SUCH DAMAGE.
00028  */
00029 
00032 #ifndef TF_MESSAGE_FILTER_H
00033 #define TF_MESSAGE_FILTER_H
00034 
00035 #include <ros/ros.h>
00036 #include <tf/transform_listener.h>
00037 #include <tf/tfMessage.h>
00038 
00039 #include <string>
00040 #include <list>
00041 #include <vector>
00042 #include <boost/function.hpp>
00043 #include <boost/bind.hpp>
00044 #include <boost/shared_ptr.hpp>
00045 #include <boost/weak_ptr.hpp>
00046 #include <boost/thread.hpp>
00047 #include <boost/signals2.hpp>
00048 
00049 #include <ros/callback_queue.h>
00050 
00051 #include <message_filters/connection.h>
00052 #include <message_filters/simple_filter.h>
00053 
00054 #define TF_MESSAGEFILTER_DEBUG(fmt, ...) \
00055   ROS_DEBUG_NAMED("message_filter", "MessageFilter [target=%s]: " fmt, getTargetFramesString().c_str(), __VA_ARGS__)
00056 
00057 #define TF_MESSAGEFILTER_WARN(fmt, ...) \
00058   ROS_WARN_NAMED("message_filter", "MessageFilter [target=%s]: " fmt, getTargetFramesString().c_str(), __VA_ARGS__)
00059 
00060 namespace tf
00061 {
00062 
00063 namespace filter_failure_reasons
00064 {
00065 enum FilterFailureReason
00066 {
00068   Unknown,
00070   OutTheBack,
00072   EmptyFrameID,
00073 };
00074 }
00075 typedef filter_failure_reasons::FilterFailureReason FilterFailureReason;
00076 
00077 class MessageFilterBase
00078 {
00079 public:
00080   virtual ~MessageFilterBase(){}
00081   virtual void clear() = 0;
00082   virtual void setTargetFrame(const std::string& target_frame) = 0;
00083   virtual void setTargetFrames(const std::vector<std::string>& target_frames) = 0;
00084   virtual void setTolerance(const ros::Duration& tolerance) = 0;
00085   virtual void setQueueSize( uint32_t new_queue_size ) = 0;
00086   virtual uint32_t getQueueSize() = 0;
00087 };
00088 
00105 template<class M>
00106 class MessageFilter : public MessageFilterBase, public message_filters::SimpleFilter<M>
00107 {
00108 public:
00109   typedef boost::shared_ptr<M const> MConstPtr;
00110   typedef ros::MessageEvent<M const> MEvent;
00111   typedef boost::function<void(const MConstPtr&, FilterFailureReason)> FailureCallback;
00112   typedef boost::signals2::signal<void(const MConstPtr&, FilterFailureReason)> FailureSignal;
00113 
00123   MessageFilter(Transformer& tf, const std::string& target_frame, uint32_t queue_size, ros::NodeHandle nh = ros::NodeHandle(), ros::Duration max_rate = ros::Duration(0.01))
00124   : tf_(tf)
00125   , nh_(nh)
00126   , max_rate_(max_rate)
00127   , queue_size_(queue_size)
00128   {
00129     init();
00130 
00131     setTargetFrame(target_frame);
00132   }
00133 
00144   template<class F>
00145   MessageFilter(F& f, Transformer& tf, const std::string& target_frame, uint32_t queue_size, ros::NodeHandle nh = ros::NodeHandle(), ros::Duration max_rate = ros::Duration(0.01))
00146   : tf_(tf)
00147   , nh_(nh)
00148   , max_rate_(max_rate)
00149   , queue_size_(queue_size)
00150   {
00151     init();
00152 
00153     setTargetFrame(target_frame);
00154 
00155     connectInput(f);
00156   }
00157 
00161   template<class F>
00162   void connectInput(F& f)
00163   {
00164     message_connection_.disconnect();
00165     message_connection_ = f.registerCallback(&MessageFilter::incomingMessage, this);
00166   }
00167 
00171   ~MessageFilter()
00172   {
00173     // Explicitly stop callbacks; they could execute after we're destroyed
00174     max_rate_timer_.stop();
00175     message_connection_.disconnect();
00176     tf_.removeTransformsChangedListener(tf_connection_);
00177 
00178     clear();
00179 
00180     TF_MESSAGEFILTER_DEBUG("Successful Transforms: %llu, Failed Transforms: %llu, Discarded due to age: %llu, Transform messages received: %llu, Messages received: %llu, Total dropped: %llu",
00181                            (long long unsigned int)successful_transform_count_, (long long unsigned int)failed_transform_count_, 
00182                            (long long unsigned int)failed_out_the_back_count_, (long long unsigned int)transform_message_count_, 
00183                            (long long unsigned int)incoming_message_count_, (long long unsigned int)dropped_message_count_);
00184 
00185   }
00186 
00190   void setTargetFrame(const std::string& target_frame)
00191   {
00192     std::vector<std::string> frames;
00193     frames.push_back(target_frame);
00194     setTargetFrames(frames);
00195   }
00196 
00200   void setTargetFrames(const std::vector<std::string>& target_frames)
00201   {
00202     boost::mutex::scoped_lock list_lock(messages_mutex_);
00203     boost::mutex::scoped_lock string_lock(target_frames_string_mutex_);
00204 
00205     target_frames_ = target_frames;
00206 
00207     std::stringstream ss;
00208     for (std::vector<std::string>::iterator it = target_frames_.begin(); it != target_frames_.end(); ++it)
00209     {
00210       ss << *it << " ";
00211     }
00212     target_frames_string_ = ss.str();
00213   }
00217   std::string getTargetFramesString()
00218   {
00219     boost::mutex::scoped_lock lock(target_frames_string_mutex_);
00220     return target_frames_string_;
00221   };
00222 
00226   void setTolerance(const ros::Duration& tolerance)
00227   {
00228     time_tolerance_ = tolerance;
00229   }
00230 
00234   void clear()
00235   {
00236     boost::mutex::scoped_lock lock(messages_mutex_);
00237 
00238     TF_MESSAGEFILTER_DEBUG("%s", "Cleared");
00239 
00240     messages_.clear();
00241     message_count_ = 0;
00242 
00243     warned_about_unresolved_name_ = false;
00244     warned_about_empty_frame_id_ = false;
00245   }
00246 
00247   void add(const MEvent& evt)
00248   {
00249     boost::mutex::scoped_lock lock(messages_mutex_);
00250 
00251     testMessages();
00252 
00253     if (!testMessage(evt))
00254     {
00255       // If this message is about to push us past our queue size, erase the oldest message
00256       if (queue_size_ != 0 && message_count_ + 1 > queue_size_)
00257       {
00258         ++dropped_message_count_;
00259         const MEvent& front = messages_.front();
00260         TF_MESSAGEFILTER_DEBUG(
00261               "Removed oldest message because buffer is full, count now %d (frame_id=%s, stamp=%f)",
00262               message_count_,
00263               ros::message_traits::FrameId<M>::value(*front.getMessage()).c_str(),
00264               ros::message_traits::TimeStamp<M>::value(*front.getMessage()).toSec());
00265         signalFailure(messages_.front(), filter_failure_reasons::Unknown);
00266 
00267         messages_.pop_front();
00268         --message_count_;
00269       }
00270 
00271       // Add the message to our list
00272       messages_.push_back(evt);
00273       ++message_count_;
00274     }
00275 
00276     TF_MESSAGEFILTER_DEBUG(
00277           "Added message in frame %s at time %.3f, count now %d",
00278           ros::message_traits::FrameId<M>::value(*evt.getMessage()).c_str(),
00279           ros::message_traits::TimeStamp<M>::value(*evt.getMessage()).toSec(),
00280           message_count_);
00281 
00282     ++incoming_message_count_;
00283   }
00284 
00290   void add(const MConstPtr& message)
00291   {
00292     
00293     boost::shared_ptr<std::map<std::string, std::string> > header(new std::map<std::string, std::string>);
00294     (*header)["callerid"] = "unknown";
00295     add(MEvent(message, header, ros::Time::now()));
00296   }
00297 
00302   message_filters::Connection registerFailureCallback(const FailureCallback& callback)
00303   {
00304     boost::mutex::scoped_lock lock(failure_signal_mutex_);
00305     return message_filters::Connection(boost::bind(&MessageFilter::disconnectFailure, this, _1), failure_signal_.connect(callback));
00306   }
00307 
00308   virtual void setQueueSize( uint32_t new_queue_size )
00309   {
00310     queue_size_ = new_queue_size;
00311   }
00312 
00313   virtual uint32_t getQueueSize()
00314   {
00315     return queue_size_;
00316   }
00317 
00318 private:
00319 
00320   void init()
00321   {
00322     message_count_ = 0;
00323     new_transforms_ = false;
00324     successful_transform_count_ = 0;
00325     failed_transform_count_ = 0;
00326     failed_out_the_back_count_ = 0;
00327     transform_message_count_ = 0;
00328     incoming_message_count_ = 0;
00329     dropped_message_count_ = 0;
00330     time_tolerance_ = ros::Duration(0.0);
00331     warned_about_unresolved_name_ = false;
00332     warned_about_empty_frame_id_ = false;
00333 
00334     tf_connection_ = tf_.addTransformsChangedListener(boost::bind(&MessageFilter::transformsChanged, this));
00335 
00336     max_rate_timer_ = nh_.createTimer(max_rate_, &MessageFilter::maxRateTimerCallback, this);
00337   }
00338 
00339   typedef std::list<MEvent> L_Event;
00340 
00341   bool testMessage(const MEvent& evt)
00342   {
00343     const MConstPtr& message = evt.getMessage();
00344     std::string callerid = evt.getPublisherName();
00345     std::string frame_id = ros::message_traits::FrameId<M>::value(*message);
00346     ros::Time stamp = ros::message_traits::TimeStamp<M>::value(*message);
00347 
00348     //Throw out messages which have an empty frame_id
00349     if (frame_id.empty())
00350     {
00351       if (!warned_about_empty_frame_id_)
00352       {
00353         warned_about_empty_frame_id_ = true;
00354         TF_MESSAGEFILTER_WARN("Discarding message from [%s] due to empty frame_id.  This message will only print once.", callerid.c_str());
00355       }
00356       signalFailure(evt, filter_failure_reasons::EmptyFrameID);
00357       return true;
00358     }
00359 
00360 
00361     //Throw out messages which are too old
00363     for (std::vector<std::string>::iterator target_it = target_frames_.begin(); target_it != target_frames_.end(); ++target_it)
00364     {
00365       const std::string& target_frame = *target_it;
00366 
00367       if (target_frame != frame_id && stamp != ros::Time(0))
00368       {
00369         ros::Time latest_transform_time ;
00370 
00371         tf_.getLatestCommonTime(frame_id, target_frame, latest_transform_time, 0) ;
00372         
00373         if (stamp + tf_.getCacheLength() < latest_transform_time)
00374         {
00375           ++failed_out_the_back_count_;
00376           ++dropped_message_count_;
00377           TF_MESSAGEFILTER_DEBUG(
00378                 "Discarding Message, in frame %s, Out of the back of Cache Time "
00379                 "(stamp: %.3f + cache_length: %.3f < latest_transform_time %.3f. "
00380                 "Message Count now: %d",
00381                 ros::message_traits::FrameId<M>::value(*message).c_str(),
00382                 ros::message_traits::TimeStamp<M>::value(*message).toSec(),
00383                 tf_.getCacheLength().toSec(), latest_transform_time.toSec(), message_count_);
00384 
00385           last_out_the_back_stamp_ = stamp;
00386           last_out_the_back_frame_ = frame_id;
00387 
00388           signalFailure(evt, filter_failure_reasons::OutTheBack);
00389           return true;
00390         }
00391       }
00392 
00393     }
00394 
00395     bool ready = !target_frames_.empty();
00396     for (std::vector<std::string>::iterator target_it = target_frames_.begin(); ready && target_it != target_frames_.end(); ++target_it)
00397     {
00398       std::string& target_frame = *target_it;
00399       if (time_tolerance_ != ros::Duration(0.0))
00400       {
00401         ready = ready && (tf_.canTransform(target_frame, frame_id, stamp) &&
00402                           tf_.canTransform(target_frame, frame_id, stamp + time_tolerance_) );
00403       }
00404       else
00405       {
00406         ready = ready && tf_.canTransform(target_frame, frame_id, stamp);
00407       }
00408     }
00409 
00410     if (ready)
00411     {
00412       TF_MESSAGEFILTER_DEBUG("Message ready in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_);
00413 
00414       ++successful_transform_count_;
00415 
00416       this->signalMessage(evt);
00417     }
00418     else
00419     {
00420       ++failed_transform_count_;
00421     }
00422 
00423     return ready;
00424   }
00425 
00426   void testMessages()
00427   {
00428     if (!messages_.empty() && getTargetFramesString() == " ")
00429     {
00430       ROS_WARN_NAMED("message_notifier", "MessageFilter [target=%s]: empty target frame", getTargetFramesString().c_str());
00431     }
00432 
00433     int i = 0;
00434 
00435     typename L_Event::iterator it = messages_.begin();
00436     for (; it != messages_.end(); ++i)
00437     {
00438       MEvent& evt = *it;
00439 
00440       if (testMessage(evt))
00441       {
00442         --message_count_;
00443         it = messages_.erase(it);
00444       }
00445       else
00446       {
00447         ++it;
00448       }
00449     }
00450   }
00451 
00452   void maxRateTimerCallback(const ros::TimerEvent&)
00453   {
00454     boost::mutex::scoped_lock list_lock(messages_mutex_);
00455     if (new_transforms_)
00456     {
00457       testMessages();
00458       new_transforms_ = false;
00459     }
00460 
00461     checkFailures();
00462   }
00463 
00467   void incomingMessage(const ros::MessageEvent<M const>& evt)
00468   {
00469     add(evt);
00470   }
00471 
00472   void transformsChanged()
00473   {
00474     new_transforms_ = true;
00475 
00476     ++transform_message_count_;
00477   }
00478 
00479   void checkFailures()
00480   {
00481     if (next_failure_warning_.isZero())
00482     {
00483       next_failure_warning_ = ros::Time::now() + ros::Duration(15);
00484     }
00485 
00486     if (ros::Time::now() >= next_failure_warning_)
00487     {
00488       if (incoming_message_count_ - message_count_ == 0)
00489       {
00490         return;
00491       }
00492 
00493       double dropped_pct = (double)dropped_message_count_ / (double)(incoming_message_count_ - message_count_);
00494       if (dropped_pct > 0.95)
00495       {
00496         TF_MESSAGEFILTER_WARN("Dropped %.2f%% of messages so far. Please turn the [%s.message_notifier] rosconsole logger to DEBUG for more information.", dropped_pct*100, ROSCONSOLE_DEFAULT_NAME);
00497         next_failure_warning_ = ros::Time::now() + ros::Duration(60);
00498 
00499         if ((double)failed_out_the_back_count_ / (double)dropped_message_count_ > 0.5)
00500         {
00501           TF_MESSAGEFILTER_WARN("  The majority of dropped messages were due to messages growing older than the TF cache time.  The last message's timestamp was: %f, and the last frame_id was: %s", last_out_the_back_stamp_.toSec(), last_out_the_back_frame_.c_str());
00502         }
00503       }
00504     }
00505   }
00506 
00507   void disconnectFailure(const message_filters::Connection& c)
00508   {
00509     boost::mutex::scoped_lock lock(failure_signal_mutex_);
00510     c.getBoostConnection().disconnect();
00511   }
00512 
00513   void signalFailure(const MEvent& evt, FilterFailureReason reason)
00514   {
00515     boost::mutex::scoped_lock lock(failure_signal_mutex_);
00516     failure_signal_(evt.getMessage(), reason);
00517   }
00518 
00519   Transformer& tf_; 
00520   ros::NodeHandle nh_; 
00521   ros::Duration max_rate_;
00522   ros::Timer max_rate_timer_;
00523   std::vector<std::string> target_frames_; 
00524   std::string target_frames_string_;
00525   boost::mutex target_frames_string_mutex_;
00526   uint32_t queue_size_; 
00527 
00528   L_Event messages_; 
00529   uint32_t message_count_; 
00530   boost::mutex messages_mutex_; 
00531 
00532   bool new_messages_; 
00533   volatile bool new_transforms_; 
00534 
00535   bool warned_about_unresolved_name_;
00536   bool warned_about_empty_frame_id_;
00537 
00538   uint64_t successful_transform_count_;
00539   uint64_t failed_transform_count_;
00540   uint64_t failed_out_the_back_count_;
00541   uint64_t transform_message_count_;
00542   uint64_t incoming_message_count_;
00543   uint64_t dropped_message_count_;
00544 
00545   ros::Time last_out_the_back_stamp_;
00546   std::string last_out_the_back_frame_;
00547 
00548   ros::Time next_failure_warning_;
00549 
00550   ros::Duration time_tolerance_; 
00551 
00552   boost::signals2::connection tf_connection_;
00553   message_filters::Connection message_connection_;
00554 
00555   FailureSignal failure_signal_;
00556   boost::mutex failure_signal_mutex_;
00557 };
00558 
00559 
00560 } // namespace tf
00561 
00562 #endif
00563 


tf
Author(s): Tully Foote, Eitan Marder-Eppstein, Wim Meeussen
autogenerated on Thu Jun 6 2019 18:45:32