trigger_matcher.h
Go to the documentation of this file.
1 /*********************************************************************
2 * Software License Agreement (BSD License)
3 *
4 * Copyright (c) 2009-2010, Willow Garage, Inc.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above
14 * copyright notice, this list of conditions and the following
15 * disclaimer in the documentation and/or other materials provided
16 * with the distribution.
17 * * Neither the name of the Willow Garage nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 * POSSIBILITY OF SUCH DAMAGE.
33 *********************************************************************/
34 
35 #ifndef __TIMESTAMP_TOOLS__TRIGGER_MATCHER_H__
36 #define __TIMESTAMP_TOOLS__TRIGGER_MATCHER_H__
37 
38 #include <ros/ros.h>
39 #include <queue>
40 #include <utility>
41 #include <boost/thread.hpp>
42 #include <boost/thread/condition.hpp>
43 #include <std_msgs/Header.h>
44 #include <boost/thread/thread_time.hpp>
45 
46 namespace timestamp_tools
47 {
48 
50 {
51 protected:
52  boost::mutex mutex_;
53  virtual void gotTrigger() = 0;
54 
56  {
57  while (!trig_queue_.empty())
58  {
59  // If the head trigger is after the head data then we don't have a
60  // timestamp for the data and drop it.
61  if (nonCausalHeads(data_time))
62  {
63  if (verbose_)
64  ROS_WARN("TriggerMatcherBase: data arrived before trigger. Discarding data sample.");
65  return DropData;
66  }
67 
68  ros::Time trig_stamp = trig_queue_.front();
69 
70  // If we are not synchronized, then we need to check that the next trigger
71  // time stamp would not be satisfactory. (If no late counts are
72  // allowed then we are always unsynchronized.)
73 
74  //ROS_DEBUG("getTimestampNoblock t=%f, qt=%f, ql=%i, sync = %i, last = %f", t.toSec(), trig_stamp.toSec(), trig_queue_.size(), synchronized_, last_data_stamp_.toSec());
75 
77  {
78  //ROS_DEBUG("Not synchronized...");
79  while (true)
80  {
81  if (trig_queue_.size() < 2)
82  return RetryLater; // Can't do the check now.
83 
84  trig_queue_.pop();
85  if (nonCausalHeads(data_time))
86  break; // Head is non-causal, we have arrived.
87  trig_stamp = trig_queue_.front(); // Skip that trigger event.
88  }
89  synchronized_ = true;
90  late_data_count_ = 0;
91  }
92  else
93  {
94  // If the data is later than the next trigger timestamp, we might
95  // have missed a timestamp, or the data may just have been delayed a
96  // lot. We count it and if it happens too often we assume that a
97  // timestamp was missed and redo the locking process.
98  //ROS_DEBUG("Packet check %f >? %f", last_data_stamp_.toSec(), trig_stamp.toSec());
99  if (last_data_stamp_ > trig_stamp)
100  {
101  //ROS_DEBUG("Late packet...");
103  {
104  if (verbose_)
105  ROS_WARN("TriggerMatcherBase: too many late data packets. Assuming missed trigger. Relocking...");
106  synchronized_ = false;
107  continue;
108  }
109  }
110  else
111  late_data_count_ = 0;
112 
113  trig_queue_.pop();
114  }
115 
116  last_data_stamp_ = data_time;
117  return trig_stamp;
118  }
119 
120  return RetryLater;
121  }
122 
123  virtual void baseReset()
124  {
125  synchronized_ = false;
126  late_data_count_ = 0;
127  trig_queue_ = std::queue<ros::Time>();
128  }
129 
130 private:
131  std::queue<ros::Time> trig_queue_;
132 
134 
136 
138  unsigned int late_data_count_;
141 
142  bool nonCausalHeads(const ros::Time &data_stamp)
143  { // Are the heads non-causal? (Assumes the heads are present.)
144  return trig_queue_.front() > data_stamp;
145  }
146 
147 public:
148  bool verbose_;
149 
150  static const ros::Time DropData;
151  static const ros::Time RetryLater;
152 
154  {
155  return !trig_queue_.empty();
156  }
157 
159  {
160  }
161 
162  void setLateDataCountAllowed(unsigned int v)
163  {
164  late_data_count_allowed_ = v;
165  }
166 
167  void setTrigDelay(double delay)
168  {
169  setTrigDelay(ros::Duration(delay));
170  }
171 
172  void setTrigDelay(const ros::Duration &delay)
173  {
174  trig_delay_ = delay;
175  }
176 
177  TriggerMatcherBase(unsigned int late_data_count_allowed, unsigned int max_trig_queue_length) :
178  trig_delay_(0),
179  last_data_stamp_(ros::TIME_MIN),
180  late_data_count_allowed_(late_data_count_allowed),
181  late_data_count_(0),
182  max_trig_queue_length_(max_trig_queue_length),
183  synchronized_(false),
184  verbose_(false)
185  {
186  }
187 
188  void triggerCallback(double stamp)
189  {
190  triggerCallback(ros::Time(stamp));
191  }
192 
193  void triggerCallback(const std_msgs::HeaderConstPtr &msg)
194  {
195  triggerCallback(msg->stamp);
196  }
197 
198  void triggerCallback(const ros::Time &stamp)
199  {
200  boost::mutex::scoped_lock(mutex_);
201 
202  trig_queue_.push(stamp + trig_delay_);
203  if (trig_queue_.size() > max_trig_queue_length_)
204  {
205  if (verbose_)
206  ROS_WARN("TriggerMatcherBase: trig_queue_ overflow dropping from front.");
207  trig_queue_.pop();
208  }
209  gotTrigger();
210  }
211 };
212 
215 
217 {
218 private:
219  boost::mutex data_source_mutex_;
220  boost::condition_variable got_trigger_condition_;
221 
222 protected:
223  virtual void gotTrigger()
224  {
225  got_trigger_condition_.notify_one();
226  }
227 
228 public:
229  virtual ~TriggerMatcher()
230  {}
231 
232  TriggerMatcher(unsigned int late_data_count_allowed, unsigned int max_trig_queue_length) :
233  TriggerMatcherBase(late_data_count_allowed, max_trig_queue_length)
234  {}
235 
236  void reset()
237  {
238  boost::mutex::scoped_lock lock(mutex_);
239 
240  got_trigger_condition_.notify_all();
241  baseReset();
242  }
243 
245  {
246  boost::mutex::scoped_lock data_lock(data_source_mutex_);
247  boost::mutex::scoped_lock lock(mutex_);
248 
250 
251  if (stamp != RetryLater)
252  return stamp;
253 
254  got_trigger_condition_.wait(lock);
255 
257  }
258 
259  ros::Time getTimestampBlocking(const ros::Time &t, double timeout)
260  {
261  boost::mutex::scoped_lock data_lock(data_source_mutex_);
262  boost::mutex::scoped_lock lock(mutex_);
263 
265 
266  if (stamp != RetryLater)
267  return stamp;
268 
269  got_trigger_condition_.timed_wait(lock, boost::posix_time::microseconds((int) (timeout * 1e6)));
270 
272  }
273 
275  {
276  boost::mutex::scoped_lock data_lock(data_source_mutex_);
277  boost::mutex::scoped_lock lock(mutex_);
278 
279  return getTimestampNoblockPrelocked(data_time);
280  }
281 };
282 
283 template <class C>
285 {
286 public:
287  typedef std::pair<ros::Time, boost::shared_ptr<C const> > DataPair;
288  typedef boost::function<void(const ros::Time &, boost::shared_ptr<C const> &)> MatchCallback;
289 
290 private:
292  std::queue<DataPair> data_queue_;
294 
296  {
297  ROS_ERROR("QueuedTriggerMatcher triggered with no callback set. This is a bug in the code that uses QueuedTriggerMatcher.");
298  }
299 
300 protected:
301  virtual void gotTrigger()
302  {
303  while (!data_queue_.empty())
304  {
305  DataPair &data = data_queue_.front();
306  ros::Time stamp = getTimestampNoblockPrelocked(data.first);
307 
308  if (stamp == DropData)
309  data_queue_.pop();
310  else if (stamp == RetryLater)
311  return;
312  else
313  {
314  match_callback_(stamp, data.second);
315  data_queue_.pop();
316  }
317  }
318  }
319 
320 public:
322  {
323  }
324 
326  {
327  match_callback_ = cb;
328  }
329 
330  QueuedTriggerMatcher(unsigned int late_data_count_allowed, unsigned int max_trig_queue_length, unsigned int max_data_queue_length) :
331  TriggerMatcherBase(late_data_count_allowed, max_trig_queue_length),
332  max_data_queue_length_(max_data_queue_length)
333  {
334  match_callback_ = &QueuedTriggerMatcher::DefaultCallback;
335  }
336 
337  void dataCallback(double stamp, const C &data)
338  {
339  dataCallback(ros::Time(stamp), data);
340  }
341 
342  void dataCallback(const ros::Time &stamp, const C &data)
343  {
345  dataCallback(stamp, ptr);
346  }
347 
348  void dataCallback(double stamp, const boost::shared_ptr<C const> &data)
349  {
350  dataCallback(ros::Time(stamp), data);
351  }
352 
353  void dataCallback(const ros::Time &stamp, const boost::shared_ptr<C const> &data)
354  {
355  dataCallback(DataPair(stamp, data));
356  }
357 
358  void reset()
359  {
360  boost::mutex::scoped_lock lock(mutex_);
361 
362  data_queue_ = std::queue<DataPair>();
363  baseReset();
364  }
365 
366  void dataCallback(const DataPair &pair)
367  {
368  boost::mutex::scoped_lock lock(mutex_);
369 
370  data_queue_.push(pair);
371  if (data_queue_.size() > max_data_queue_length_)
372  {
373  if (verbose_)
374  ROS_WARN("QueuedTriggerMatcher: trig_queue_ overflow dropping from front.");
375  data_queue_.pop();
376  }
377  gotTrigger();
378  }
379 };
380 
381 }
382 
383 #endif // __TIMESTAMP_TOOLS__TRIGGER_MATCHER_H__
ros::Time getTimestampNoblockPrelocked(const ros::Time &data_time)
bool nonCausalHeads(const ros::Time &data_stamp)
void dataCallback(double stamp, const boost::shared_ptr< C const > &data)
void dataCallback(const ros::Time &stamp, const C &data)
const Time TIME_MIN(0, 1)
void dataCallback(const DataPair &pair)
std::queue< ros::Time > trig_queue_
void dataCallback(double stamp, const C &data)
TriggerMatcherBase(unsigned int late_data_count_allowed, unsigned int max_trig_queue_length)
static void DefaultCallback(const ros::Time &t, boost::shared_ptr< C const > &d)
void triggerCallback(const ros::Time &stamp)
#define ROS_WARN(...)
boost::condition_variable got_trigger_condition_
void dataCallback(const ros::Time &stamp, const boost::shared_ptr< C const > &data)
boost::function< void(const ros::Time &, boost::shared_ptr< intconst > &)> MatchCallback
std::pair< ros::Time, boost::shared_ptr< C const > > DataPair
void triggerCallback(const std_msgs::HeaderConstPtr &msg)
ros::Time getTimestampBlocking(const ros::Time &t, double timeout)
QueuedTriggerMatcher(unsigned int late_data_count_allowed, unsigned int max_trig_queue_length, unsigned int max_data_queue_length)
ROSTIME_DECL const Time TIME_MAX
ros::Time getTimestampBlocking(const ros::Time &t)
void setTrigDelay(const ros::Duration &delay)
#define ROS_ERROR(...)
TriggerMatcher(unsigned int late_data_count_allowed, unsigned int max_trig_queue_length)
void setMatchCallback(MatchCallback &cb)
void setLateDataCountAllowed(unsigned int v)
ros::Time getTimestampNoblock(const ros::Time &data_time)


timestamp_tools
Author(s): Blaise Gassend
autogenerated on Tue Mar 23 2021 02:23:30