message_filter.h
Go to the documentation of this file.
00001 /*
00002  * Copyright (c) 2008, Willow Garage, Inc.
00003  * All rights reserved.
00004  *
00005  * Redistribution and use in source and binary forms, with or without
00006  * modification, are permitted provided that the following conditions are met:
00007  *
00008  *     * Redistributions of source code must retain the above copyright
00009  *       notice, this list of conditions and the following disclaimer.
00010  *     * Redistributions in binary form must reproduce the above copyright
00011  *       notice, this list of conditions and the following disclaimer in the
00012  *       documentation and/or other materials provided with the distribution.
00013  *     * Neither the name of the Willow Garage, Inc. nor the names of its
00014  *       contributors may be used to endorse or promote products derived from
00015  *       this software without specific prior written permission.
00016  *
00017  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00018  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00019  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00020  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00021  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00022  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00023  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00024  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00025  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00026  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00027  * POSSIBILITY OF SUCH DAMAGE.
00028  */
00029 
00032 #ifndef TF_MESSAGE_FILTER_H
00033 #define TF_MESSAGE_FILTER_H
00034 
00035 #include <ros/ros.h>
00036 #include <tf/transform_listener.h>
00037 #include <tf/tfMessage.h>
00038 
00039 #include <string>
00040 #include <list>
00041 #include <vector>
00042 #include <boost/function.hpp>
00043 #include <boost/bind.hpp>
00044 #include <boost/shared_ptr.hpp>
00045 #include <boost/weak_ptr.hpp>
00046 #include <boost/thread.hpp>
00047 #include <boost/signals.hpp>
00048 
00049 #include <ros/callback_queue.h>
00050 
00051 #include <message_filters/connection.h>
00052 #include <message_filters/simple_filter.h>
00053 
00054 #define TF_MESSAGEFILTER_DEBUG(fmt, ...) \
00055   ROS_DEBUG_NAMED("message_filter", "MessageFilter [target=%s]: " fmt, getTargetFramesString().c_str(), __VA_ARGS__)
00056 
00057 #define TF_MESSAGEFILTER_WARN(fmt, ...) \
00058   ROS_WARN_NAMED("message_filter", "MessageFilter [target=%s]: " fmt, getTargetFramesString().c_str(), __VA_ARGS__)
00059 
00060 namespace tf
00061 {
00062 
00063 namespace filter_failure_reasons
00064 {
00065 enum FilterFailureReason
00066 {
00068   Unknown,
00070   OutTheBack,
00072   EmptyFrameID,
00073 };
00074 }
00075 typedef filter_failure_reasons::FilterFailureReason FilterFailureReason;
00076 
00077 class MessageFilterBase
00078 {
00079 public:
00080   virtual ~MessageFilterBase(){}
00081   virtual void clear() = 0;
00082   virtual void setTargetFrame(const std::string& target_frame) = 0;
00083   virtual void setTargetFrames(const std::vector<std::string>& target_frames) = 0;
00084   virtual void setTolerance(const ros::Duration& tolerance) = 0;
00085   virtual void setQueueSize( uint32_t new_queue_size ) = 0;
00086   virtual uint32_t getQueueSize() = 0;
00087 };
00088 
00105 template<class M>
00106 class MessageFilter : public MessageFilterBase, public message_filters::SimpleFilter<M>
00107 {
00108 public:
00109   typedef boost::shared_ptr<M const> MConstPtr;
00110   typedef ros::MessageEvent<M const> MEvent;
00111   typedef boost::function<void(const MConstPtr&, FilterFailureReason)> FailureCallback;
00112   typedef boost::signal<void(const MConstPtr&, FilterFailureReason)> FailureSignal;
00113 
00114   // If you hit this assert your message does not have a header, or does not have the HasHeader trait defined for it
00115   ROS_STATIC_ASSERT(ros::message_traits::HasHeader<M>::value);
00116 
00126   MessageFilter(Transformer& tf, const std::string& target_frame, uint32_t queue_size, ros::NodeHandle nh = ros::NodeHandle(), ros::Duration max_rate = ros::Duration(0.01))
00127   : tf_(tf)
00128   , nh_(nh)
00129   , max_rate_(max_rate)
00130   , queue_size_(queue_size)
00131   {
00132     init();
00133 
00134     setTargetFrame(target_frame);
00135   }
00136 
00147   template<class F>
00148   MessageFilter(F& f, Transformer& tf, const std::string& target_frame, uint32_t queue_size, ros::NodeHandle nh = ros::NodeHandle(), ros::Duration max_rate = ros::Duration(0.01))
00149   : tf_(tf)
00150   , nh_(nh)
00151   , max_rate_(max_rate)
00152   , queue_size_(queue_size)
00153   {
00154     init();
00155 
00156     setTargetFrame(target_frame);
00157 
00158     connectInput(f);
00159   }
00160 
00164   template<class F>
00165   void connectInput(F& f)
00166   {
00167     message_connection_.disconnect();
00168     message_connection_ = f.registerCallback(&MessageFilter::incomingMessage, this);
00169   }
00170 
00174   ~MessageFilter()
00175   {
00176     // Explicitly stop callbacks; they could execute after we're destroyed
00177     max_rate_timer_.stop();
00178     message_connection_.disconnect();
00179     tf_.removeTransformsChangedListener(tf_connection_);
00180 
00181     clear();
00182 
00183     TF_MESSAGEFILTER_DEBUG("Successful Transforms: %llu, Failed Transforms: %llu, Discarded due to age: %llu, Transform messages received: %llu, Messages received: %llu, Total dropped: %llu",
00184                            (long long unsigned int)successful_transform_count_, (long long unsigned int)failed_transform_count_, 
00185                            (long long unsigned int)failed_out_the_back_count_, (long long unsigned int)transform_message_count_, 
00186                            (long long unsigned int)incoming_message_count_, (long long unsigned int)dropped_message_count_);
00187 
00188   }
00189 
00193   void setTargetFrame(const std::string& target_frame)
00194   {
00195     std::vector<std::string> frames;
00196     frames.push_back(target_frame);
00197     setTargetFrames(frames);
00198   }
00199 
00203   void setTargetFrames(const std::vector<std::string>& target_frames)
00204   {
00205     boost::mutex::scoped_lock list_lock(messages_mutex_);
00206     boost::mutex::scoped_lock string_lock(target_frames_string_mutex_);
00207 
00208     target_frames_ = target_frames;
00209 
00210     std::stringstream ss;
00211     for (std::vector<std::string>::iterator it = target_frames_.begin(); it != target_frames_.end(); ++it)
00212     {
00213       ss << *it << " ";
00214     }
00215     target_frames_string_ = ss.str();
00216   }
00220   std::string getTargetFramesString()
00221   {
00222     boost::mutex::scoped_lock lock(target_frames_string_mutex_);
00223     return target_frames_string_;
00224   };
00225 
00229   void setTolerance(const ros::Duration& tolerance)
00230   {
00231     time_tolerance_ = tolerance;
00232   }
00233 
00237   void clear()
00238   {
00239     boost::mutex::scoped_lock lock(messages_mutex_);
00240 
00241     TF_MESSAGEFILTER_DEBUG("%s", "Cleared");
00242 
00243     messages_.clear();
00244     message_count_ = 0;
00245 
00246     warned_about_unresolved_name_ = false;
00247     warned_about_empty_frame_id_ = false;
00248   }
00249 
00250   void add(const MEvent& evt)
00251   {
00252     boost::mutex::scoped_lock lock(messages_mutex_);
00253 
00254     testMessages();
00255 
00256     if (!testMessage(evt))
00257     {
00258       // If this message is about to push us past our queue size, erase the oldest message
00259       if (queue_size_ != 0 && message_count_ + 1 > queue_size_)
00260       {
00261         ++dropped_message_count_;
00262         const MEvent& front = messages_.front();
00263         TF_MESSAGEFILTER_DEBUG("Removed oldest message because buffer is full, count now %d (frame_id=%s, stamp=%f)", message_count_, front.getMessage()->header.frame_id.c_str(), front.getMessage()->header.stamp.toSec());
00264         signalFailure(messages_.front(), filter_failure_reasons::Unknown);
00265 
00266         messages_.pop_front();
00267         --message_count_;
00268       }
00269 
00270       // Add the message to our list
00271       messages_.push_back(evt);
00272       ++message_count_;
00273     }
00274 
00275     TF_MESSAGEFILTER_DEBUG("Added message in frame %s at time %.3f, count now %d", evt.getMessage()->header.frame_id.c_str(), evt.getMessage()->header.stamp.toSec(), message_count_);
00276 
00277     ++incoming_message_count_;
00278   }
00279 
00285   void add(const MConstPtr& message)
00286   {
00287     
00288     boost::shared_ptr<std::map<std::string, std::string> > header(new std::map<std::string, std::string>);
00289     (*header)["callerid"] = "unknown";
00290     add(MEvent(message, header, ros::Time::now()));
00291   }
00292 
00297   message_filters::Connection registerFailureCallback(const FailureCallback& callback)
00298   {
00299     boost::mutex::scoped_lock lock(failure_signal_mutex_);
00300     return message_filters::Connection(boost::bind(&MessageFilter::disconnectFailure, this, _1), failure_signal_.connect(callback));
00301   }
00302 
00303   virtual void setQueueSize( uint32_t new_queue_size )
00304   {
00305     queue_size_ = new_queue_size;
00306   }
00307 
00308   virtual uint32_t getQueueSize()
00309   {
00310     return queue_size_;
00311   }
00312 
00313 private:
00314 
00315   void init()
00316   {
00317     message_count_ = 0;
00318     new_transforms_ = false;
00319     successful_transform_count_ = 0;
00320     failed_transform_count_ = 0;
00321     failed_out_the_back_count_ = 0;
00322     transform_message_count_ = 0;
00323     incoming_message_count_ = 0;
00324     dropped_message_count_ = 0;
00325     time_tolerance_ = ros::Duration(0.0);
00326     warned_about_unresolved_name_ = false;
00327     warned_about_empty_frame_id_ = false;
00328 
00329     tf_connection_ = tf_.addTransformsChangedListener(boost::bind(&MessageFilter::transformsChanged, this));
00330 
00331     max_rate_timer_ = nh_.createTimer(max_rate_, &MessageFilter::maxRateTimerCallback, this);
00332   }
00333 
00334   typedef std::list<MEvent> L_Event;
00335 
00336   bool testMessage(const MEvent& evt)
00337   {
00338     const MConstPtr& message = evt.getMessage();
00339     std::string callerid = evt.getPublisherName();//message->__connection_header ? (*message->__connection_header)["callerid"] : "unknown";
00340     std::string frame_id = ros::message_traits::FrameId<M>::value(*message);
00341     ros::Time stamp = ros::message_traits::TimeStamp<M>::value(*message);
00342 
00343     //Throw out messages which have an empty frame_id
00344     if (frame_id.empty())
00345     {
00346       if (!warned_about_empty_frame_id_)
00347       {
00348         warned_about_empty_frame_id_ = true;
00349         TF_MESSAGEFILTER_WARN("Discarding message from [%s] due to empty frame_id.  This message will only print once.", callerid.c_str());
00350       }
00351       signalFailure(evt, filter_failure_reasons::EmptyFrameID);
00352       return true;
00353     }
00354 
00355 
00356     //Throw out messages which are too old
00358     for (std::vector<std::string>::iterator target_it = target_frames_.begin(); target_it != target_frames_.end(); ++target_it)
00359     {
00360       const std::string& target_frame = *target_it;
00361 
00362       if (target_frame != frame_id && stamp != ros::Time(0))
00363       {
00364         ros::Time latest_transform_time ;
00365 
00366         tf_.getLatestCommonTime(frame_id, target_frame, latest_transform_time, 0) ;
00367         
00368         if (stamp + tf_.getCacheLength() < latest_transform_time)
00369         {
00370           ++failed_out_the_back_count_;
00371           ++dropped_message_count_;
00372           TF_MESSAGEFILTER_DEBUG("Discarding Message, in frame %s, Out of the back of Cache Time(stamp: %.3f + cache_length: %.3f < latest_transform_time %.3f.  Message Count now: %d", message->header.frame_id.c_str(), message->header.stamp.toSec(),  tf_.getCacheLength().toSec(), latest_transform_time.toSec(), message_count_);
00373 
00374           last_out_the_back_stamp_ = stamp;
00375           last_out_the_back_frame_ = frame_id;
00376 
00377           signalFailure(evt, filter_failure_reasons::OutTheBack);
00378           return true;
00379         }
00380       }
00381 
00382     }
00383 
00384     bool ready = !target_frames_.empty();
00385     for (std::vector<std::string>::iterator target_it = target_frames_.begin(); ready && target_it != target_frames_.end(); ++target_it)
00386     {
00387       std::string& target_frame = *target_it;
00388       if (time_tolerance_ != ros::Duration(0.0))
00389       {
00390         ready = ready && (tf_.canTransform(target_frame, frame_id, stamp) &&
00391                           tf_.canTransform(target_frame, frame_id, stamp + time_tolerance_) );
00392       }
00393       else
00394       {
00395         ready = ready && tf_.canTransform(target_frame, frame_id, stamp);
00396       }
00397     }
00398 
00399     if (ready)
00400     {
00401       TF_MESSAGEFILTER_DEBUG("Message ready in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_);
00402 
00403       ++successful_transform_count_;
00404 
00405       this->signalMessage(evt);
00406     }
00407     else
00408     {
00409       ++failed_transform_count_;
00410     }
00411 
00412     return ready;
00413   }
00414 
00415   void testMessages()
00416   {
00417     if (!messages_.empty() && getTargetFramesString() == " ")
00418     {
00419       ROS_WARN_NAMED("message_notifier", "MessageFilter [target=%s]: empty target frame", getTargetFramesString().c_str());
00420     }
00421 
00422     int i = 0;
00423 
00424     typename L_Event::iterator it = messages_.begin();
00425     for (; it != messages_.end(); ++i)
00426     {
00427       MEvent& evt = *it;
00428 
00429       if (testMessage(evt))
00430       {
00431         --message_count_;
00432         it = messages_.erase(it);
00433       }
00434       else
00435       {
00436         ++it;
00437       }
00438     }
00439   }
00440 
00441   void maxRateTimerCallback(const ros::TimerEvent&)
00442   {
00443     boost::mutex::scoped_lock list_lock(messages_mutex_);
00444     if (new_transforms_)
00445     {
00446       testMessages();
00447       new_transforms_ = false;
00448     }
00449 
00450     checkFailures();
00451   }
00452 
00456   void incomingMessage(const ros::MessageEvent<M const>& evt)
00457   {
00458     add(evt);
00459   }
00460 
00461   void transformsChanged()
00462   {
00463     new_transforms_ = true;
00464 
00465     ++transform_message_count_;
00466   }
00467 
00468   void checkFailures()
00469   {
00470     if (next_failure_warning_.isZero())
00471     {
00472       next_failure_warning_ = ros::Time::now() + ros::Duration(15);
00473     }
00474 
00475     if (ros::Time::now() >= next_failure_warning_)
00476     {
00477       if (incoming_message_count_ - message_count_ == 0)
00478       {
00479         return;
00480       }
00481 
00482       double dropped_pct = (double)dropped_message_count_ / (double)(incoming_message_count_ - message_count_);
00483       if (dropped_pct > 0.95)
00484       {
00485         TF_MESSAGEFILTER_WARN("Dropped %.2f%% of messages so far. Please turn the [%s.message_notifier] rosconsole logger to DEBUG for more information.", dropped_pct*100, ROSCONSOLE_DEFAULT_NAME);
00486         next_failure_warning_ = ros::Time::now() + ros::Duration(60);
00487 
00488         if ((double)failed_out_the_back_count_ / (double)dropped_message_count_ > 0.5)
00489         {
00490           TF_MESSAGEFILTER_WARN("  The majority of dropped messages were due to messages growing older than the TF cache time.  The last message's timestamp was: %f, and the last frame_id was: %s", last_out_the_back_stamp_.toSec(), last_out_the_back_frame_.c_str());
00491         }
00492       }
00493     }
00494   }
00495 
00496   void disconnectFailure(const message_filters::Connection& c)
00497   {
00498     boost::mutex::scoped_lock lock(failure_signal_mutex_);
00499     c.getBoostConnection().disconnect();
00500   }
00501 
00502   void signalFailure(const MEvent& evt, FilterFailureReason reason)
00503   {
00504     boost::mutex::scoped_lock lock(failure_signal_mutex_);
00505     failure_signal_(evt.getMessage(), reason);
00506   }
00507 
00508   Transformer& tf_; 
00509   ros::NodeHandle nh_; 
00510   ros::Duration max_rate_;
00511   ros::Timer max_rate_timer_;
00512   std::vector<std::string> target_frames_; 
00513   std::string target_frames_string_;
00514   boost::mutex target_frames_string_mutex_;
00515   uint32_t queue_size_; 
00516 
00517   L_Event messages_; 
00518   uint32_t message_count_; 
00519   boost::mutex messages_mutex_; 
00520 
00521   bool new_messages_; 
00522   volatile bool new_transforms_; 
00523 
00524   bool warned_about_unresolved_name_;
00525   bool warned_about_empty_frame_id_;
00526 
00527   uint64_t successful_transform_count_;
00528   uint64_t failed_transform_count_;
00529   uint64_t failed_out_the_back_count_;
00530   uint64_t transform_message_count_;
00531   uint64_t incoming_message_count_;
00532   uint64_t dropped_message_count_;
00533 
00534   ros::Time last_out_the_back_stamp_;
00535   std::string last_out_the_back_frame_;
00536 
00537   ros::Time next_failure_warning_;
00538 
00539   ros::Duration time_tolerance_; 
00540 
00541   boost::signals::connection tf_connection_;
00542   message_filters::Connection message_connection_;
00543 
00544   FailureSignal failure_signal_;
00545   boost::mutex failure_signal_mutex_;
00546 };
00547 
00548 
00549 } // namespace tf
00550 
00551 #endif
00552 


tf
Author(s): Tully Foote, Eitan Marder-Eppstein, Wim Meeussen
autogenerated on Thu Aug 27 2015 13:02:09