message_filter.h
Go to the documentation of this file.
00001 /*
00002  * Copyright (c) 2010, Willow Garage, Inc.
00003  * All rights reserved.
00004  *
00005  * Redistribution and use in source and binary forms, with or without
00006  * modification, are permitted provided that the following conditions are met:
00007  *
00008  *     * Redistributions of source code must retain the above copyright
00009  *       notice, this list of conditions and the following disclaimer.
00010  *     * Redistributions in binary form must reproduce the above copyright
00011  *       notice, this list of conditions and the following disclaimer in the
00012  *       documentation and/or other materials provided with the distribution.
00013  *     * Neither the name of the Willow Garage, Inc. nor the names of its
00014  *       contributors may be used to endorse or promote products derived from
00015  *       this software without specific prior written permission.
00016  *
00017  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00018  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00019  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00020  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00021  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00022  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00023  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00024  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00025  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00026  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00027  * POSSIBILITY OF SUCH DAMAGE.
00028  */
00029 
00032 #ifndef TF2_ROS_MESSAGE_FILTER_H
00033 #define TF2_ROS_MESSAGE_FILTER_H
00034 
00035 #include <tf2/buffer_core.h>
00036 
00037 #include <string>
00038 #include <list>
00039 #include <vector>
00040 #include <boost/function.hpp>
00041 #include <boost/bind.hpp>
00042 #include <boost/shared_ptr.hpp>
00043 #include <boost/thread.hpp>
00044 
00045 #include <message_filters/connection.h>
00046 #include <message_filters/simple_filter.h>
00047 
00048 #include <ros/node_handle.h>
00049 #include <ros/callback_queue_interface.h>
00050 #include <ros/init.h>
00051 
00052 #define TF2_ROS_MESSAGEFILTER_DEBUG(fmt, ...) \
00053   ROS_DEBUG_NAMED("message_filter", std::string(std::string("MessageFilter [target=%s]: ") + std::string(fmt)).c_str(), getTargetFramesString().c_str(), __VA_ARGS__)
00054 
00055 #define TF2_ROS_MESSAGEFILTER_WARN(fmt, ...) \
00056   ROS_WARN_NAMED("message_filter", std::string(std::string("MessageFilter [target=%s]: ") + std::string(fmt)).c_str(), getTargetFramesString().c_str(), __VA_ARGS__)
00057 
00058 namespace tf2_ros
00059 {
00060 
00061 namespace filter_failure_reasons
00062 {
00063 enum FilterFailureReason
00064 {
00066   Unknown,
00068   OutTheBack,
00070   EmptyFrameID,
00071 };
00072 }
00073 typedef filter_failure_reasons::FilterFailureReason FilterFailureReason;
00074 
00075 class MessageFilterBase
00076 {
00077 public:
00078   typedef std::vector<std::string> V_string;
00079 
00080   virtual ~MessageFilterBase(){}
00081   virtual void clear() = 0;
00082   virtual void setTargetFrame(const std::string& target_frame) = 0;
00083   virtual void setTargetFrames(const V_string& target_frames) = 0;
00084   virtual void setTolerance(const ros::Duration& tolerance) = 0;
00085 };
00086 
00103 template<class M>
00104 class MessageFilter : public MessageFilterBase, public message_filters::SimpleFilter<M>
00105 {
00106 public:
00107   typedef boost::shared_ptr<M const> MConstPtr;
00108   typedef ros::MessageEvent<M const> MEvent;
00109   typedef boost::function<void(const MConstPtr&, FilterFailureReason)> FailureCallback;
00110   typedef boost::signals2::signal<void(const MConstPtr&, FilterFailureReason)> FailureSignal;
00111 
00112   // If you hit this assert your message does not have a header, or does not have the HasHeader trait defined for it
00113   // Actually, we need to check that the message has a header, or that it
00114   // has the FrameId and Stamp traits. However I don't know how to do that
00115   // so simply commenting out for now.
00116   //ROS_STATIC_ASSERT(ros::message_traits::HasHeader<M>::value);
00117 
00126   MessageFilter(tf2::BufferCore& bc, const std::string& target_frame, uint32_t queue_size, const ros::NodeHandle& nh)
00127   : bc_(bc)
00128   , queue_size_(queue_size)
00129   , callback_queue_(nh.getCallbackQueue())
00130   {
00131     init();
00132 
00133     setTargetFrame(target_frame);
00134   }
00135 
00145   template<class F>
00146   MessageFilter(F& f, tf2::BufferCore& bc, const std::string& target_frame, uint32_t queue_size, const ros::NodeHandle& nh)
00147   : bc_(bc)
00148   , queue_size_(queue_size)
00149   , callback_queue_(nh.getCallbackQueue())
00150   {
00151     init();
00152 
00153     setTargetFrame(target_frame);
00154 
00155     connectInput(f);
00156   }
00157 
00168   MessageFilter(tf2::BufferCore& bc, const std::string& target_frame, uint32_t queue_size, ros::CallbackQueueInterface* cbqueue)
00169   : bc_(bc)
00170   , queue_size_(queue_size)
00171   , callback_queue_(cbqueue)
00172   {
00173     init();
00174 
00175     setTargetFrame(target_frame);
00176   }
00177 
00189   template<class F>
00190   MessageFilter(F& f, tf2::BufferCore& bc, const std::string& target_frame, uint32_t queue_size, ros::CallbackQueueInterface* cbqueue)
00191   : bc_(bc)
00192   , queue_size_(queue_size)
00193   , callback_queue_(cbqueue)
00194   {
00195     init();
00196 
00197     setTargetFrame(target_frame);
00198 
00199     connectInput(f);
00200   }
00201 
00205   template<class F>
00206   void connectInput(F& f)
00207   {
00208     message_connection_.disconnect();
00209     message_connection_ = f.registerCallback(&MessageFilter::incomingMessage, this);
00210   }
00211 
00215   ~MessageFilter()
00216   {
00217     message_connection_.disconnect();
00218 
00219     clear();
00220 
00221     TF2_ROS_MESSAGEFILTER_DEBUG("Successful Transforms: %llu, Discarded due to age: %llu, Transform messages received: %llu, Messages received: %llu, Total dropped: %llu",
00222                            (long long unsigned int)successful_transform_count_,
00223                            (long long unsigned int)failed_out_the_back_count_, (long long unsigned int)transform_message_count_, 
00224                            (long long unsigned int)incoming_message_count_, (long long unsigned int)dropped_message_count_);
00225 
00226   }
00227 
00231   void setTargetFrame(const std::string& target_frame)
00232   {
00233     V_string frames;
00234     frames.push_back(target_frame);
00235     setTargetFrames(frames);
00236   }
00237 
00241   void setTargetFrames(const V_string& target_frames)
00242   {
00243     boost::mutex::scoped_lock frames_lock(target_frames_mutex_);
00244 
00245     target_frames_.resize(target_frames.size());
00246     std::transform(target_frames.begin(), target_frames.end(), target_frames_.begin(), this->stripSlash);
00247     expected_success_count_ = target_frames_.size() * (time_tolerance_.isZero() ? 1 : 2);
00248 
00249     std::stringstream ss;
00250     for (V_string::iterator it = target_frames_.begin(); it != target_frames_.end(); ++it)
00251     {
00252       ss << *it << " ";
00253     }
00254     target_frames_string_ = ss.str();
00255   }
00259   std::string getTargetFramesString()
00260   {
00261     boost::mutex::scoped_lock lock(target_frames_mutex_);
00262     return target_frames_string_;
00263   };
00264 
00268   void setTolerance(const ros::Duration& tolerance)
00269   {
00270     boost::mutex::scoped_lock lock(target_frames_mutex_);
00271     time_tolerance_ = tolerance;
00272     expected_success_count_ = target_frames_.size() * (time_tolerance_.isZero() ? 1 : 2);
00273   }
00274 
00278   void clear()
00279   {
00280     boost::unique_lock< boost::shared_mutex > unique_lock(messages_mutex_);
00281 
00282     TF2_ROS_MESSAGEFILTER_DEBUG("%s", "Cleared");
00283 
00284     bc_.removeTransformableCallback(callback_handle_);
00285     callback_handle_ = bc_.addTransformableCallback(boost::bind(&MessageFilter::transformable, this, _1, _2, _3, _4, _5));
00286 
00287     messages_.clear();
00288     message_count_ = 0;
00289 
00290     warned_about_empty_frame_id_ = false;
00291   }
00292 
00293   void add(const MEvent& evt)
00294   {
00295     if (target_frames_.empty())
00296     {
00297       return;
00298     }
00299 
00300     namespace mt = ros::message_traits;
00301     const MConstPtr& message = evt.getMessage();
00302     std::string frame_id = stripSlash(mt::FrameId<M>::value(*message));
00303     ros::Time stamp = mt::TimeStamp<M>::value(*message);
00304 
00305     if (frame_id.empty())
00306     {
00307       messageDropped(evt, filter_failure_reasons::EmptyFrameID);
00308       return;
00309     }
00310 
00311     // iterate through the target frames and add requests for each of them
00312     MessageInfo info;
00313     info.handles.reserve(expected_success_count_);
00314     {
00315       V_string target_frames_copy;
00316       // Copy target_frames_ to avoid deadlock from #79
00317       {
00318         boost::mutex::scoped_lock frames_lock(target_frames_mutex_);
00319         target_frames_copy = target_frames_;
00320       }
00321 
00322       V_string::iterator it = target_frames_copy.begin();
00323       V_string::iterator end = target_frames_copy.end();
00324       for (; it != end; ++it)
00325       {
00326         const std::string& target_frame = *it;
00327         tf2::TransformableRequestHandle handle = bc_.addTransformableRequest(callback_handle_, target_frame, frame_id, stamp);
00328         if (handle == 0xffffffffffffffffULL) // never transformable
00329         {
00330           messageDropped(evt, filter_failure_reasons::OutTheBack);
00331           return;
00332         }
00333         else if (handle == 0)
00334         {
00335           ++info.success_count;
00336         }
00337         else
00338         {
00339           info.handles.push_back(handle);
00340         }
00341 
00342         if (!time_tolerance_.isZero())
00343         {
00344           handle = bc_.addTransformableRequest(callback_handle_, target_frame, frame_id, stamp + time_tolerance_);
00345           if (handle == 0xffffffffffffffffULL) // never transformable
00346           {
00347             messageDropped(evt, filter_failure_reasons::OutTheBack);
00348             return;
00349           }
00350           else if (handle == 0)
00351           {
00352             ++info.success_count;
00353           }
00354           else
00355           {
00356             info.handles.push_back(handle);
00357           }
00358         }
00359       }
00360     }
00361 
00362 
00363     // We can transform already
00364     if (info.success_count == expected_success_count_)
00365     {
00366       messageReady(evt);
00367     }
00368     else
00369     {
00370       boost::unique_lock< boost::shared_mutex > unique_lock(messages_mutex_);
00371       // If this message is about to push us past our queue size, erase the oldest message
00372       if (queue_size_ != 0 && message_count_ + 1 > queue_size_)
00373       {
00374         ++dropped_message_count_;
00375         const MessageInfo& front = messages_.front();
00376         TF2_ROS_MESSAGEFILTER_DEBUG("Removed oldest message because buffer is full, count now %d (frame_id=%s, stamp=%f)", message_count_,
00377                                 (mt::FrameId<M>::value(*front.event.getMessage())).c_str(), mt::TimeStamp<M>::value(*front.event.getMessage()).toSec());
00378 
00379         V_TransformableRequestHandle::const_iterator it = front.handles.begin();
00380         V_TransformableRequestHandle::const_iterator end = front.handles.end();
00381 
00382         for (; it != end; ++it)
00383         {
00384           bc_.cancelTransformableRequest(*it);
00385         }
00386 
00387         messageDropped(front.event, filter_failure_reasons::Unknown);
00388         messages_.pop_front();
00389          --message_count_;
00390       }
00391 
00392       // Add the message to our list
00393       info.event = evt;
00394       messages_.push_back(info);
00395       ++message_count_;
00396     }
00397 
00398     TF2_ROS_MESSAGEFILTER_DEBUG("Added message in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_);
00399 
00400     ++incoming_message_count_;
00401   }
00402 
00408   void add(const MConstPtr& message)
00409   {
00410     boost::shared_ptr<std::map<std::string, std::string> > header(new std::map<std::string, std::string>);
00411     (*header)["callerid"] = "unknown";
00412     ros::WallTime n = ros::WallTime::now();
00413     ros::Time t(n.sec, n.nsec);
00414     add(MEvent(message, header, t));
00415   }
00416 
00421   message_filters::Connection registerFailureCallback(const FailureCallback& callback)
00422   {
00423     boost::mutex::scoped_lock lock(failure_signal_mutex_);
00424     return message_filters::Connection(boost::bind(&MessageFilter::disconnectFailure, this, _1), failure_signal_.connect(callback));
00425   }
00426 
00427   virtual void setQueueSize( uint32_t new_queue_size )
00428   {
00429     queue_size_ = new_queue_size;
00430   }
00431 
00432   virtual uint32_t getQueueSize()
00433   {
00434     return queue_size_;
00435   }
00436 
00437 
00438 private:
00439 
00440   void init()
00441   {
00442     message_count_ = 0;
00443     successful_transform_count_ = 0;
00444     failed_out_the_back_count_ = 0;
00445     transform_message_count_ = 0;
00446     incoming_message_count_ = 0;
00447     dropped_message_count_ = 0;
00448     time_tolerance_ = ros::Duration(0.0);
00449     warned_about_empty_frame_id_ = false;
00450     expected_success_count_ = 1;
00451 
00452     callback_handle_ = bc_.addTransformableCallback(boost::bind(&MessageFilter::transformable, this, _1, _2, _3, _4, _5));
00453   }
00454 
00455   void transformable(tf2::TransformableRequestHandle request_handle, const std::string& target_frame, const std::string& source_frame,
00456                      ros::Time time, tf2::TransformableResult result)
00457   {
00458     namespace mt = ros::message_traits;
00459 
00460     boost::upgrade_lock< boost::shared_mutex > lock(messages_mutex_);
00461 
00462     // find the message this request is associated with
00463     typename L_MessageInfo::iterator msg_it = messages_.begin();
00464     typename L_MessageInfo::iterator msg_end = messages_.end();
00465     for (; msg_it != msg_end; ++msg_it)
00466     {
00467       MessageInfo& info = *msg_it;
00468       V_TransformableRequestHandle::const_iterator handle_it = std::find(info.handles.begin(), info.handles.end(), request_handle);
00469       if (handle_it != info.handles.end())
00470       {
00471         // found msg_it
00472         ++info.success_count;
00473         break;
00474       }
00475     }
00476 
00477     if (msg_it == msg_end)
00478     {
00479       return;
00480     }
00481 
00482     const MessageInfo& info = *msg_it;
00483     if (info.success_count < expected_success_count_)
00484     {
00485       return;
00486     }
00487 
00488     bool can_transform = true;
00489     const MConstPtr& message = info.event.getMessage();
00490     std::string frame_id = stripSlash(mt::FrameId<M>::value(*message));
00491     ros::Time stamp = mt::TimeStamp<M>::value(*message);
00492 
00493     if (result == tf2::TransformAvailable)
00494     {
00495       boost::mutex::scoped_lock frames_lock(target_frames_mutex_);
00496       // make sure we can still perform all the necessary transforms
00497       typename V_string::iterator it = target_frames_.begin();
00498       typename V_string::iterator end = target_frames_.end();
00499       for (; it != end; ++it)
00500       {
00501         const std::string& target = *it;
00502         if (!bc_.canTransform(target, frame_id, stamp))
00503         {
00504           can_transform = false;
00505           break;
00506         }
00507 
00508         if (!time_tolerance_.isZero())
00509         {
00510           if (!bc_.canTransform(target, frame_id, stamp + time_tolerance_))
00511           {
00512             can_transform = false;
00513             break;
00514           }
00515         }
00516       }
00517     }
00518     else
00519     {
00520       can_transform = false;
00521     }
00522 
00523     // We will be mutating messages now, require unique lock
00524     boost::upgrade_to_unique_lock< boost::shared_mutex > uniqueLock(lock);
00525     if (can_transform)
00526     {
00527       TF2_ROS_MESSAGEFILTER_DEBUG("Message ready in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_ - 1);
00528 
00529       ++successful_transform_count_;
00530 
00531       messageReady(info.event);
00532 
00533     }
00534     else
00535     {
00536       ++dropped_message_count_;
00537 
00538       TF2_ROS_MESSAGEFILTER_DEBUG("Discarding message in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_ - 1);
00539       messageDropped(info.event, filter_failure_reasons::Unknown);
00540     }
00541 
00542     messages_.erase(msg_it);
00543     --message_count_;
00544   }
00545 
00549   void incomingMessage(const ros::MessageEvent<M const>& evt)
00550   {
00551     add(evt);
00552   }
00553 
00554   void checkFailures()
00555   {
00556     if (next_failure_warning_.isZero())
00557     {
00558       next_failure_warning_ = ros::WallTime::now() + ros::WallDuration(15);
00559     }
00560 
00561     if (ros::WallTime::now() >= next_failure_warning_)
00562     {
00563       if (incoming_message_count_ - message_count_ == 0)
00564       {
00565         return;
00566       }
00567 
00568       double dropped_pct = (double)dropped_message_count_ / (double)(incoming_message_count_ - message_count_);
00569       if (dropped_pct > 0.95)
00570       {
00571         TF2_ROS_MESSAGEFILTER_WARN("Dropped %.2f%% of messages so far. Please turn the [%s.message_notifier] rosconsole logger to DEBUG for more information.", dropped_pct*100, ROSCONSOLE_DEFAULT_NAME);
00572         next_failure_warning_ = ros::WallTime::now() + ros::WallDuration(60);
00573 
00574         if ((double)failed_out_the_back_count_ / (double)dropped_message_count_ > 0.5)
00575         {
00576           TF2_ROS_MESSAGEFILTER_WARN("  The majority of dropped messages were due to messages growing older than the TF cache time.  The last message's timestamp was: %f, and the last frame_id was: %s", last_out_the_back_stamp_.toSec(), last_out_the_back_frame_.c_str());
00577         }
00578       }
00579     }
00580   }
00581 
00582   struct CBQueueCallback : public ros::CallbackInterface
00583   {
00584     CBQueueCallback(MessageFilter* filter, const MEvent& event, bool success, FilterFailureReason reason)
00585     : event_(event)
00586     , filter_(filter)
00587     , reason_(reason)
00588     , success_(success)
00589     {}
00590 
00591 
00592     virtual CallResult call()
00593     {
00594       if (success_)
00595       {
00596         filter_->signalMessage(event_);
00597       }
00598       else
00599       {
00600         filter_->signalFailure(event_, reason_);
00601       }
00602 
00603       return Success;
00604     }
00605 
00606   private:
00607     MEvent event_;
00608     MessageFilter* filter_;
00609     FilterFailureReason reason_;
00610     bool success_;
00611   };
00612 
00613   void messageDropped(const MEvent& evt, FilterFailureReason reason)
00614   {
00615     if (callback_queue_)
00616     {
00617       ros::CallbackInterfacePtr cb(new CBQueueCallback(this, evt, false, reason));
00618       callback_queue_->addCallback(cb, (uint64_t)this);
00619     }
00620     else
00621     {
00622       signalFailure(evt, reason);
00623     }
00624   }
00625 
00626   void messageReady(const MEvent& evt)
00627   {
00628     if (callback_queue_)
00629     {
00630       ros::CallbackInterfacePtr cb(new CBQueueCallback(this, evt, true, filter_failure_reasons::Unknown));
00631       callback_queue_->addCallback(cb, (uint64_t)this);
00632     }
00633     else
00634     {
00635       this->signalMessage(evt);
00636     }
00637   }
00638 
00639   void disconnectFailure(const message_filters::Connection& c)
00640   {
00641     boost::mutex::scoped_lock lock(failure_signal_mutex_);
00642     c.getBoostConnection().disconnect();
00643   }
00644 
00645   void signalFailure(const MEvent& evt, FilterFailureReason reason)
00646   {
00647     boost::mutex::scoped_lock lock(failure_signal_mutex_);
00648     failure_signal_(evt.getMessage(), reason);
00649   }
00650 
00651   static
00652   std::string stripSlash(const std::string& in)
00653   {
00654     if ( !in.empty() && (in[0] == '/'))
00655     {
00656       std::string out = in;
00657       out.erase(0, 1);
00658       return out;
00659     }
00660     return in;
00661   }
00662 
00663   tf2::BufferCore& bc_; 
00664   V_string target_frames_; 
00665   std::string target_frames_string_;
00666   boost::mutex target_frames_mutex_; 
00667   uint32_t queue_size_; 
00668   tf2::TransformableCallbackHandle callback_handle_;
00669 
00670   typedef std::vector<tf2::TransformableRequestHandle> V_TransformableRequestHandle;
00671   struct MessageInfo
00672   {
00673     MessageInfo()
00674     : success_count(0)
00675     {}
00676 
00677     MEvent event;
00678     V_TransformableRequestHandle handles;
00679     uint32_t success_count;
00680   };
00681   typedef std::list<MessageInfo> L_MessageInfo;
00682   L_MessageInfo messages_;
00683   uint32_t message_count_; 
00684   boost::shared_mutex messages_mutex_; 
00685   uint32_t expected_success_count_;
00686 
00687   bool warned_about_empty_frame_id_;
00688 
00689   uint64_t successful_transform_count_;
00690   uint64_t failed_out_the_back_count_;
00691   uint64_t transform_message_count_;
00692   uint64_t incoming_message_count_;
00693   uint64_t dropped_message_count_;
00694 
00695   ros::Time last_out_the_back_stamp_;
00696   std::string last_out_the_back_frame_;
00697 
00698   ros::WallTime next_failure_warning_;
00699 
00700   ros::Duration time_tolerance_; 
00701 
00702   message_filters::Connection message_connection_;
00703 
00704   FailureSignal failure_signal_;
00705   boost::mutex failure_signal_mutex_;
00706 
00707   ros::CallbackQueueInterface* callback_queue_;
00708 };
00709 
00710 } // namespace tf2
00711 
00712 #endif


tf2_ros
Author(s): Eitan Marder-Eppstein, Wim Meeussen
autogenerated on Thu Jun 6 2019 20:23:00