message_filter.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2008, Willow Garage, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  * * Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  * * Redistributions in binary form must reproduce the above copyright
11  * notice, this list of conditions and the following disclaimer in the
12  * documentation and/or other materials provided with the distribution.
13  * * Neither the name of the Willow Garage, Inc. nor the names of its
14  * contributors may be used to endorse or promote products derived from
15  * this software without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
32 #ifndef TF_MESSAGE_FILTER_H
33 #define TF_MESSAGE_FILTER_H
34 
35 #include <ros/ros.h>
36 #include <tf/transform_listener.h>
37 #include <tf/tfMessage.h>
38 
39 #include <string>
40 #include <list>
41 #include <vector>
42 #include <boost/function.hpp>
43 #include <boost/bind.hpp>
44 #include <boost/shared_ptr.hpp>
45 #include <boost/weak_ptr.hpp>
46 #include <boost/thread.hpp>
47 #include <boost/signals2.hpp>
48 
49 #include <ros/callback_queue.h>
50 
53 
54 #define TF_MESSAGEFILTER_DEBUG(fmt, ...) \
55  ROS_DEBUG_NAMED("message_filter", "MessageFilter [target=%s]: " fmt, getTargetFramesString().c_str(), __VA_ARGS__)
56 
57 #define TF_MESSAGEFILTER_WARN(fmt, ...) \
58  ROS_WARN_NAMED("message_filter", "MessageFilter [target=%s]: " fmt, getTargetFramesString().c_str(), __VA_ARGS__)
59 
60 namespace tf
61 {
62 
63 namespace filter_failure_reasons
64 {
66 {
73 };
74 }
76 
78 {
79 public:
80  virtual ~MessageFilterBase(){}
81  virtual void clear() = 0;
82  virtual void setTargetFrame(const std::string& target_frame) = 0;
83  virtual void setTargetFrames(const std::vector<std::string>& target_frames) = 0;
84  virtual void setTolerance(const ros::Duration& tolerance) = 0;
85  virtual void setQueueSize( uint32_t new_queue_size ) = 0;
86  virtual uint32_t getQueueSize() = 0;
87 };
88 
105 template<class M>
107 {
108 public:
111  typedef boost::function<void(const MConstPtr&, FilterFailureReason)> FailureCallback;
112  typedef boost::signals2::signal<void(const MConstPtr&, FilterFailureReason)> FailureSignal;
113 
123  MessageFilter(Transformer& tf, const std::string& target_frame, uint32_t queue_size, ros::NodeHandle nh = ros::NodeHandle(), ros::Duration max_rate = ros::Duration(0.01))
124  : tf_(tf)
125  , nh_(nh)
126  , max_rate_(max_rate)
127  , queue_size_(queue_size)
128  {
129  init();
130 
131  setTargetFrame(target_frame);
132  }
133 
144  template<class F>
145  MessageFilter(F& f, Transformer& tf, const std::string& target_frame, uint32_t queue_size, ros::NodeHandle nh = ros::NodeHandle(), ros::Duration max_rate = ros::Duration(0.01))
146  : tf_(tf)
147  , nh_(nh)
148  , max_rate_(max_rate)
149  , queue_size_(queue_size)
150  {
151  init();
152 
153  setTargetFrame(target_frame);
154 
155  connectInput(f);
156  }
157 
161  template<class F>
162  void connectInput(F& f)
163  {
164  message_connection_.disconnect();
165  message_connection_ = f.registerCallback(&MessageFilter::incomingMessage, this);
166  }
167 
172  {
173  // Explicitly stop callbacks; they could execute after we're destroyed
174  max_rate_timer_.stop();
175  message_connection_.disconnect();
176  tf_.removeTransformsChangedListener(tf_connection_);
177 
178  clear();
179 
180  TF_MESSAGEFILTER_DEBUG("Successful Transforms: %llu, Failed Transforms: %llu, Discarded due to age: %llu, Transform messages received: %llu, Messages received: %llu, Total dropped: %llu",
181  (long long unsigned int)successful_transform_count_, (long long unsigned int)failed_transform_count_,
182  (long long unsigned int)failed_out_the_back_count_, (long long unsigned int)transform_message_count_,
183  (long long unsigned int)incoming_message_count_, (long long unsigned int)dropped_message_count_);
184 
185  }
186 
190  void setTargetFrame(const std::string& target_frame)
191  {
192  std::vector<std::string> frames;
193  frames.push_back(target_frame);
194  setTargetFrames(frames);
195  }
196 
200  void setTargetFrames(const std::vector<std::string>& target_frames)
201  {
202  boost::mutex::scoped_lock list_lock(messages_mutex_);
203  boost::mutex::scoped_lock string_lock(target_frames_string_mutex_);
204 
205  target_frames_ = target_frames;
206 
207  std::stringstream ss;
208  for (std::vector<std::string>::iterator it = target_frames_.begin(); it != target_frames_.end(); ++it)
209  {
210  ss << *it << " ";
211  }
212  target_frames_string_ = ss.str();
213  }
217  std::string getTargetFramesString()
218  {
219  boost::mutex::scoped_lock lock(target_frames_string_mutex_);
220  return target_frames_string_;
221  };
222 
226  void setTolerance(const ros::Duration& tolerance)
227  {
228  time_tolerance_ = tolerance;
229  }
230 
234  void clear()
235  {
236  boost::mutex::scoped_lock lock(messages_mutex_);
237 
238  TF_MESSAGEFILTER_DEBUG("%s", "Cleared");
239 
240  messages_.clear();
241  message_count_ = 0;
242 
243  warned_about_unresolved_name_ = false;
244  warned_about_empty_frame_id_ = false;
245  }
246 
247  void add(const MEvent& evt)
248  {
249  boost::mutex::scoped_lock lock(messages_mutex_);
250 
251  testMessages();
252 
253  if (!testMessage(evt))
254  {
255  // If this message is about to push us past our queue size, erase the oldest message
256  if (queue_size_ != 0 && message_count_ + 1 > queue_size_)
257  {
258  ++dropped_message_count_;
259  const MEvent& front = messages_.front();
261  "Removed oldest message because buffer is full, count now %d (frame_id=%s, stamp=%f)",
262  message_count_,
265  signalFailure(messages_.front(), filter_failure_reasons::Unknown);
266 
267  messages_.pop_front();
268  --message_count_;
269  }
270 
271  // Add the message to our list
272  messages_.push_back(evt);
273  ++message_count_;
274  }
275 
277  "Added message in frame %s at time %.3f, count now %d",
280  message_count_);
281 
282  ++incoming_message_count_;
283  }
284 
290  void add(const MConstPtr& message)
291  {
292 
293  boost::shared_ptr<std::map<std::string, std::string> > header(new std::map<std::string, std::string>);
294  (*header)["callerid"] = "unknown";
295  add(MEvent(message, header, ros::Time::now()));
296  }
297 
302  message_filters::Connection registerFailureCallback(const FailureCallback& callback)
303  {
304  boost::mutex::scoped_lock lock(failure_signal_mutex_);
305  return message_filters::Connection(boost::bind(&MessageFilter::disconnectFailure, this, _1), failure_signal_.connect(callback));
306  }
307 
308  virtual void setQueueSize( uint32_t new_queue_size )
309  {
310  queue_size_ = new_queue_size;
311  }
312 
313  virtual uint32_t getQueueSize()
314  {
315  return queue_size_;
316  }
317 
318 private:
319 
320  void init()
321  {
322  message_count_ = 0;
323  new_transforms_ = false;
324  successful_transform_count_ = 0;
325  failed_transform_count_ = 0;
326  failed_out_the_back_count_ = 0;
327  transform_message_count_ = 0;
328  incoming_message_count_ = 0;
329  dropped_message_count_ = 0;
330  time_tolerance_ = ros::Duration(0.0);
331  warned_about_unresolved_name_ = false;
332  warned_about_empty_frame_id_ = false;
333 
334  tf_connection_ = tf_.addTransformsChangedListener(boost::bind(&MessageFilter::transformsChanged, this));
335 
336  max_rate_timer_ = nh_.createTimer(max_rate_, &MessageFilter::maxRateTimerCallback, this);
337  }
338 
339  typedef std::list<MEvent> L_Event;
340 
341  bool testMessage(const MEvent& evt)
342  {
343  const MConstPtr& message = evt.getMessage();
344  std::string callerid = evt.getPublisherName();
345  std::string frame_id = ros::message_traits::FrameId<M>::value(*message);
347 
348  //Throw out messages which have an empty frame_id
349  if (frame_id.empty())
350  {
351  if (!warned_about_empty_frame_id_)
352  {
353  warned_about_empty_frame_id_ = true;
354  TF_MESSAGEFILTER_WARN("Discarding message from [%s] due to empty frame_id. This message will only print once.", callerid.c_str());
355  }
356  signalFailure(evt, filter_failure_reasons::EmptyFrameID);
357  return true;
358  }
359 
360 
361  //Throw out messages which are too old
363  for (std::vector<std::string>::iterator target_it = target_frames_.begin(); target_it != target_frames_.end(); ++target_it)
364  {
365  const std::string& target_frame = *target_it;
366 
367  if (target_frame != frame_id && stamp != ros::Time(0))
368  {
369  ros::Time latest_transform_time ;
370 
371  tf_.getLatestCommonTime(frame_id, target_frame, latest_transform_time, 0) ;
372 
373  if (stamp + tf_.getCacheLength() < latest_transform_time)
374  {
375  ++failed_out_the_back_count_;
376  ++dropped_message_count_;
378  "Discarding Message, in frame %s, Out of the back of Cache Time "
379  "(stamp: %.3f + cache_length: %.3f < latest_transform_time %.3f. "
380  "Message Count now: %d",
381  ros::message_traits::FrameId<M>::value(*message).c_str(),
383  tf_.getCacheLength().toSec(), latest_transform_time.toSec(), message_count_);
384 
385  last_out_the_back_stamp_ = stamp;
386  last_out_the_back_frame_ = frame_id;
387 
388  signalFailure(evt, filter_failure_reasons::OutTheBack);
389  return true;
390  }
391  }
392 
393  }
394 
395  bool ready = !target_frames_.empty();
396  for (std::vector<std::string>::iterator target_it = target_frames_.begin(); ready && target_it != target_frames_.end(); ++target_it)
397  {
398  std::string& target_frame = *target_it;
399  if (time_tolerance_ != ros::Duration(0.0))
400  {
401  ready = ready && (tf_.canTransform(target_frame, frame_id, stamp) &&
402  tf_.canTransform(target_frame, frame_id, stamp + time_tolerance_) );
403  }
404  else
405  {
406  ready = ready && tf_.canTransform(target_frame, frame_id, stamp);
407  }
408  }
409 
410  if (ready)
411  {
412  TF_MESSAGEFILTER_DEBUG("Message ready in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_);
413 
414  ++successful_transform_count_;
415 
416  this->signalMessage(evt);
417  }
418  else
419  {
420  ++failed_transform_count_;
421  }
422 
423  return ready;
424  }
425 
427  {
428  if (!messages_.empty() && getTargetFramesString() == " ")
429  {
430  ROS_WARN_NAMED("message_notifier", "MessageFilter [target=%s]: empty target frame", getTargetFramesString().c_str());
431  }
432 
433  int i = 0;
434 
435  typename L_Event::iterator it = messages_.begin();
436  for (; it != messages_.end(); ++i)
437  {
438  MEvent& evt = *it;
439 
440  if (testMessage(evt))
441  {
442  --message_count_;
443  it = messages_.erase(it);
444  }
445  else
446  {
447  ++it;
448  }
449  }
450  }
451 
453  {
454  boost::mutex::scoped_lock list_lock(messages_mutex_);
455  if (new_transforms_)
456  {
457  testMessages();
458  new_transforms_ = false;
459  }
460 
461  checkFailures();
462  }
463 
468  {
469  add(evt);
470  }
471 
473  {
474  new_transforms_ = true;
475 
476  ++transform_message_count_;
477  }
478 
480  {
481  if (next_failure_warning_.isZero())
482  {
483  next_failure_warning_ = ros::Time::now() + ros::Duration(15);
484  }
485 
486  if (ros::Time::now() >= next_failure_warning_)
487  {
488  if (incoming_message_count_ - message_count_ == 0)
489  {
490  return;
491  }
492 
493  double dropped_pct = (double)dropped_message_count_ / (double)(incoming_message_count_ - message_count_);
494  if (dropped_pct > 0.95)
495  {
496  TF_MESSAGEFILTER_WARN("Dropped %.2f%% of messages so far. Please turn the [%s.message_notifier] rosconsole logger to DEBUG for more information.", dropped_pct*100, ROSCONSOLE_DEFAULT_NAME);
497  next_failure_warning_ = ros::Time::now() + ros::Duration(60);
498 
499  if ((double)failed_out_the_back_count_ / (double)dropped_message_count_ > 0.5)
500  {
501  TF_MESSAGEFILTER_WARN(" The majority of dropped messages were due to messages growing older than the TF cache time. The last message's timestamp was: %f, and the last frame_id was: %s", last_out_the_back_stamp_.toSec(), last_out_the_back_frame_.c_str());
502  }
503  }
504  }
505  }
506 
508  {
509  boost::mutex::scoped_lock lock(failure_signal_mutex_);
510  c.getBoostConnection().disconnect();
511  }
512 
513  void signalFailure(const MEvent& evt, FilterFailureReason reason)
514  {
515  boost::mutex::scoped_lock lock(failure_signal_mutex_);
516  failure_signal_(evt.getMessage(), reason);
517  }
518 
523  std::vector<std::string> target_frames_;
526  uint32_t queue_size_;
527 
528  L_Event messages_;
529  uint32_t message_count_;
530  boost::mutex messages_mutex_;
531 
533  volatile bool new_transforms_;
534 
537 
544 
547 
549 
551 
552  boost::signals2::connection tf_connection_;
554 
555  FailureSignal failure_signal_;
556  boost::mutex failure_signal_mutex_;
557 };
558 
559 
560 } // namespace tf
561 
562 #endif
563 
FailureSignal failure_signal_
virtual ~MessageFilterBase()
ros::Time next_failure_warning_
ros::NodeHandle nh_
The node used to subscribe to the topic.
boost::mutex target_frames_string_mutex_
Follows the patterns set by the message_filters package to implement a filter which only passes messa...
L_Event messages_
The message list.
void connectInput(F &f)
Connect this filter&#39;s input to another filter&#39;s output. If this filter is already connected...
void signalFailure(const MEvent &evt, FilterFailureReason reason)
ros::Duration time_tolerance_
Provide additional tolerance on time for messages which are stamped but can have associated duration...
uint64_t dropped_message_count_
#define ROS_WARN_NAMED(name,...)
ros::Timer max_rate_timer_
MessageFilter(Transformer &tf, const std::string &target_frame, uint32_t queue_size, ros::NodeHandle nh=ros::NodeHandle(), ros::Duration max_rate=ros::Duration(0.01))
Constructor.
uint64_t transform_message_count_
void add(const MEvent &evt)
virtual uint32_t getQueueSize()
boost::signals2::connection tf_connection_
The message buffer overflowed, and this message was pushed off the back of the queue, but the reason it was unable to be transformed is unknown.
ros::MessageEvent< M const > MEvent
Definition: exceptions.h:38
std::vector< std::string > target_frames_
The frames we need to be able to transform to before a message is ready.
bool add(const actionlib::TwoIntsGoal &req, actionlib::TwoIntsResult &res)
void incomingMessage(const ros::MessageEvent< M const > &evt)
Callback that happens when we receive a message on the message topic.
~MessageFilter()
Destructor.
std::list< MEvent > L_Event
boost::signals2::connection getBoostConnection() const
void clear()
Clear any messages currently in the queue.
virtual void setQueueSize(uint32_t new_queue_size)
const std::string & getPublisherName() const
volatile bool new_transforms_
Used to skip waiting on new_data_ if new transforms have come in while calling back or transforming d...
void setTargetFrames(const std::vector< std::string > &target_frames)
Set the frames you need to be able to transform to before getting a message callback.
void maxRateTimerCallback(const ros::TimerEvent &)
boost::mutex failure_signal_mutex_
Transformer & tf_
The Transformer used to determine if transformation data is available.
bool new_messages_
Used to skip waiting on new_data_ if new messages have come in while calling back.
ros::Duration max_rate_
std::string last_out_the_back_frame_
message_filters::Connection registerFailureCallback(const FailureCallback &callback)
Register a callback to be called when a message is about to be dropped.
uint32_t message_count_
The number of messages in the list. Used because messages_.size() has linear cost.
MessageFilter(F &f, Transformer &tf, const std::string &target_frame, uint32_t queue_size, ros::NodeHandle nh=ros::NodeHandle(), ros::Duration max_rate=ros::Duration(0.01))
Constructor.
#define TF_MESSAGEFILTER_DEBUG(fmt,...)
uint64_t successful_transform_count_
void disconnectFailure(const message_filters::Connection &c)
bool testMessage(const MEvent &evt)
uint64_t failed_out_the_back_count_
ros::Time last_out_the_back_stamp_
boost::signals2::signal< void(const MConstPtr &, FilterFailureReason)> FailureSignal
boost::function< void(const MConstPtr &, FilterFailureReason)> FailureCallback
message_filters::Connection message_connection_
#define TF_MESSAGEFILTER_WARN(fmt,...)
uint64_t failed_transform_count_
static Time now()
std::string getTargetFramesString()
Get the target frames as a string for debugging.
void setTargetFrame(const std::string &target_frame)
Set the frame you need to be able to transform to before getting a message callback.
uint32_t queue_size_
The maximum number of messages we queue up.
std::string target_frames_string_
The frame_id on the message is empty.
The timestamp on the message is more than the cache length earlier than the newest data in the transf...
uint64_t incoming_message_count_
void setTolerance(const ros::Duration &tolerance)
Set the required tolerance for the notifier to return true.
bool warned_about_unresolved_name_
#define ROSCONSOLE_DEFAULT_NAME
bool warned_about_empty_frame_id_
void add(const MConstPtr &message)
Manually add a message into this filter.
boost::mutex messages_mutex_
The mutex used for locking message list operations.
boost::shared_ptr< M > getMessage() const
A Class which provides coordinate transforms between any two frames in a system.
Definition: tf.h:90
boost::shared_ptr< M const > MConstPtr


tf
Author(s): Tully Foote, Eitan Marder-Eppstein, Wim Meeussen
autogenerated on Mon Jun 10 2019 12:25:26