message_filter.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2010, Willow Garage, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  * * Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  * * Redistributions in binary form must reproduce the above copyright
11  * notice, this list of conditions and the following disclaimer in the
12  * documentation and/or other materials provided with the distribution.
13  * * Neither the name of the Willow Garage, Inc. nor the names of its
14  * contributors may be used to endorse or promote products derived from
15  * this software without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
32 #ifndef TF2_ROS_MESSAGE_FILTER_H
33 #define TF2_ROS_MESSAGE_FILTER_H
34 
35 #include <tf2/buffer_core.h>
36 
37 #include <string>
38 #include <list>
39 #include <vector>
40 #include <boost/function.hpp>
41 #include <boost/bind.hpp>
42 #include <boost/shared_ptr.hpp>
43 #include <boost/thread.hpp>
44 
47 
48 #include <ros/node_handle.h>
50 #include <ros/init.h>
51 
52 #define TF2_ROS_MESSAGEFILTER_DEBUG(fmt, ...) \
53  ROS_DEBUG_NAMED("message_filter", std::string(std::string("MessageFilter [target=%s]: ") + std::string(fmt)).c_str(), getTargetFramesString().c_str(), __VA_ARGS__)
54 
55 #define TF2_ROS_MESSAGEFILTER_WARN(fmt, ...) \
56  ROS_WARN_NAMED("message_filter", std::string(std::string("MessageFilter [target=%s]: ") + std::string(fmt)).c_str(), getTargetFramesString().c_str(), __VA_ARGS__)
57 
58 namespace tf2_ros
59 {
60 
61 namespace filter_failure_reasons
62 {
64 {
71 };
72 }
74 
76 {
77 public:
78  typedef std::vector<std::string> V_string;
79 
80  virtual ~MessageFilterBase(){}
81  virtual void clear() = 0;
82  virtual void setTargetFrame(const std::string& target_frame) = 0;
83  virtual void setTargetFrames(const V_string& target_frames) = 0;
84  virtual void setTolerance(const ros::Duration& tolerance) = 0;
85 };
86 
103 template<class M>
105 {
106 public:
109  typedef boost::function<void(const MConstPtr&, FilterFailureReason)> FailureCallback;
110  typedef boost::signals2::signal<void(const MConstPtr&, FilterFailureReason)> FailureSignal;
111 
112  // If you hit this assert your message does not have a header, or does not have the HasHeader trait defined for it
113  // Actually, we need to check that the message has a header, or that it
114  // has the FrameId and Stamp traits. However I don't know how to do that
115  // so simply commenting out for now.
116  //ROS_STATIC_ASSERT(ros::message_traits::HasHeader<M>::value);
117 
126  MessageFilter(tf2::BufferCore& bc, const std::string& target_frame, uint32_t queue_size, const ros::NodeHandle& nh)
127  : bc_(bc)
128  , queue_size_(queue_size)
129  , callback_queue_(nh.getCallbackQueue())
130  {
131  init();
132 
133  setTargetFrame(target_frame);
134  }
135 
145  template<class F>
146  MessageFilter(F& f, tf2::BufferCore& bc, const std::string& target_frame, uint32_t queue_size, const ros::NodeHandle& nh)
147  : bc_(bc)
148  , queue_size_(queue_size)
149  , callback_queue_(nh.getCallbackQueue())
150  {
151  init();
152 
153  setTargetFrame(target_frame);
154 
155  connectInput(f);
156  }
157 
168  MessageFilter(tf2::BufferCore& bc, const std::string& target_frame, uint32_t queue_size, ros::CallbackQueueInterface* cbqueue)
169  : bc_(bc)
170  , queue_size_(queue_size)
171  , callback_queue_(cbqueue)
172  {
173  init();
174 
175  setTargetFrame(target_frame);
176  }
177 
189  template<class F>
190  MessageFilter(F& f, tf2::BufferCore& bc, const std::string& target_frame, uint32_t queue_size, ros::CallbackQueueInterface* cbqueue)
191  : bc_(bc)
192  , queue_size_(queue_size)
193  , callback_queue_(cbqueue)
194  {
195  init();
196 
197  setTargetFrame(target_frame);
198 
199  connectInput(f);
200  }
201 
205  template<class F>
206  void connectInput(F& f)
207  {
208  message_connection_.disconnect();
209  message_connection_ = f.registerCallback(&MessageFilter::incomingMessage, this);
210  }
211 
216  {
217  message_connection_.disconnect();
218 
219  clear();
220 
221  TF2_ROS_MESSAGEFILTER_DEBUG("Successful Transforms: %llu, Discarded due to age: %llu, Transform messages received: %llu, Messages received: %llu, Total dropped: %llu",
222  (long long unsigned int)successful_transform_count_,
223  (long long unsigned int)failed_out_the_back_count_, (long long unsigned int)transform_message_count_,
224  (long long unsigned int)incoming_message_count_, (long long unsigned int)dropped_message_count_);
225 
226  }
227 
231  void setTargetFrame(const std::string& target_frame)
232  {
233  V_string frames;
234  frames.push_back(target_frame);
235  setTargetFrames(frames);
236  }
237 
241  void setTargetFrames(const V_string& target_frames)
242  {
243  boost::mutex::scoped_lock frames_lock(target_frames_mutex_);
244 
245  target_frames_.resize(target_frames.size());
246  std::transform(target_frames.begin(), target_frames.end(), target_frames_.begin(), this->stripSlash);
247  expected_success_count_ = target_frames_.size() * (time_tolerance_.isZero() ? 1 : 2);
248 
249  std::stringstream ss;
250  for (V_string::iterator it = target_frames_.begin(); it != target_frames_.end(); ++it)
251  {
252  ss << *it << " ";
253  }
254  target_frames_string_ = ss.str();
255  }
259  std::string getTargetFramesString()
260  {
261  boost::mutex::scoped_lock lock(target_frames_mutex_);
262  return target_frames_string_;
263  };
264 
268  void setTolerance(const ros::Duration& tolerance)
269  {
270  boost::mutex::scoped_lock lock(target_frames_mutex_);
271  time_tolerance_ = tolerance;
272  expected_success_count_ = target_frames_.size() * (time_tolerance_.isZero() ? 1 : 2);
273  }
274 
278  void clear()
279  {
280  boost::unique_lock< boost::shared_mutex > unique_lock(messages_mutex_);
281 
282  TF2_ROS_MESSAGEFILTER_DEBUG("%s", "Cleared");
283 
284  bc_.removeTransformableCallback(callback_handle_);
285  callback_handle_ = bc_.addTransformableCallback(boost::bind(&MessageFilter::transformable, this, _1, _2, _3, _4, _5));
286 
287  messages_.clear();
288  message_count_ = 0;
289 
290  warned_about_empty_frame_id_ = false;
291  }
292 
293  void add(const MEvent& evt)
294  {
295  if (target_frames_.empty())
296  {
297  return;
298  }
299 
300  namespace mt = ros::message_traits;
301  const MConstPtr& message = evt.getMessage();
302  std::string frame_id = stripSlash(mt::FrameId<M>::value(*message));
303  ros::Time stamp = mt::TimeStamp<M>::value(*message);
304 
305  if (frame_id.empty())
306  {
307  messageDropped(evt, filter_failure_reasons::EmptyFrameID);
308  return;
309  }
310 
311  // iterate through the target frames and add requests for each of them
312  MessageInfo info;
313  info.handles.reserve(expected_success_count_);
314  {
315  V_string target_frames_copy;
316  // Copy target_frames_ to avoid deadlock from #79
317  {
318  boost::mutex::scoped_lock frames_lock(target_frames_mutex_);
319  target_frames_copy = target_frames_;
320  }
321 
322  V_string::iterator it = target_frames_copy.begin();
323  V_string::iterator end = target_frames_copy.end();
324  for (; it != end; ++it)
325  {
326  const std::string& target_frame = *it;
327  tf2::TransformableRequestHandle handle = bc_.addTransformableRequest(callback_handle_, target_frame, frame_id, stamp);
328  if (handle == 0xffffffffffffffffULL) // never transformable
329  {
330  messageDropped(evt, filter_failure_reasons::OutTheBack);
331  return;
332  }
333  else if (handle == 0)
334  {
335  ++info.success_count;
336  }
337  else
338  {
339  info.handles.push_back(handle);
340  }
341 
342  if (!time_tolerance_.isZero())
343  {
344  handle = bc_.addTransformableRequest(callback_handle_, target_frame, frame_id, stamp + time_tolerance_);
345  if (handle == 0xffffffffffffffffULL) // never transformable
346  {
347  messageDropped(evt, filter_failure_reasons::OutTheBack);
348  return;
349  }
350  else if (handle == 0)
351  {
352  ++info.success_count;
353  }
354  else
355  {
356  info.handles.push_back(handle);
357  }
358  }
359  }
360  }
361 
362 
363  // We can transform already
364  if (info.success_count == expected_success_count_)
365  {
366  messageReady(evt);
367  }
368  else
369  {
370  boost::unique_lock< boost::shared_mutex > unique_lock(messages_mutex_);
371  // If this message is about to push us past our queue size, erase the oldest message
372  if (queue_size_ != 0 && message_count_ + 1 > queue_size_)
373  {
374  ++dropped_message_count_;
375  const MessageInfo& front = messages_.front();
376  TF2_ROS_MESSAGEFILTER_DEBUG("Removed oldest message because buffer is full, count now %d (frame_id=%s, stamp=%f)", message_count_,
377  (mt::FrameId<M>::value(*front.event.getMessage())).c_str(), mt::TimeStamp<M>::value(*front.event.getMessage()).toSec());
378 
379  V_TransformableRequestHandle::const_iterator it = front.handles.begin();
380  V_TransformableRequestHandle::const_iterator end = front.handles.end();
381 
382  for (; it != end; ++it)
383  {
384  bc_.cancelTransformableRequest(*it);
385  }
386 
387  messageDropped(front.event, filter_failure_reasons::Unknown);
388  messages_.pop_front();
389  --message_count_;
390  }
391 
392  // Add the message to our list
393  info.event = evt;
394  messages_.push_back(info);
395  ++message_count_;
396  }
397 
398  TF2_ROS_MESSAGEFILTER_DEBUG("Added message in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_);
399 
400  ++incoming_message_count_;
401  }
402 
408  void add(const MConstPtr& message)
409  {
410  boost::shared_ptr<std::map<std::string, std::string> > header(new std::map<std::string, std::string>);
411  (*header)["callerid"] = "unknown";
413  ros::Time t(n.sec, n.nsec);
414  add(MEvent(message, header, t));
415  }
416 
421  message_filters::Connection registerFailureCallback(const FailureCallback& callback)
422  {
423  boost::mutex::scoped_lock lock(failure_signal_mutex_);
424  return message_filters::Connection(boost::bind(&MessageFilter::disconnectFailure, this, _1), failure_signal_.connect(callback));
425  }
426 
427  virtual void setQueueSize( uint32_t new_queue_size )
428  {
429  queue_size_ = new_queue_size;
430  }
431 
432  virtual uint32_t getQueueSize()
433  {
434  return queue_size_;
435  }
436 
437 
438 private:
439 
440  void init()
441  {
442  message_count_ = 0;
443  successful_transform_count_ = 0;
444  failed_out_the_back_count_ = 0;
445  transform_message_count_ = 0;
446  incoming_message_count_ = 0;
447  dropped_message_count_ = 0;
448  time_tolerance_ = ros::Duration(0.0);
449  warned_about_empty_frame_id_ = false;
450  expected_success_count_ = 1;
451 
452  callback_handle_ = bc_.addTransformableCallback(boost::bind(&MessageFilter::transformable, this, _1, _2, _3, _4, _5));
453  }
454 
455  void transformable(tf2::TransformableRequestHandle request_handle, const std::string& target_frame, const std::string& source_frame,
456  ros::Time time, tf2::TransformableResult result)
457  {
458  namespace mt = ros::message_traits;
459 
460  boost::upgrade_lock< boost::shared_mutex > lock(messages_mutex_);
461 
462  // find the message this request is associated with
463  typename L_MessageInfo::iterator msg_it = messages_.begin();
464  typename L_MessageInfo::iterator msg_end = messages_.end();
465  for (; msg_it != msg_end; ++msg_it)
466  {
467  MessageInfo& info = *msg_it;
468  V_TransformableRequestHandle::const_iterator handle_it = std::find(info.handles.begin(), info.handles.end(), request_handle);
469  if (handle_it != info.handles.end())
470  {
471  // found msg_it
472  ++info.success_count;
473  break;
474  }
475  }
476 
477  if (msg_it == msg_end)
478  {
479  return;
480  }
481 
482  const MessageInfo& info = *msg_it;
483  if (info.success_count < expected_success_count_)
484  {
485  return;
486  }
487 
488  bool can_transform = true;
489  const MConstPtr& message = info.event.getMessage();
490  std::string frame_id = stripSlash(mt::FrameId<M>::value(*message));
491  ros::Time stamp = mt::TimeStamp<M>::value(*message);
492 
493  if (result == tf2::TransformAvailable)
494  {
495  boost::mutex::scoped_lock frames_lock(target_frames_mutex_);
496  // make sure we can still perform all the necessary transforms
497  typename V_string::iterator it = target_frames_.begin();
498  typename V_string::iterator end = target_frames_.end();
499  for (; it != end; ++it)
500  {
501  const std::string& target = *it;
502  if (!bc_.canTransform(target, frame_id, stamp))
503  {
504  can_transform = false;
505  break;
506  }
507 
508  if (!time_tolerance_.isZero())
509  {
510  if (!bc_.canTransform(target, frame_id, stamp + time_tolerance_))
511  {
512  can_transform = false;
513  break;
514  }
515  }
516  }
517  }
518  else
519  {
520  can_transform = false;
521  }
522 
523  // We will be mutating messages now, require unique lock
524  boost::upgrade_to_unique_lock< boost::shared_mutex > uniqueLock(lock);
525  if (can_transform)
526  {
527  TF2_ROS_MESSAGEFILTER_DEBUG("Message ready in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_ - 1);
528 
529  ++successful_transform_count_;
530 
531  messageReady(info.event);
532 
533  }
534  else
535  {
536  ++dropped_message_count_;
537 
538  TF2_ROS_MESSAGEFILTER_DEBUG("Discarding message in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_ - 1);
539  messageDropped(info.event, filter_failure_reasons::Unknown);
540  }
541 
542  messages_.erase(msg_it);
543  --message_count_;
544  }
545 
550  {
551  add(evt);
552  }
553 
555  {
556  if (next_failure_warning_.isZero())
557  {
558  next_failure_warning_ = ros::WallTime::now() + ros::WallDuration(15);
559  }
560 
561  if (ros::WallTime::now() >= next_failure_warning_)
562  {
563  if (incoming_message_count_ - message_count_ == 0)
564  {
565  return;
566  }
567 
568  double dropped_pct = (double)dropped_message_count_ / (double)(incoming_message_count_ - message_count_);
569  if (dropped_pct > 0.95)
570  {
571  TF2_ROS_MESSAGEFILTER_WARN("Dropped %.2f%% of messages so far. Please turn the [%s.message_notifier] rosconsole logger to DEBUG for more information.", dropped_pct*100, ROSCONSOLE_DEFAULT_NAME);
572  next_failure_warning_ = ros::WallTime::now() + ros::WallDuration(60);
573 
574  if ((double)failed_out_the_back_count_ / (double)dropped_message_count_ > 0.5)
575  {
576  TF2_ROS_MESSAGEFILTER_WARN(" The majority of dropped messages were due to messages growing older than the TF cache time. The last message's timestamp was: %f, and the last frame_id was: %s", last_out_the_back_stamp_.toSec(), last_out_the_back_frame_.c_str());
577  }
578  }
579  }
580  }
581 
583  {
584  CBQueueCallback(MessageFilter* filter, const MEvent& event, bool success, FilterFailureReason reason)
585  : event_(event)
586  , filter_(filter)
587  , reason_(reason)
588  , success_(success)
589  {}
590 
591 
592  virtual CallResult call()
593  {
594  if (success_)
595  {
596  filter_->signalMessage(event_);
597  }
598  else
599  {
600  filter_->signalFailure(event_, reason_);
601  }
602 
603  return Success;
604  }
605 
606  private:
607  MEvent event_;
609  FilterFailureReason reason_;
610  bool success_;
611  };
612 
613  void messageDropped(const MEvent& evt, FilterFailureReason reason)
614  {
615  if (callback_queue_)
616  {
617  ros::CallbackInterfacePtr cb(new CBQueueCallback(this, evt, false, reason));
618  callback_queue_->addCallback(cb, (uint64_t)this);
619  }
620  else
621  {
622  signalFailure(evt, reason);
623  }
624  }
625 
626  void messageReady(const MEvent& evt)
627  {
628  if (callback_queue_)
629  {
631  callback_queue_->addCallback(cb, (uint64_t)this);
632  }
633  else
634  {
635  this->signalMessage(evt);
636  }
637  }
638 
640  {
641  boost::mutex::scoped_lock lock(failure_signal_mutex_);
642  c.getBoostConnection().disconnect();
643  }
644 
645  void signalFailure(const MEvent& evt, FilterFailureReason reason)
646  {
647  boost::mutex::scoped_lock lock(failure_signal_mutex_);
648  failure_signal_(evt.getMessage(), reason);
649  }
650 
651  static
652  std::string stripSlash(const std::string& in)
653  {
654  if ( !in.empty() && (in[0] == '/'))
655  {
656  std::string out = in;
657  out.erase(0, 1);
658  return out;
659  }
660  return in;
661  }
662 
666  boost::mutex target_frames_mutex_;
667  uint32_t queue_size_;
669 
670  typedef std::vector<tf2::TransformableRequestHandle> V_TransformableRequestHandle;
671  struct MessageInfo
672  {
674  : success_count(0)
675  {}
676 
677  MEvent event;
678  V_TransformableRequestHandle handles;
679  uint32_t success_count;
680  };
681  typedef std::list<MessageInfo> L_MessageInfo;
682  L_MessageInfo messages_;
683  uint32_t message_count_;
684  boost::shared_mutex messages_mutex_;
686 
688 
694 
697 
699 
701 
703 
704  FailureSignal failure_signal_;
705  boost::mutex failure_signal_mutex_;
706 
708 };
709 
710 } // namespace tf2
711 
712 #endif
void clear()
Clear any messages currently in the queue.
std::list< MessageInfo > L_MessageInfo
uint32_t queue_size_
The maximum number of messages we queue up.
void add(const MConstPtr &message)
Manually add a message into this filter.
Follows the patterns set by the message_filters package to implement a filter which only passes messa...
FailureSignal failure_signal_
~MessageFilter()
Destructor.
uint32_t TransformableCallbackHandle
The message buffer overflowed, and this message was pushed off the back of the queue, but the reason it was unable to be transformed is unknown.
boost::function< void(const MConstPtr &, FilterFailureReason)> FailureCallback
The frame_id on the message is empty.
void messageReady(const MEvent &evt)
void setTargetFrame(const std::string &target_frame)
Set the frame you need to be able to transform to before getting a message callback.
std::string getTargetFramesString()
Get the target frames as a string for debugging.
tf2::BufferCore & bc_
The Transformer used to determine if transformation data is available.
void messageDropped(const MEvent &evt, FilterFailureReason reason)
ros::WallTime next_failure_warning_
std::vector< tf2::TransformableRequestHandle > V_TransformableRequestHandle
std::string last_out_the_back_frame_
boost::shared_mutex messages_mutex_
The mutex used for locking message list operations.
bool add(const actionlib::TwoIntsGoal &req, actionlib::TwoIntsResult &res)
message_filters::Connection registerFailureCallback(const FailureCallback &callback)
Register a callback to be called when a message is about to be dropped.
void signalFailure(const MEvent &evt, FilterFailureReason reason)
TransformableResult
TransformAvailable
V_TransformableRequestHandle handles
MessageFilter(F &f, tf2::BufferCore &bc, const std::string &target_frame, uint32_t queue_size, ros::CallbackQueueInterface *cbqueue)
Constructor.
boost::signals2::connection getBoostConnection() const
#define TF2_ROS_MESSAGEFILTER_WARN(fmt,...)
void add(const MEvent &evt)
MessageFilter(tf2::BufferCore &bc, const std::string &target_frame, uint32_t queue_size, ros::CallbackQueueInterface *cbqueue)
Constructor.
boost::mutex failure_signal_mutex_
void setTargetFrames(const V_string &target_frames)
Set the frames you need to be able to transform to before getting a message callback.
void setTolerance(const ros::Duration &tolerance)
Set the required tolerance for the notifier to return true.
uint64_t successful_transform_count_
MessageFilter(F &f, tf2::BufferCore &bc, const std::string &target_frame, uint32_t queue_size, const ros::NodeHandle &nh)
Constructor.
tf2::TransformableCallbackHandle callback_handle_
virtual void setQueueSize(uint32_t new_queue_size)
CBQueueCallback(MessageFilter *filter, const MEvent &event, bool success, FilterFailureReason reason)
std::string stripSlash(const std::string &in)
message_filters::Connection message_connection_
void disconnectFailure(const message_filters::Connection &c)
ros::CallbackQueueInterface * callback_queue_
MessageFilter(tf2::BufferCore &bc, const std::string &target_frame, uint32_t queue_size, const ros::NodeHandle &nh)
Constructor.
The timestamp on the message is more than the cache length earlier than the newest data in the transf...
static WallTime now()
Definition: buffer.h:42
void incomingMessage(const ros::MessageEvent< M const > &evt)
Callback that happens when we receive a message on the message topic.
void transformable(tf2::TransformableRequestHandle request_handle, const std::string &target_frame, const std::string &source_frame, ros::Time time, tf2::TransformableResult result)
ros::Time last_out_the_back_stamp_
ros::Duration time_tolerance_
Provide additional tolerance on time for messages which are stamped but can have associated duration...
uint32_t message_count_
The number of messages in the list. Used because <container>.size() may have linear cost...
static std::string stripSlash(const std::string &in)
ros::MessageEvent< M const > MEvent
V_string target_frames_
The frames we need to be able to transform to before a message is ready.
std::string target_frames_string_
boost::shared_ptr< M const > MConstPtr
void connectInput(F &f)
Connect this filter&#39;s input to another filter&#39;s output. If this filter is already connected...
std::vector< std::string > V_string
boost::mutex target_frames_mutex_
A mutex to protect access to the target_frames_ list and target_frames_string.
boost::signals2::signal< void(const MConstPtr &, FilterFailureReason)> FailureSignal
#define TF2_ROS_MESSAGEFILTER_DEBUG(fmt,...)
uint64_t failed_out_the_back_count_
#define ROSCONSOLE_DEFAULT_NAME
boost::shared_ptr< M > getMessage() const
virtual uint32_t getQueueSize()
uint64_t TransformableRequestHandle


tf2_ros
Author(s): Eitan Marder-Eppstein, Wim Meeussen
autogenerated on Fri Oct 16 2020 19:08:54