message_filter.h
Go to the documentation of this file.
00001 /*
00002  * Copyright (c) 2008, Willow Garage, Inc.
00003  * All rights reserved.
00004  *
00005  * Redistribution and use in source and binary forms, with or without
00006  * modification, are permitted provided that the following conditions are met:
00007  *
00008  *     * Redistributions of source code must retain the above copyright
00009  *       notice, this list of conditions and the following disclaimer.
00010  *     * Redistributions in binary form must reproduce the above copyright
00011  *       notice, this list of conditions and the following disclaimer in the
00012  *       documentation and/or other materials provided with the distribution.
00013  *     * Neither the name of the Willow Garage, Inc. nor the names of its
00014  *       contributors may be used to endorse or promote products derived from
00015  *       this software without specific prior written permission.
00016  *
00017  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00018  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00019  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00020  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00021  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00022  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00023  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00024  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00025  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00026  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00027  * POSSIBILITY OF SUCH DAMAGE.
00028  */
00029 
00032 #ifndef TF_MESSAGE_FILTER_H
00033 #define TF_MESSAGE_FILTER_H
00034 
00035 #include <ros/ros.h>
00036 #include <tf/transform_listener.h>
00037 #include <tf/tfMessage.h>
00038 
00039 #include <string>
00040 #include <list>
00041 #include <vector>
00042 #include <boost/function.hpp>
00043 #include <boost/bind.hpp>
00044 #include <boost/shared_ptr.hpp>
00045 #include <boost/weak_ptr.hpp>
00046 #include <boost/thread.hpp>
00047 #include <boost/signals.hpp>
00048 
00049 #include <ros/callback_queue.h>
00050 
00051 #include <message_filters/connection.h>
00052 #include <message_filters/simple_filter.h>
00053 
00054 #define TF_MESSAGEFILTER_DEBUG(fmt, ...) \
00055   ROS_DEBUG_NAMED("message_filter", "MessageFilter [target=%s]: " fmt, getTargetFramesString().c_str(), __VA_ARGS__)
00056 
00057 #define TF_MESSAGEFILTER_WARN(fmt, ...) \
00058   ROS_WARN_NAMED("message_filter", "MessageFilter [target=%s]: " fmt, getTargetFramesString().c_str(), __VA_ARGS__)
00059 
00060 namespace tf
00061 {
00062 
00063 namespace filter_failure_reasons
00064 {
00065 enum FilterFailureReason
00066 {
00068   Unknown,
00070   OutTheBack,
00072   EmptyFrameID,
00073 };
00074 }
00075 typedef filter_failure_reasons::FilterFailureReason FilterFailureReason;
00076 
00077 class MessageFilterBase
00078 {
00079 public:
00080   virtual ~MessageFilterBase(){}
00081   virtual void clear() = 0;
00082   virtual void setTargetFrame(const std::string& target_frame) = 0;
00083   virtual void setTargetFrames(const std::vector<std::string>& target_frames) = 0;
00084   virtual void setTolerance(const ros::Duration& tolerance) = 0;
00085   virtual void setQueueSize( uint32_t new_queue_size ) = 0;
00086   virtual uint32_t getQueueSize() = 0;
00087 };
00088 
00105 template<class M>
00106 class MessageFilter : public MessageFilterBase, public message_filters::SimpleFilter<M>
00107 {
00108 public:
00109   typedef boost::shared_ptr<M const> MConstPtr;
00110   typedef ros::MessageEvent<M const> MEvent;
00111   typedef boost::function<void(const MConstPtr&, FilterFailureReason)> FailureCallback;
00112   typedef boost::signal<void(const MConstPtr&, FilterFailureReason)> FailureSignal;
00113 
00114   // If you hit this assert your message does not have a header, or does not have the HasHeader trait defined for it
00115   ROS_STATIC_ASSERT(ros::message_traits::HasHeader<M>::value);
00116 
00126   MessageFilter(Transformer& tf, const std::string& target_frame, uint32_t queue_size, ros::NodeHandle nh = ros::NodeHandle(), ros::Duration max_rate = ros::Duration(0.01))
00127   : tf_(tf)
00128   , nh_(nh)
00129   , max_rate_(max_rate)
00130   , queue_size_(queue_size)
00131   {
00132     init();
00133 
00134     setTargetFrame(target_frame);
00135   }
00136 
00147   template<class F>
00148   MessageFilter(F& f, Transformer& tf, const std::string& target_frame, uint32_t queue_size, ros::NodeHandle nh = ros::NodeHandle(), ros::Duration max_rate = ros::Duration(0.01))
00149   : tf_(tf)
00150   , nh_(nh)
00151   , max_rate_(max_rate)
00152   , queue_size_(queue_size)
00153   {
00154     init();
00155 
00156     setTargetFrame(target_frame);
00157 
00158     connectInput(f);
00159   }
00160 
00164   template<class F>
00165   void connectInput(F& f)
00166   {
00167     message_connection_.disconnect();
00168     message_connection_ = f.registerCallback(&MessageFilter::incomingMessage, this);
00169   }
00170 
00174   ~MessageFilter()
00175   {
00176     message_connection_.disconnect();
00177     tf_.removeTransformsChangedListener(tf_connection_);
00178 
00179     clear();
00180 
00181     TF_MESSAGEFILTER_DEBUG("Successful Transforms: %llu, Failed Transforms: %llu, Discarded due to age: %llu, Transform messages received: %llu, Messages received: %llu, Total dropped: %llu",
00182                            (long long unsigned int)successful_transform_count_, (long long unsigned int)failed_transform_count_, 
00183                            (long long unsigned int)failed_out_the_back_count_, (long long unsigned int)transform_message_count_, 
00184                            (long long unsigned int)incoming_message_count_, (long long unsigned int)dropped_message_count_);
00185 
00186   }
00187 
00191   void setTargetFrame(const std::string& target_frame)
00192   {
00193     std::vector<std::string> frames;
00194     frames.push_back(target_frame);
00195     setTargetFrames(frames);
00196   }
00197 
00201   void setTargetFrames(const std::vector<std::string>& target_frames)
00202   {
00203     boost::mutex::scoped_lock list_lock(messages_mutex_);
00204     boost::mutex::scoped_lock string_lock(target_frames_string_mutex_);
00205 
00206     target_frames_ = target_frames;
00207 
00208     std::stringstream ss;
00209     for (std::vector<std::string>::iterator it = target_frames_.begin(); it != target_frames_.end(); ++it)
00210     {
00211       *it = tf::resolve(tf_.getTFPrefix(), *it);
00212       ss << *it << " ";
00213     }
00214     target_frames_string_ = ss.str();
00215   }
00219   std::string getTargetFramesString()
00220   {
00221     boost::mutex::scoped_lock lock(target_frames_string_mutex_);
00222     return target_frames_string_;
00223   };
00224 
00228   void setTolerance(const ros::Duration& tolerance)
00229   {
00230     time_tolerance_ = tolerance;
00231   }
00232 
00236   void clear()
00237   {
00238     boost::mutex::scoped_lock lock(messages_mutex_);
00239 
00240     TF_MESSAGEFILTER_DEBUG("%s", "Cleared");
00241 
00242     messages_.clear();
00243     message_count_ = 0;
00244 
00245     warned_about_unresolved_name_ = false;
00246     warned_about_empty_frame_id_ = false;
00247   }
00248 
00249   void add(const MEvent& evt)
00250   {
00251     boost::mutex::scoped_lock lock(messages_mutex_);
00252 
00253     testMessages();
00254 
00255     if (!testMessage(evt))
00256     {
00257       // If this message is about to push us past our queue size, erase the oldest message
00258       if (queue_size_ != 0 && message_count_ + 1 > queue_size_)
00259       {
00260         ++dropped_message_count_;
00261         const MEvent& front = messages_.front();
00262         TF_MESSAGEFILTER_DEBUG("Removed oldest message because buffer is full, count now %d (frame_id=%s, stamp=%f)", message_count_, front.getMessage()->header.frame_id.c_str(), front.getMessage()->header.stamp.toSec());
00263         signalFailure(messages_.front(), filter_failure_reasons::Unknown);
00264 
00265         messages_.pop_front();
00266         --message_count_;
00267       }
00268 
00269       // Add the message to our list
00270       messages_.push_back(evt);
00271       ++message_count_;
00272     }
00273 
00274     TF_MESSAGEFILTER_DEBUG("Added message in frame %s at time %.3f, count now %d", evt.getMessage()->header.frame_id.c_str(), evt.getMessage()->header.stamp.toSec(), message_count_);
00275 
00276     ++incoming_message_count_;
00277   }
00278 
00284   void add(const MConstPtr& message)
00285   {
00286     boost::shared_ptr<std::map<std::string, std::string> > header(new std::map<std::string, std::string>);
00287     (*header)["callerid"] = "unknown";
00288     add(MEvent(message, header, ros::Time::now()));
00289   }
00290 
00295   message_filters::Connection registerFailureCallback(const FailureCallback& callback)
00296   {
00297     boost::mutex::scoped_lock lock(failure_signal_mutex_);
00298     return message_filters::Connection(boost::bind(&MessageFilter::disconnectFailure, this, _1), failure_signal_.connect(callback));
00299   }
00300 
00301   virtual void setQueueSize( uint32_t new_queue_size )
00302   {
00303     queue_size_ = new_queue_size;
00304   }
00305 
00306   virtual uint32_t getQueueSize()
00307   {
00308     return queue_size_;
00309   }
00310 
00311 private:
00312 
00313   void init()
00314   {
00315     message_count_ = 0;
00316     new_transforms_ = false;
00317     successful_transform_count_ = 0;
00318     failed_transform_count_ = 0;
00319     failed_out_the_back_count_ = 0;
00320     transform_message_count_ = 0;
00321     incoming_message_count_ = 0;
00322     dropped_message_count_ = 0;
00323     time_tolerance_ = ros::Duration(0.0);
00324     warned_about_unresolved_name_ = false;
00325     warned_about_empty_frame_id_ = false;
00326 
00327     tf_connection_ = tf_.addTransformsChangedListener(boost::bind(&MessageFilter::transformsChanged, this));
00328 
00329     max_rate_timer_ = nh_.createTimer(max_rate_, &MessageFilter::maxRateTimerCallback, this);
00330   }
00331 
00332   typedef std::list<MEvent> L_Event;
00333 
00334   bool testMessage(const MEvent& evt)
00335   {
00336     const MConstPtr& message = evt.getMessage();
00337     std::string callerid = evt.getPublisherName();//message->__connection_header ? (*message->__connection_header)["callerid"] : "unknown";
00338     std::string frame_id = ros::message_traits::FrameId<M>::value(*message);
00339     ros::Time stamp = ros::message_traits::TimeStamp<M>::value(*message);
00340 
00341     //Throw out messages which have an empty frame_id
00342     if (frame_id.empty())
00343     {
00344       if (!warned_about_empty_frame_id_)
00345       {
00346         warned_about_empty_frame_id_ = true;
00347         TF_MESSAGEFILTER_WARN("Discarding message from [%s] due to empty frame_id.  This message will only print once.", callerid.c_str());
00348       }
00349       signalFailure(evt, filter_failure_reasons::EmptyFrameID);
00350       return true;
00351     }
00352 
00353     if (frame_id[0] != '/')
00354     {
00355       std::string unresolved = frame_id;
00356       frame_id = tf::resolve(tf_.getTFPrefix(), frame_id);
00357 
00358       if (!warned_about_unresolved_name_)
00359       {
00360         warned_about_unresolved_name_ = true;
00361         ROS_WARN("Message from [%s] has a non-fully-qualified frame_id [%s]. Resolved locally to [%s].  This is will likely not work in multi-robot systems.  This message will only print once.", callerid.c_str(), unresolved.c_str(), frame_id.c_str());
00362       }
00363     }
00364 
00365     //Throw out messages which are too old
00367     for (std::vector<std::string>::iterator target_it = target_frames_.begin(); target_it != target_frames_.end(); ++target_it)
00368     {
00369       const std::string& target_frame = *target_it;
00370 
00371       if (target_frame != frame_id && stamp != ros::Time(0))
00372       {
00373         ros::Time latest_transform_time ;
00374 
00375         tf_.getLatestCommonTime(frame_id, target_frame, latest_transform_time, 0) ;
00376         if (stamp + tf_.getCacheLength() < latest_transform_time)
00377         {
00378           ++failed_out_the_back_count_;
00379           ++dropped_message_count_;
00380           TF_MESSAGEFILTER_DEBUG("Discarding Message, in frame %s, Out of the back of Cache Time(stamp: %.3f + cache_length: %.3f < latest_transform_time %.3f.  Message Count now: %d", message->header.frame_id.c_str(), message->header.stamp.toSec(),  tf_.getCacheLength().toSec(), latest_transform_time.toSec(), message_count_);
00381 
00382           last_out_the_back_stamp_ = stamp;
00383           last_out_the_back_frame_ = frame_id;
00384 
00385           signalFailure(evt, filter_failure_reasons::OutTheBack);
00386           return true;
00387         }
00388       }
00389 
00390     }
00391 
00392     bool ready = !target_frames_.empty();
00393     for (std::vector<std::string>::iterator target_it = target_frames_.begin(); ready && target_it != target_frames_.end(); ++target_it)
00394     {
00395       std::string& target_frame = *target_it;
00396       if (time_tolerance_ != ros::Duration(0.0))
00397       {
00398         ready = ready && (tf_.canTransform(target_frame, frame_id, stamp) &&
00399                           tf_.canTransform(target_frame, frame_id, stamp + time_tolerance_) );
00400       }
00401       else
00402       {
00403         ready = ready && tf_.canTransform(target_frame, frame_id, stamp);
00404       }
00405     }
00406 
00407     if (ready)
00408     {
00409       TF_MESSAGEFILTER_DEBUG("Message ready in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_);
00410 
00411       ++successful_transform_count_;
00412 
00413       this->signalMessage(evt);
00414     }
00415     else
00416     {
00417       ++failed_transform_count_;
00418     }
00419 
00420     return ready;
00421   }
00422 
00423   void testMessages()
00424   {
00425     if (!messages_.empty() && getTargetFramesString() == " ")
00426     {
00427       ROS_WARN_NAMED("message_notifier", "MessageFilter [target=%s]: empty target frame", getTargetFramesString().c_str());
00428     }
00429 
00430     int i = 0;
00431 
00432     typename L_Event::iterator it = messages_.begin();
00433     for (; it != messages_.end(); ++i)
00434     {
00435       MEvent& evt = *it;
00436 
00437       if (testMessage(evt))
00438       {
00439         --message_count_;
00440         it = messages_.erase(it);
00441       }
00442       else
00443       {
00444         ++it;
00445       }
00446     }
00447   }
00448 
00449   void maxRateTimerCallback(const ros::TimerEvent&)
00450   {
00451     boost::mutex::scoped_lock list_lock(messages_mutex_);
00452     if (new_transforms_)
00453     {
00454       testMessages();
00455       new_transforms_ = false;
00456     }
00457 
00458     checkFailures();
00459   }
00460 
00464   void incomingMessage(const ros::MessageEvent<M const>& evt)
00465   {
00466     add(evt);
00467   }
00468 
00469   void transformsChanged()
00470   {
00471     new_transforms_ = true;
00472 
00473     ++transform_message_count_;
00474   }
00475 
00476   void checkFailures()
00477   {
00478     if (next_failure_warning_.isZero())
00479     {
00480       next_failure_warning_ = ros::Time::now() + ros::Duration(15);
00481     }
00482 
00483     if (ros::Time::now() >= next_failure_warning_)
00484     {
00485       if (incoming_message_count_ - message_count_ == 0)
00486       {
00487         return;
00488       }
00489 
00490       double dropped_pct = (double)dropped_message_count_ / (double)(incoming_message_count_ - message_count_);
00491       if (dropped_pct > 0.95)
00492       {
00493         TF_MESSAGEFILTER_WARN("Dropped %.2f%% of messages so far. Please turn the [%s.message_notifier] rosconsole logger to DEBUG for more information.", dropped_pct*100, ROSCONSOLE_DEFAULT_NAME);
00494         next_failure_warning_ = ros::Time::now() + ros::Duration(60);
00495 
00496         if ((double)failed_out_the_back_count_ / (double)dropped_message_count_ > 0.5)
00497         {
00498           TF_MESSAGEFILTER_WARN("  The majority of dropped messages were due to messages growing older than the TF cache time.  The last message's timestamp was: %f, and the last frame_id was: %s", last_out_the_back_stamp_.toSec(), last_out_the_back_frame_.c_str());
00499         }
00500       }
00501     }
00502   }
00503 
00504   void disconnectFailure(const message_filters::Connection& c)
00505   {
00506     boost::mutex::scoped_lock lock(failure_signal_mutex_);
00507     c.getBoostConnection().disconnect();
00508   }
00509 
00510   void signalFailure(const MEvent& evt, FilterFailureReason reason)
00511   {
00512     boost::mutex::scoped_lock lock(failure_signal_mutex_);
00513     failure_signal_(evt.getMessage(), reason);
00514   }
00515 
00516   Transformer& tf_; 
00517   ros::NodeHandle nh_; 
00518   ros::Duration max_rate_;
00519   ros::Timer max_rate_timer_;
00520   std::vector<std::string> target_frames_; 
00521   std::string target_frames_string_;
00522   boost::mutex target_frames_string_mutex_;
00523   uint32_t queue_size_; 
00524 
00525   L_Event messages_; 
00526   uint32_t message_count_; 
00527   boost::mutex messages_mutex_; 
00528 
00529   bool new_messages_; 
00530   volatile bool new_transforms_; 
00531 
00532   bool warned_about_unresolved_name_;
00533   bool warned_about_empty_frame_id_;
00534 
00535   uint64_t successful_transform_count_;
00536   uint64_t failed_transform_count_;
00537   uint64_t failed_out_the_back_count_;
00538   uint64_t transform_message_count_;
00539   uint64_t incoming_message_count_;
00540   uint64_t dropped_message_count_;
00541 
00542   ros::Time last_out_the_back_stamp_;
00543   std::string last_out_the_back_frame_;
00544 
00545   ros::Time next_failure_warning_;
00546 
00547   ros::Duration time_tolerance_; 
00548 
00549   boost::signals::connection tf_connection_;
00550   message_filters::Connection message_connection_;
00551 
00552   FailureSignal failure_signal_;
00553   boost::mutex failure_signal_mutex_;
00554 };
00555 
00556 } // namespace tf
00557 
00558 #endif
00559 


tf
Author(s): Tully Foote, Eitan Marder-Eppstein, Wim Meeussen
autogenerated on Mon Oct 6 2014 00:12:04