message_filter.h
Go to the documentation of this file.
00001 /*
00002  * Copyright (c) 2010, Willow Garage, Inc.
00003  * All rights reserved.
00004  *
00005  * Redistribution and use in source and binary forms, with or without
00006  * modification, are permitted provided that the following conditions are met:
00007  *
00008  *     * Redistributions of source code must retain the above copyright
00009  *       notice, this list of conditions and the following disclaimer.
00010  *     * Redistributions in binary form must reproduce the above copyright
00011  *       notice, this list of conditions and the following disclaimer in the
00012  *       documentation and/or other materials provided with the distribution.
00013  *     * Neither the name of the Willow Garage, Inc. nor the names of its
00014  *       contributors may be used to endorse or promote products derived from
00015  *       this software without specific prior written permission.
00016  *
00017  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00018  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00019  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00020  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00021  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00022  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00023  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00024  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00025  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00026  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00027  * POSSIBILITY OF SUCH DAMAGE.
00028  */
00029 
00032 #ifndef TF2_ROS_MESSAGE_FILTER_H
00033 #define TF2_ROS_MESSAGE_FILTER_H
00034 
00035 #include <tf2/buffer_core.h>
00036 
00037 #include <string>
00038 #include <list>
00039 #include <vector>
00040 #include <boost/function.hpp>
00041 #include <boost/bind.hpp>
00042 #include <boost/shared_ptr.hpp>
00043 #include <boost/thread.hpp>
00044 #include <boost/signals.hpp>
00045 
00046 #include <message_filters/connection.h>
00047 #include <message_filters/simple_filter.h>
00048 
00049 #include <ros/node_handle.h>
00050 #include <ros/callback_queue_interface.h>
00051 #include <ros/init.h>
00052 
00053 #define TF2_ROS_MESSAGEFILTER_DEBUG(fmt, ...) \
00054   ROS_DEBUG_NAMED("message_filter", "MessageFilter [target=%s]: "fmt, getTargetFramesString().c_str(), __VA_ARGS__)
00055 
00056 #define TF2_ROS_MESSAGEFILTER_WARN(fmt, ...) \
00057   ROS_WARN_NAMED("message_filter", "MessageFilter [target=%s]: "fmt, getTargetFramesString().c_str(), __VA_ARGS__)
00058 
00059 namespace tf2_ros
00060 {
00061 
00062 namespace filter_failure_reasons
00063 {
00064 enum FilterFailureReason
00065 {
00067   Unknown,
00069   OutTheBack,
00071   EmptyFrameID,
00072 };
00073 }
00074 typedef filter_failure_reasons::FilterFailureReason FilterFailureReason;
00075 
00076 class MessageFilterBase
00077 {
00078 public:
00079   typedef std::vector<std::string> V_string;
00080 
00081   virtual ~MessageFilterBase(){}
00082   virtual void clear() = 0;
00083   virtual void setTargetFrame(const std::string& target_frame) = 0;
00084   virtual void setTargetFrames(const V_string& target_frames) = 0;
00085   virtual void setTolerance(const ros::Duration& tolerance) = 0;
00086 };
00087 
00104 template<class M>
00105 class MessageFilter : public MessageFilterBase, public message_filters::SimpleFilter<M>
00106 {
00107 public:
00108   typedef boost::shared_ptr<M const> MConstPtr;
00109   typedef ros::MessageEvent<M const> MEvent;
00110   typedef boost::function<void(const MConstPtr&, FilterFailureReason)> FailureCallback;
00111   typedef boost::signal<void(const MConstPtr&, FilterFailureReason)> FailureSignal;
00112 
00113   // If you hit this assert your message does not have a header, or does not have the HasHeader trait defined for it
00114   ROS_STATIC_ASSERT(ros::message_traits::HasHeader<M>::value);
00115 
00124   MessageFilter(tf2::BufferCore& bc, const std::string& target_frame, uint32_t queue_size, const ros::NodeHandle& nh)
00125   : bc_(bc)
00126   , queue_size_(queue_size)
00127   , callback_queue_(nh.getCallbackQueue())
00128   {
00129     init();
00130 
00131     setTargetFrame(target_frame);
00132   }
00133 
00143   template<class F>
00144   MessageFilter(F& f, tf2::BufferCore& bc, const std::string& target_frame, uint32_t queue_size, const ros::NodeHandle& nh)
00145   : bc_(bc)
00146   , queue_size_(queue_size)
00147   , callback_queue_(nh.getCallbackQueue())
00148   {
00149     init();
00150 
00151     setTargetFrame(target_frame);
00152 
00153     connectInput(f);
00154   }
00155 
00166   MessageFilter(tf2::BufferCore& bc, const std::string& target_frame, uint32_t queue_size, ros::CallbackQueueInterface* cbqueue)
00167   : bc_(bc)
00168   , queue_size_(queue_size)
00169   , callback_queue_(cbqueue)
00170   {
00171     init();
00172 
00173     setTargetFrame(target_frame);
00174   }
00175 
00187   template<class F>
00188   MessageFilter(F& f, tf2::BufferCore& bc, const std::string& target_frame, uint32_t queue_size, ros::CallbackQueueInterface* cbqueue)
00189   : bc_(bc)
00190   , queue_size_(queue_size)
00191   , callback_queue_(cbqueue)
00192   {
00193     init();
00194 
00195     setTargetFrame(target_frame);
00196 
00197     connectInput(f);
00198   }
00199 
00203   template<class F>
00204   void connectInput(F& f)
00205   {
00206     message_connection_.disconnect();
00207     message_connection_ = f.registerCallback(&MessageFilter::incomingMessage, this);
00208   }
00209 
00213   ~MessageFilter()
00214   {
00215     message_connection_.disconnect();
00216 
00217     clear();
00218 
00219     TF2_ROS_MESSAGEFILTER_DEBUG("Successful Transforms: %llu, Discarded due to age: %llu, Transform messages received: %llu, Messages received: %llu, Total dropped: %llu",
00220                            (long long unsigned int)successful_transform_count_,
00221                            (long long unsigned int)failed_out_the_back_count_, (long long unsigned int)transform_message_count_, 
00222                            (long long unsigned int)incoming_message_count_, (long long unsigned int)dropped_message_count_);
00223 
00224   }
00225 
00229   void setTargetFrame(const std::string& target_frame)
00230   {
00231     V_string frames;
00232     frames.push_back(target_frame);
00233     setTargetFrames(frames);
00234   }
00235 
00239   void setTargetFrames(const V_string& target_frames)
00240   {
00241     boost::mutex::scoped_lock list_lock(messages_mutex_);
00242     boost::mutex::scoped_lock string_lock(target_frames_string_mutex_);
00243 
00244     target_frames_ = target_frames;
00245     expected_success_count_ = target_frames_.size() + (time_tolerance_.isZero() ? 0 : 1);
00246 
00247     std::stringstream ss;
00248     for (V_string::iterator it = target_frames_.begin(); it != target_frames_.end(); ++it)
00249     {
00250       ss << *it << " ";
00251     }
00252     target_frames_string_ = ss.str();
00253   }
00257   std::string getTargetFramesString()
00258   {
00259     boost::mutex::scoped_lock lock(target_frames_string_mutex_);
00260     return target_frames_string_;
00261   };
00262 
00266   void setTolerance(const ros::Duration& tolerance)
00267   {
00268     boost::mutex::scoped_lock list_lock(messages_mutex_);
00269     time_tolerance_ = tolerance;
00270     expected_success_count_ = target_frames_.size() + (time_tolerance_.isZero() ? 0 : 1);
00271   }
00272 
00276   void clear()
00277   {
00278     boost::mutex::scoped_lock lock(messages_mutex_);
00279 
00280     TF2_ROS_MESSAGEFILTER_DEBUG("%s", "Cleared");
00281 
00282     bc_.removeTransformableCallback(callback_handle_);
00283     callback_handle_ = bc_.addTransformableCallback(boost::bind(&MessageFilter::transformable, this, _1, _2, _3, _4, _5));
00284 
00285     messages_.clear();
00286     message_count_ = 0;
00287 
00288     warned_about_empty_frame_id_ = false;
00289   }
00290 
00291   void add(const MEvent& evt)
00292   {
00293     if (target_frames_.empty())
00294     {
00295       return;
00296     }
00297 
00298     namespace mt = ros::message_traits;
00299     const MConstPtr& message = evt.getMessage();
00300     const std::string& frame_id = *mt::FrameId<M>::pointer(*message);
00301     ros::Time stamp = mt::TimeStamp<M>::value(*message);
00302 
00303     if (frame_id.empty())
00304     {
00305       messageDropped(evt, filter_failure_reasons::EmptyFrameID);
00306       return;
00307     }
00308 
00309     boost::mutex::scoped_lock lock(messages_mutex_);
00310     // iterate through the target frames and add requests for each of them
00311     MessageInfo info;
00312     info.handles.reserve(expected_success_count_);
00313     {
00314       V_string::iterator it = target_frames_.begin();
00315       V_string::iterator end = target_frames_.end();
00316       for (; it != end; ++it)
00317       {
00318         const std::string& target_frame = *it;
00319         tf2::TransformableRequestHandle handle = bc_.addTransformableRequest(callback_handle_, target_frame, frame_id, stamp);
00320         if (handle == 0xffffffffffffffffULL) // never transformable
00321         {
00322           messageDropped(evt, filter_failure_reasons::OutTheBack);
00323           return;
00324         }
00325         else if (handle == 0)
00326         {
00327           ++info.success_count;
00328         }
00329         else
00330         {
00331           info.handles.push_back(handle);
00332         }
00333 
00334         if (!time_tolerance_.isZero())
00335         {
00336           handle = bc_.addTransformableRequest(callback_handle_, target_frame, frame_id, stamp + time_tolerance_);
00337           if (handle == 0xffffffffffffffffULL) // never transformable
00338           {
00339             messageDropped(evt, filter_failure_reasons::OutTheBack);
00340             return;
00341           }
00342           else if (handle == 0)
00343           {
00344             ++info.success_count;
00345           }
00346           else
00347           {
00348             info.handles.push_back(handle);
00349           }
00350         }
00351       }
00352     }
00353 
00354     // We can transform already
00355     if (info.success_count == expected_success_count_)
00356     {
00357       messageReady(evt);
00358     }
00359     else
00360     {
00361       // If this message is about to push us past our queue size, erase the oldest message
00362       if (queue_size_ != 0 && message_count_ + 1 > queue_size_)
00363       {
00364         ++dropped_message_count_;
00365         const MessageInfo& front = messages_.front();
00366         TF2_ROS_MESSAGEFILTER_DEBUG("Removed oldest message because buffer is full, count now %d (frame_id=%s, stamp=%f)", message_count_,
00367                                 (*mt::FrameId<M>::pointer(*front.event.getMessage())).c_str(), mt::TimeStamp<M>::value(*front.event.getMessage()).toSec());
00368 
00369         V_TransformableRequestHandle::const_iterator it = front.handles.begin();
00370         V_TransformableRequestHandle::const_iterator end = front.handles.end();
00371         for (; it != end; ++it)
00372         {
00373           bc_.cancelTransformableRequest(*it);
00374         }
00375 
00376         messageDropped(front.event, filter_failure_reasons::Unknown);
00377 
00378         messages_.pop_front();
00379         --message_count_;
00380       }
00381 
00382       // Add the message to our list
00383       info.event = evt;
00384       messages_.push_back(info);
00385       ++message_count_;
00386     }
00387 
00388     TF2_ROS_MESSAGEFILTER_DEBUG("Added message in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_);
00389 
00390     ++incoming_message_count_;
00391   }
00392 
00398   void add(const MConstPtr& message)
00399   {
00400     boost::shared_ptr<std::map<std::string, std::string> > header(new std::map<std::string, std::string>);
00401     (*header)["callerid"] = "unknown";
00402     ros::WallTime n = ros::WallTime::now();
00403     ros::Time t(n.sec, n.nsec);
00404     add(MEvent(message, header, t));
00405   }
00406 
00411   message_filters::Connection registerFailureCallback(const FailureCallback& callback)
00412   {
00413     boost::mutex::scoped_lock lock(failure_signal_mutex_);
00414     return message_filters::Connection(boost::bind(&MessageFilter::disconnectFailure, this, _1), failure_signal_.connect(callback));
00415   }
00416 
00417   virtual void setQueueSize( uint32_t new_queue_size )
00418   {
00419     queue_size_ = new_queue_size;
00420   }
00421 
00422   virtual uint32_t getQueueSize()
00423   {
00424     return queue_size_;
00425   }
00426 
00427 
00428 private:
00429 
00430   void init()
00431   {
00432     message_count_ = 0;
00433     successful_transform_count_ = 0;
00434     failed_out_the_back_count_ = 0;
00435     transform_message_count_ = 0;
00436     incoming_message_count_ = 0;
00437     dropped_message_count_ = 0;
00438     time_tolerance_ = ros::Duration(0.0);
00439     warned_about_empty_frame_id_ = false;
00440     expected_success_count_ = 1;
00441 
00442     callback_handle_ = bc_.addTransformableCallback(boost::bind(&MessageFilter::transformable, this, _1, _2, _3, _4, _5));
00443   }
00444 
00445   void transformable(tf2::TransformableRequestHandle request_handle, const std::string& target_frame, const std::string& source_frame,
00446                      ros::Time time, tf2::TransformableResult result)
00447   {
00448     namespace mt = ros::message_traits;
00449 
00450     boost::mutex::scoped_lock lock(messages_mutex_);
00451 
00452     // find the message this request is associated with
00453     typename L_MessageInfo::iterator msg_it = messages_.begin();
00454     typename L_MessageInfo::iterator msg_end = messages_.end();
00455     for (; msg_it != msg_end; ++msg_it)
00456     {
00457       MessageInfo& info = *msg_it;
00458       V_TransformableRequestHandle::const_iterator handle_it = std::find(info.handles.begin(), info.handles.end(), request_handle);
00459       if (handle_it != info.handles.end())
00460       {
00461         // found msg_it
00462         ++info.success_count;
00463         break;
00464       }
00465     }
00466 
00467     if (msg_it == msg_end)
00468     {
00469       return;
00470     }
00471 
00472     const MessageInfo& info = *msg_it;
00473     if (info.success_count < expected_success_count_)
00474     {
00475       return;
00476     }
00477 
00478     bool can_transform = true;
00479     const MConstPtr& message = info.event.getMessage();
00480     const std::string& frame_id = *mt::FrameId<M>::pointer(*message);
00481     ros::Time stamp = mt::TimeStamp<M>::value(*message);
00482 
00483     if (result == tf2::TransformAvailable)
00484     {
00485       // make sure we can still perform all the necessary transforms
00486       typename V_string::iterator it = target_frames_.begin();
00487       typename V_string::iterator end = target_frames_.end();
00488       for (; it != end; ++it)
00489       {
00490         const std::string& target = *it;
00491         if (!bc_.canTransform(target, frame_id, stamp))
00492         {
00493           can_transform = false;
00494           break;
00495         }
00496 
00497         if (!time_tolerance_.isZero())
00498         {
00499           if (!bc_.canTransform(target, frame_id, stamp + time_tolerance_))
00500           {
00501             can_transform = false;
00502             break;
00503           }
00504         }
00505       }
00506     }
00507     else
00508     {
00509       can_transform = false;
00510     }
00511 
00512     if (can_transform)
00513     {
00514       TF2_ROS_MESSAGEFILTER_DEBUG("Message ready in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_ - 1);
00515 
00516       ++successful_transform_count_;
00517 
00518       messageReady(info.event);
00519 
00520     }
00521     else
00522     {
00523       ++dropped_message_count_;
00524 
00525       TF2_ROS_MESSAGEFILTER_DEBUG("Discarding message in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_ - 1);
00526       messageDropped(info.event, filter_failure_reasons::Unknown);
00527     }
00528 
00529     messages_.erase(msg_it);
00530     --message_count_;
00531   }
00532 
00536   void incomingMessage(const ros::MessageEvent<M const>& evt)
00537   {
00538     add(evt);
00539   }
00540 
00541   void checkFailures()
00542   {
00543     if (next_failure_warning_.isZero())
00544     {
00545       next_failure_warning_ = ros::WallTime::now() + ros::WallDuration(15);
00546     }
00547 
00548     if (ros::WallTime::now() >= next_failure_warning_)
00549     {
00550       if (incoming_message_count_ - message_count_ == 0)
00551       {
00552         return;
00553       }
00554 
00555       double dropped_pct = (double)dropped_message_count_ / (double)(incoming_message_count_ - message_count_);
00556       if (dropped_pct > 0.95)
00557       {
00558         TF2_ROS_MESSAGEFILTER_WARN("Dropped %.2f%% of messages so far. Please turn the [%s.message_notifier] rosconsole logger to DEBUG for more information.", dropped_pct*100, ROSCONSOLE_DEFAULT_NAME);
00559         next_failure_warning_ = ros::WallTime::now() + ros::WallDuration(60);
00560 
00561         if ((double)failed_out_the_back_count_ / (double)dropped_message_count_ > 0.5)
00562         {
00563           TF2_ROS_MESSAGEFILTER_WARN("  The majority of dropped messages were due to messages growing older than the TF cache time.  The last message's timestamp was: %f, and the last frame_id was: %s", last_out_the_back_stamp_.toSec(), last_out_the_back_frame_.c_str());
00564         }
00565       }
00566     }
00567   }
00568 
00569   struct CBQueueCallback : public ros::CallbackInterface
00570   {
00571     CBQueueCallback(MessageFilter* filter, const MEvent& event, bool success, FilterFailureReason reason)
00572     : event_(event)
00573     , filter_(filter)
00574     , reason_(reason)
00575     , success_(success)
00576     {}
00577 
00578 
00579     virtual CallResult call()
00580     {
00581       if (success_)
00582       {
00583         filter_->signalMessage(event_);
00584       }
00585       else
00586       {
00587         filter_->signalFailure(event_, reason_);
00588       }
00589 
00590       return Success;
00591     }
00592 
00593   private:
00594     MEvent event_;
00595     MessageFilter* filter_;
00596     FilterFailureReason reason_;
00597     bool success_;
00598   };
00599 
00600   void messageDropped(const MEvent& evt, FilterFailureReason reason)
00601   {
00602     if (callback_queue_)
00603     {
00604       ros::CallbackInterfacePtr cb(new CBQueueCallback(this, evt, false, reason));
00605       callback_queue_->addCallback(cb, (uint64_t)this);
00606     }
00607     else
00608     {
00609       signalFailure(evt, reason);
00610     }
00611   }
00612 
00613   void messageReady(const MEvent& evt)
00614   {
00615     if (callback_queue_)
00616     {
00617       ros::CallbackInterfacePtr cb(new CBQueueCallback(this, evt, true, filter_failure_reasons::Unknown));
00618       callback_queue_->addCallback(cb, (uint64_t)this);
00619     }
00620     else
00621     {
00622       signalMessage(evt);
00623     }
00624   }
00625 
00626   void disconnectFailure(const message_filters::Connection& c)
00627   {
00628     boost::mutex::scoped_lock lock(failure_signal_mutex_);
00629     c.getBoostConnection().disconnect();
00630   }
00631 
00632   void signalFailure(const MEvent& evt, FilterFailureReason reason)
00633   {
00634     boost::mutex::scoped_lock lock(failure_signal_mutex_);
00635     failure_signal_(evt.getMessage(), reason);
00636   }
00637 
00638   tf2::BufferCore& bc_; 
00639   V_string target_frames_; 
00640   std::string target_frames_string_;
00641   boost::mutex target_frames_string_mutex_;
00642   uint32_t queue_size_; 
00643   tf2::TransformableCallbackHandle callback_handle_;
00644 
00645   typedef std::vector<tf2::TransformableRequestHandle> V_TransformableRequestHandle;
00646   struct MessageInfo
00647   {
00648     MessageInfo()
00649     : success_count(0)
00650     {}
00651 
00652     MEvent event;
00653     V_TransformableRequestHandle handles;
00654     uint32_t success_count;
00655   };
00656   typedef std::list<MessageInfo> L_MessageInfo;
00657   L_MessageInfo messages_;
00658   uint32_t message_count_; 
00659   boost::mutex messages_mutex_; 
00660   uint32_t expected_success_count_;
00661 
00662   bool warned_about_empty_frame_id_;
00663 
00664   uint64_t successful_transform_count_;
00665   uint64_t failed_out_the_back_count_;
00666   uint64_t transform_message_count_;
00667   uint64_t incoming_message_count_;
00668   uint64_t dropped_message_count_;
00669 
00670   ros::Time last_out_the_back_stamp_;
00671   std::string last_out_the_back_frame_;
00672 
00673   ros::WallTime next_failure_warning_;
00674 
00675   ros::Duration time_tolerance_; 
00676 
00677   message_filters::Connection message_connection_;
00678 
00679   FailureSignal failure_signal_;
00680   boost::mutex failure_signal_mutex_;
00681 
00682   ros::CallbackQueueInterface* callback_queue_;
00683 };
00684 
00685 } // namespace tf2
00686 
00687 #endif
00688 


tf2_ros
Author(s): Eitan Marder-Eppstein, Wim Meeussen
autogenerated on Thu Aug 27 2015 13:10:05