time_sequencer.h
Go to the documentation of this file.
1 /*********************************************************************
2 * Software License Agreement (BSD License)
3 *
4 * Copyright (c) 2008, Willow Garage, Inc.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above
14 * copyright notice, this list of conditions and the following
15 * disclaimer in the documentation and/or other materials provided
16 * with the distribution.
17 * * Neither the name of the Willow Garage nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 * POSSIBILITY OF SUCH DAMAGE.
33 *********************************************************************/
34 
35 #ifndef MESSAGE_FILTERS_TIME_SEQUENCER_H
36 #define MESSAGE_FILTERS_TIME_SEQUENCER_H
37 
38 #include <ros/ros.h>
39 
40 #include "connection.h"
41 #include "simple_filter.h"
42 
43 namespace message_filters
44 {
45 
74 template<class M>
75 class TimeSequencer : public SimpleFilter<M>
76 {
77 public:
80 
89  template<class F>
90  TimeSequencer(F& f, ros::Duration delay, ros::Duration update_rate, uint32_t queue_size, ros::NodeHandle nh = ros::NodeHandle())
91  : delay_(delay)
92  , update_rate_(update_rate)
93  , queue_size_(queue_size)
94  , nh_(nh)
95  {
96  init();
97  connectInput(f);
98  }
99 
110  TimeSequencer(ros::Duration delay, ros::Duration update_rate, uint32_t queue_size, ros::NodeHandle nh = ros::NodeHandle())
111  : delay_(delay)
112  , update_rate_(update_rate)
113  , queue_size_(queue_size)
114  , nh_(nh)
115  {
116  init();
117  }
118 
122  template<class F>
123  void connectInput(F& f)
124  {
126  incoming_connection_ = f.registerCallback(typename SimpleFilter<M>::EventCallback(boost::bind(&TimeSequencer::cb, this, _1)));
127  }
128 
130  {
133  }
134 
135  void add(const EventType& evt)
136  {
137  namespace mt = ros::message_traits;
138 
139  boost::mutex::scoped_lock lock(messages_mutex_);
140  if (mt::TimeStamp<M>::value(*evt.getMessage()) < last_time_)
141  {
142  return;
143  }
144 
145  messages_.insert(evt);
146 
147  if (queue_size_ != 0 && messages_.size() > queue_size_)
148  {
149  messages_.erase(*messages_.begin());
150  }
151  }
152 
156  void add(const MConstPtr& msg)
157  {
158  EventType evt(msg);
159  add(evt);
160  }
161 
162 private:
164  {
165  public:
166  bool operator()(const EventType& lhs, const EventType& rhs) const
167  {
168  namespace mt = ros::message_traits;
169  return mt::TimeStamp<M>::value(*lhs.getMessage()) < mt::TimeStamp<M>::value(*rhs.getMessage());
170  }
171  };
172  typedef std::multiset<EventType, MessageSort> S_Message;
173  typedef std::vector<EventType> V_Message;
174 
175  void cb(const EventType& evt)
176  {
177  add(evt);
178  }
179 
180  void dispatch()
181  {
182  namespace mt = ros::message_traits;
183 
184  V_Message to_call;
185 
186  {
187  boost::mutex::scoped_lock lock(messages_mutex_);
188 
189  while (!messages_.empty())
190  {
191  const EventType& e = *messages_.begin();
192  ros::Time stamp = mt::TimeStamp<M>::value(*e.getMessage());
193  if (stamp + delay_ <= ros::Time::now())
194  {
195  last_time_ = stamp;
196  to_call.push_back(e);
197  messages_.erase(messages_.begin());
198  }
199  else
200  {
201  break;
202  }
203  }
204  }
205 
206  {
207  typename V_Message::iterator it = to_call.begin();
208  typename V_Message::iterator end = to_call.end();
209  for (; it != end; ++it)
210  {
211  this->signalMessage(*it);
212  }
213  }
214  }
215 
217  {
218  dispatch();
219  }
220 
221  void init()
222  {
224  }
225 
228  uint32_t queue_size_;
230 
232 
234 
235 
236  S_Message messages_;
237  boost::mutex messages_mutex_;
239 };
240 
241 }
242 
243 #endif
Sequences messages based on the timestamp of their header.
std::vector< EventType > V_Message
void signalMessage(const MConstPtr &msg)
Call all registered callbacks, passing them the specified message.
void cb(const EventType &evt)
boost::shared_ptr< M > getMessage() const
SteadyTimer createSteadyTimer(WallDuration period, void(T::*callback)(const SteadyTimerEvent &), T *obj, bool oneshot=false, bool autostart=true) const
TimeSequencer(F &f, ros::Duration delay, ros::Duration update_rate, uint32_t queue_size, ros::NodeHandle nh=ros::NodeHandle())
Constructor.
TimeSequencer(ros::Duration delay, ros::Duration update_rate, uint32_t queue_size, ros::NodeHandle nh=ros::NodeHandle())
Constructor.
void add(const EventType &evt)
void disconnect()
disconnects this connection
Definition: connection.cpp:52
boost::shared_ptr< M const > MConstPtr
bool operator()(const EventType &lhs, const EventType &rhs) const
void add(const MConstPtr &msg)
Manually add a message to the cache.
void update(const ros::SteadyTimerEvent &)
std::multiset< EventType, MessageSort > S_Message
Convenience base-class for simple filters which output a single message.
Definition: simple_filter.h:60
boost::function< void(const EventType &)> EventCallback
Definition: simple_filter.h:66
static Time now()
void connectInput(F &f)
Connect this filter&#39;s input to another filter&#39;s output.
Encapsulates a connection from one filter to another (or to a user-specified callback) ...
Definition: connection.h:48
ros::MessageEvent< M const > EventType


message_filters
Author(s): Josh Faust, Vijay Pradeep, Dirk Thomas
autogenerated on Mon Feb 28 2022 23:33:48