message_filter.h
Go to the documentation of this file.
00001 /*
00002  * Copyright (c) 2010, Willow Garage, Inc.
00003  * All rights reserved.
00004  *
00005  * Redistribution and use in source and binary forms, with or without
00006  * modification, are permitted provided that the following conditions are met:
00007  *
00008  *     * Redistributions of source code must retain the above copyright
00009  *       notice, this list of conditions and the following disclaimer.
00010  *     * Redistributions in binary form must reproduce the above copyright
00011  *       notice, this list of conditions and the following disclaimer in the
00012  *       documentation and/or other materials provided with the distribution.
00013  *     * Neither the name of the Willow Garage, Inc. nor the names of its
00014  *       contributors may be used to endorse or promote products derived from
00015  *       this software without specific prior written permission.
00016  *
00017  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00018  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00019  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00020  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00021  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00022  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00023  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00024  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00025  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00026  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00027  * POSSIBILITY OF SUCH DAMAGE.
00028  */
00029 
00032 #ifndef TF2_ROS_MESSAGE_FILTER_H
00033 #define TF2_ROS_MESSAGE_FILTER_H
00034 
00035 #include <tf2/buffer_core.h>
00036 
00037 #include <string>
00038 #include <list>
00039 #include <vector>
00040 #include <boost/function.hpp>
00041 #include <boost/bind.hpp>
00042 #include <boost/shared_ptr.hpp>
00043 #include <boost/thread.hpp>
00044 #include <boost/signals.hpp>
00045 
00046 #include <message_filters/connection.h>
00047 #include <message_filters/simple_filter.h>
00048 
00049 #include <ros/node_handle.h>
00050 #include <ros/callback_queue_interface.h>
00051 #include <ros/init.h>
00052 
00053 #define TF2_ROS_MESSAGEFILTER_DEBUG(fmt, ...) \
00054   ROS_DEBUG_NAMED("message_filter", "MessageFilter [target=%s]: "fmt, getTargetFramesString().c_str(), __VA_ARGS__)
00055 
00056 #define TF2_ROS_MESSAGEFILTER_WARN(fmt, ...) \
00057   ROS_WARN_NAMED("message_filter", "MessageFilter [target=%s]: "fmt, getTargetFramesString().c_str(), __VA_ARGS__)
00058 
00059 namespace tf2_ros
00060 {
00061 
00062 namespace filter_failure_reasons
00063 {
00064 enum FilterFailureReason
00065 {
00067   Unknown,
00069   OutTheBack,
00071   EmptyFrameID,
00072 };
00073 }
00074 typedef filter_failure_reasons::FilterFailureReason FilterFailureReason;
00075 
00076 class MessageFilterBase
00077 {
00078 public:
00079   typedef std::vector<std::string> V_string;
00080 
00081   virtual ~MessageFilterBase(){}
00082   virtual void clear() = 0;
00083   virtual void setTargetFrame(const std::string& target_frame) = 0;
00084   virtual void setTargetFrames(const V_string& target_frames) = 0;
00085   virtual void setTolerance(const ros::Duration& tolerance) = 0;
00086 };
00087 
00104 template<class M>
00105 class MessageFilter : public MessageFilterBase, public message_filters::SimpleFilter<M>
00106 {
00107 public:
00108   typedef boost::shared_ptr<M const> MConstPtr;
00109   typedef ros::MessageEvent<M const> MEvent;
00110   typedef boost::function<void(const MConstPtr&, FilterFailureReason)> FailureCallback;
00111   typedef boost::signal<void(const MConstPtr&, FilterFailureReason)> FailureSignal;
00112 
00113   // If you hit this assert your message does not have a header, or does not have the HasHeader trait defined for it
00114   ROS_STATIC_ASSERT(ros::message_traits::HasHeader<M>::value);
00115 
00124   MessageFilter(tf2::BufferCore& bc, const std::string& target_frame, uint32_t queue_size, const ros::NodeHandle& nh)
00125   : bc_(bc)
00126   , queue_size_(queue_size)
00127   , callback_queue_(nh.getCallbackQueue())
00128   {
00129     init();
00130 
00131     setTargetFrame(target_frame);
00132   }
00133 
00143   template<class F>
00144   MessageFilter(F& f, tf2::BufferCore& bc, const std::string& target_frame, uint32_t queue_size, const ros::NodeHandle& nh)
00145   : bc_(bc)
00146   , queue_size_(queue_size)
00147   , callback_queue_(nh.getCallbackQueue())
00148   {
00149     init();
00150 
00151     setTargetFrame(target_frame);
00152 
00153     connectInput(f);
00154   }
00155 
00166   MessageFilter(tf2::BufferCore& bc, const std::string& target_frame, uint32_t queue_size, ros::CallbackQueueInterface* cbqueue)
00167   : bc_(bc)
00168   , queue_size_(queue_size)
00169   , callback_queue_(cbqueue)
00170   {
00171     init();
00172 
00173     setTargetFrame(target_frame);
00174   }
00175 
00187   template<class F>
00188   MessageFilter(F& f, tf2::BufferCore& bc, const std::string& target_frame, uint32_t queue_size, ros::CallbackQueueInterface* cbqueue)
00189   : bc_(bc)
00190   , queue_size_(queue_size)
00191   , callback_queue_(cbqueue)
00192   {
00193     init();
00194 
00195     setTargetFrame(target_frame);
00196 
00197     connectInput(f);
00198   }
00199 
00203   template<class F>
00204   void connectInput(F& f)
00205   {
00206     message_connection_.disconnect();
00207     message_connection_ = f.registerCallback(&MessageFilter::incomingMessage, this);
00208   }
00209 
00213   ~MessageFilter()
00214   {
00215     message_connection_.disconnect();
00216 
00217     clear();
00218 
00219     TF2_ROS_MESSAGEFILTER_DEBUG("Successful Transforms: %llu, Discarded due to age: %llu, Transform messages received: %llu, Messages received: %llu, Total dropped: %llu",
00220                            (long long unsigned int)successful_transform_count_,
00221                            (long long unsigned int)failed_out_the_back_count_, (long long unsigned int)transform_message_count_, 
00222                            (long long unsigned int)incoming_message_count_, (long long unsigned int)dropped_message_count_);
00223 
00224   }
00225 
00229   void setTargetFrame(const std::string& target_frame)
00230   {
00231     V_string frames;
00232     frames.push_back(target_frame);
00233     setTargetFrames(frames);
00234   }
00235 
00239   void setTargetFrames(const V_string& target_frames)
00240   {
00241     boost::mutex::scoped_lock list_lock(messages_mutex_);
00242     boost::mutex::scoped_lock string_lock(target_frames_string_mutex_);
00243 
00244     target_frames_ = target_frames;
00245     expected_success_count_ = target_frames_.size() + (time_tolerance_.isZero() ? 0 : 1);
00246 
00247     std::stringstream ss;
00248     for (V_string::iterator it = target_frames_.begin(); it != target_frames_.end(); ++it)
00249     {
00250       ss << *it << " ";
00251     }
00252     target_frames_string_ = ss.str();
00253   }
00257   std::string getTargetFramesString()
00258   {
00259     boost::mutex::scoped_lock lock(target_frames_string_mutex_);
00260     return target_frames_string_;
00261   };
00262 
00266   void setTolerance(const ros::Duration& tolerance)
00267   {
00268     boost::mutex::scoped_lock list_lock(messages_mutex_);
00269     time_tolerance_ = tolerance;
00270     expected_success_count_ = target_frames_.size() + (time_tolerance_.isZero() ? 0 : 1);
00271   }
00272 
00276   void clear()
00277   {
00278     boost::mutex::scoped_lock lock(messages_mutex_);
00279 
00280     TF2_ROS_MESSAGEFILTER_DEBUG("%s", "Cleared");
00281 
00282     bc_.removeTransformableCallback(callback_handle_);
00283     callback_handle_ = bc_.addTransformableCallback(boost::bind(&MessageFilter::transformable, this, _1, _2, _3, _4, _5));
00284 
00285     messages_.clear();
00286     message_count_ = 0;
00287 
00288     warned_about_empty_frame_id_ = false;
00289   }
00290 
00291   void add(const MEvent& evt)
00292   {
00293     if (target_frames_.empty())
00294     {
00295       return;
00296     }
00297 
00298     namespace mt = ros::message_traits;
00299     const MConstPtr& message = evt.getMessage();
00300     const std::string& frame_id = *mt::FrameId<M>::pointer(*message);
00301     ros::Time stamp = mt::TimeStamp<M>::value(*message);
00302 
00303     if (frame_id.empty())
00304     {
00305       messageDropped(evt, filter_failure_reasons::EmptyFrameID);
00306       return;
00307     }
00308 
00309     boost::mutex::scoped_lock lock(messages_mutex_);
00310     // iterate through the target frames and add requests for each of them
00311     MessageInfo info;
00312     info.handles.reserve(expected_success_count_);
00313     {
00314       V_string::iterator it = target_frames_.begin();
00315       V_string::iterator end = target_frames_.end();
00316       for (; it != end; ++it)
00317       {
00318         const std::string& target_frame = *it;
00319         tf2::TransformableRequestHandle handle = bc_.addTransformableRequest(callback_handle_, target_frame, frame_id, stamp);
00320         if (handle == 0xffffffffffffffffULL) // never transformable
00321         {
00322           messageDropped(evt, filter_failure_reasons::OutTheBack);
00323           return;
00324         }
00325         else if (handle == 0)
00326         {
00327           ++info.success_count;
00328         }
00329         else
00330         {
00331           info.handles.push_back(handle);
00332         }
00333 
00334         if (!time_tolerance_.isZero())
00335         {
00336           handle = bc_.addTransformableRequest(callback_handle_, target_frame, frame_id, stamp + time_tolerance_);
00337           if (handle == 0xffffffffffffffffULL) // never transformable
00338           {
00339             messageDropped(evt, filter_failure_reasons::OutTheBack);
00340             return;
00341           }
00342           else if (handle == 0)
00343           {
00344             ++info.success_count;
00345           }
00346           else
00347           {
00348             info.handles.push_back(handle);
00349           }
00350         }
00351       }
00352     }
00353 
00354     // We can transform already
00355     if (info.success_count == expected_success_count_)
00356     {
00357       messageReady(evt);
00358     }
00359     else
00360     {
00361       // If this message is about to push us past our queue size, erase the oldest message
00362       if (queue_size_ != 0 && message_count_ + 1 > queue_size_)
00363       {
00364         ++dropped_message_count_;
00365         const MessageInfo& front = messages_.front();
00366         TF2_ROS_MESSAGEFILTER_DEBUG("Removed oldest message because buffer is full, count now %d (frame_id=%s, stamp=%f)", message_count_,
00367                                 (*mt::FrameId<M>::pointer(*front.event.getMessage())).c_str(), mt::TimeStamp<M>::value(*front.event.getMessage()).toSec());
00368 
00369         V_TransformableRequestHandle::const_iterator it = front.handles.begin();
00370         V_TransformableRequestHandle::const_iterator end = front.handles.end();
00371         for (; it != end; ++it)
00372         {
00373           bc_.cancelTransformableRequest(*it);
00374         }
00375 
00376         messageDropped(front.event, filter_failure_reasons::Unknown);
00377 
00378         messages_.pop_front();
00379         --message_count_;
00380       }
00381 
00382       // Add the message to our list
00383       info.event = evt;
00384       messages_.push_back(info);
00385       ++message_count_;
00386     }
00387 
00388     TF2_ROS_MESSAGEFILTER_DEBUG("Added message in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_);
00389 
00390     ++incoming_message_count_;
00391   }
00392 
00398   void add(const MConstPtr& message)
00399   {
00400     boost::shared_ptr<std::map<std::string, std::string> > header(new std::map<std::string, std::string>);
00401     (*header)["callerid"] = "unknown";
00402     ros::WallTime n = ros::WallTime::now();
00403     ros::Time t(n.sec, n.nsec);
00404     add(MEvent(message, header, t));
00405   }
00406 
00411   message_filters::Connection registerFailureCallback(const FailureCallback& callback)
00412   {
00413     boost::mutex::scoped_lock lock(failure_signal_mutex_);
00414     return message_filters::Connection(boost::bind(&MessageFilter::disconnectFailure, this, _1), failure_signal_.connect(callback));
00415   }
00416 
00417 private:
00418 
00419   void init()
00420   {
00421     message_count_ = 0;
00422     successful_transform_count_ = 0;
00423     failed_out_the_back_count_ = 0;
00424     transform_message_count_ = 0;
00425     incoming_message_count_ = 0;
00426     dropped_message_count_ = 0;
00427     time_tolerance_ = ros::Duration(0.0);
00428     warned_about_empty_frame_id_ = false;
00429     expected_success_count_ = 1;
00430 
00431     callback_handle_ = bc_.addTransformableCallback(boost::bind(&MessageFilter::transformable, this, _1, _2, _3, _4, _5));
00432   }
00433 
00434   void transformable(tf2::TransformableRequestHandle request_handle, const std::string& target_frame, const std::string& source_frame,
00435                      ros::Time time, tf2::TransformableResult result)
00436   {
00437     namespace mt = ros::message_traits;
00438 
00439     boost::mutex::scoped_lock lock(messages_mutex_);
00440 
00441     // find the message this request is associated with
00442     typename L_MessageInfo::iterator msg_it = messages_.begin();
00443     typename L_MessageInfo::iterator msg_end = messages_.end();
00444     for (; msg_it != msg_end; ++msg_it)
00445     {
00446       MessageInfo& info = *msg_it;
00447       V_TransformableRequestHandle::const_iterator handle_it = std::find(info.handles.begin(), info.handles.end(), request_handle);
00448       if (handle_it != info.handles.end())
00449       {
00450         // found msg_it
00451         ++info.success_count;
00452         break;
00453       }
00454     }
00455 
00456     if (msg_it == msg_end)
00457     {
00458       return;
00459     }
00460 
00461     const MessageInfo& info = *msg_it;
00462     if (info.success_count < expected_success_count_)
00463     {
00464       return;
00465     }
00466 
00467     bool can_transform = true;
00468     const MConstPtr& message = info.event.getMessage();
00469     const std::string& frame_id = *mt::FrameId<M>::pointer(*message);
00470     ros::Time stamp = mt::TimeStamp<M>::value(*message);
00471 
00472     if (result == tf2::TransformAvailable)
00473     {
00474       // make sure we can still perform all the necessary transforms
00475       typename V_string::iterator it = target_frames_.begin();
00476       typename V_string::iterator end = target_frames_.end();
00477       for (; it != end; ++it)
00478       {
00479         const std::string& target = *it;
00480         if (!bc_.canTransform(target, frame_id, stamp))
00481         {
00482           can_transform = false;
00483           break;
00484         }
00485 
00486         if (!time_tolerance_.isZero())
00487         {
00488           if (!bc_.canTransform(target, frame_id, stamp + time_tolerance_))
00489           {
00490             can_transform = false;
00491             break;
00492           }
00493         }
00494       }
00495     }
00496     else
00497     {
00498       can_transform = false;
00499     }
00500 
00501     if (can_transform)
00502     {
00503       TF2_ROS_MESSAGEFILTER_DEBUG("Message ready in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_ - 1);
00504 
00505       ++successful_transform_count_;
00506 
00507       messageReady(info.event);
00508 
00509     }
00510     else
00511     {
00512       ++dropped_message_count_;
00513 
00514       TF2_ROS_MESSAGEFILTER_DEBUG("Discarding message in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_ - 1);
00515       messageDropped(info.event, filter_failure_reasons::Unknown);
00516     }
00517 
00518     messages_.erase(msg_it);
00519     --message_count_;
00520   }
00521 
00525   void incomingMessage(const ros::MessageEvent<M const>& evt)
00526   {
00527     add(evt);
00528   }
00529 
00530   void checkFailures()
00531   {
00532     if (next_failure_warning_.isZero())
00533     {
00534       next_failure_warning_ = ros::WallTime::now() + ros::WallDuration(15);
00535     }
00536 
00537     if (ros::WallTime::now() >= next_failure_warning_)
00538     {
00539       if (incoming_message_count_ - message_count_ == 0)
00540       {
00541         return;
00542       }
00543 
00544       double dropped_pct = (double)dropped_message_count_ / (double)(incoming_message_count_ - message_count_);
00545       if (dropped_pct > 0.95)
00546       {
00547         TF2_ROS_MESSAGEFILTER_WARN("Dropped %.2f%% of messages so far. Please turn the [%s.message_notifier] rosconsole logger to DEBUG for more information.", dropped_pct*100, ROSCONSOLE_DEFAULT_NAME);
00548         next_failure_warning_ = ros::WallTime::now() + ros::WallDuration(60);
00549 
00550         if ((double)failed_out_the_back_count_ / (double)dropped_message_count_ > 0.5)
00551         {
00552           TF2_ROS_MESSAGEFILTER_WARN("  The majority of dropped messages were due to messages growing older than the TF cache time.  The last message's timestamp was: %f, and the last frame_id was: %s", last_out_the_back_stamp_.toSec(), last_out_the_back_frame_.c_str());
00553         }
00554       }
00555     }
00556   }
00557 
00558   struct CBQueueCallback : public ros::CallbackInterface
00559   {
00560     CBQueueCallback(MessageFilter* filter, const MEvent& event, bool success, FilterFailureReason reason)
00561     : event_(event)
00562     , filter_(filter)
00563     , reason_(reason)
00564     , success_(success)
00565     {}
00566 
00567 
00568     virtual CallResult call()
00569     {
00570       if (success_)
00571       {
00572         filter_->signalMessage(event_);
00573       }
00574       else
00575       {
00576         filter_->signalFailure(event_, reason_);
00577       }
00578 
00579       return Success;
00580     }
00581 
00582   private:
00583     MEvent event_;
00584     MessageFilter* filter_;
00585     FilterFailureReason reason_;
00586     bool success_;
00587   };
00588 
00589   void messageDropped(const MEvent& evt, FilterFailureReason reason)
00590   {
00591     if (callback_queue_)
00592     {
00593       ros::CallbackInterfacePtr cb(new CBQueueCallback(this, evt, false, reason));
00594       callback_queue_->addCallback(cb, (uint64_t)this);
00595     }
00596     else
00597     {
00598       signalFailure(evt, reason);
00599     }
00600   }
00601 
00602   void messageReady(const MEvent& evt)
00603   {
00604     if (callback_queue_)
00605     {
00606       ros::CallbackInterfacePtr cb(new CBQueueCallback(this, evt, true, filter_failure_reasons::Unknown));
00607       callback_queue_->addCallback(cb, (uint64_t)this);
00608     }
00609     else
00610     {
00611       signalMessage(evt);
00612     }
00613   }
00614 
00615   void disconnectFailure(const message_filters::Connection& c)
00616   {
00617     boost::mutex::scoped_lock lock(failure_signal_mutex_);
00618     c.getBoostConnection().disconnect();
00619   }
00620 
00621   void signalFailure(const MEvent& evt, FilterFailureReason reason)
00622   {
00623     boost::mutex::scoped_lock lock(failure_signal_mutex_);
00624     failure_signal_(evt.getMessage(), reason);
00625   }
00626 
00627   tf2::BufferCore& bc_; 
00628   V_string target_frames_; 
00629   std::string target_frames_string_;
00630   boost::mutex target_frames_string_mutex_;
00631   uint32_t queue_size_; 
00632   tf2::TransformableCallbackHandle callback_handle_;
00633 
00634   typedef std::vector<tf2::TransformableRequestHandle> V_TransformableRequestHandle;
00635   struct MessageInfo
00636   {
00637     MessageInfo()
00638     : success_count(0)
00639     {}
00640 
00641     MEvent event;
00642     V_TransformableRequestHandle handles;
00643     uint32_t success_count;
00644   };
00645   typedef std::list<MessageInfo> L_MessageInfo;
00646   L_MessageInfo messages_;
00647   uint32_t message_count_; 
00648   boost::mutex messages_mutex_; 
00649   uint32_t expected_success_count_;
00650 
00651   bool warned_about_empty_frame_id_;
00652 
00653   uint64_t successful_transform_count_;
00654   uint64_t failed_out_the_back_count_;
00655   uint64_t transform_message_count_;
00656   uint64_t incoming_message_count_;
00657   uint64_t dropped_message_count_;
00658 
00659   ros::Time last_out_the_back_stamp_;
00660   std::string last_out_the_back_frame_;
00661 
00662   ros::WallTime next_failure_warning_;
00663 
00664   ros::Duration time_tolerance_; 
00665 
00666   message_filters::Connection message_connection_;
00667 
00668   FailureSignal failure_signal_;
00669   boost::mutex failure_signal_mutex_;
00670 
00671   ros::CallbackQueueInterface* callback_queue_;
00672 };
00673 
00674 } // namespace tf2
00675 
00676 #endif
00677 
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines


tf2_ros
Author(s): Wim Meeussen, Eitan Marder-Eppstein
autogenerated on Mon Aug 19 2013 10:27:02