approximate_time.h
Go to the documentation of this file.
1 /*********************************************************************
2 * Software License Agreement (BSD License)
3 *
4 * Copyright (c) 2009, Willow Garage, Inc.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above
14 * copyright notice, this list of conditions and the following
15 * disclaimer in the documentation and/or other materials provided
16 * with the distribution.
17 * * Neither the name of the Willow Garage nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 * POSSIBILITY OF SUCH DAMAGE.
33 *********************************************************************/
34 
35 #ifndef MESSAGE_FILTERS_SYNC_APPROXIMATE_TIME_H
36 #define MESSAGE_FILTERS_SYNC_APPROXIMATE_TIME_H
37 
42 
43 #include <boost/tuple/tuple.hpp>
44 #include <boost/shared_ptr.hpp>
45 #include <boost/function.hpp>
46 #include <boost/thread/mutex.hpp>
47 
48 #include <boost/bind.hpp>
49 #include <boost/type_traits/is_same.hpp>
50 #include <boost/noncopyable.hpp>
51 #include <boost/mpl/or.hpp>
52 #include <boost/mpl/at.hpp>
53 #include <boost/mpl/vector.hpp>
54 
55 #include <ros/assert.h>
56 #include <ros/message_traits.h>
57 #include <ros/message_event.h>
58 
59 #include <deque>
60 #include <vector>
61 #include <string>
62 
63 namespace message_filters
64 {
65 namespace sync_policies
66 {
67 
68 namespace mpl = boost::mpl;
69 
70 template<typename M0, typename M1, typename M2 = NullType, typename M3 = NullType, typename M4 = NullType,
71  typename M5 = NullType, typename M6 = NullType, typename M7 = NullType, typename M8 = NullType>
72 struct ApproximateTime : public PolicyBase<M0, M1, M2, M3, M4, M5, M6, M7, M8>
73 {
74  typedef Synchronizer<ApproximateTime> Sync;
75  typedef PolicyBase<M0, M1, M2, M3, M4, M5, M6, M7, M8> Super;
76  typedef typename Super::Messages Messages;
77  typedef typename Super::Signal Signal;
78  typedef typename Super::Events Events;
79  typedef typename Super::RealTypeCount RealTypeCount;
80  typedef typename Super::M0Event M0Event;
81  typedef typename Super::M1Event M1Event;
82  typedef typename Super::M2Event M2Event;
83  typedef typename Super::M3Event M3Event;
84  typedef typename Super::M4Event M4Event;
85  typedef typename Super::M5Event M5Event;
86  typedef typename Super::M6Event M6Event;
87  typedef typename Super::M7Event M7Event;
88  typedef typename Super::M8Event M8Event;
89  typedef std::deque<M0Event> M0Deque;
90  typedef std::deque<M1Event> M1Deque;
91  typedef std::deque<M2Event> M2Deque;
92  typedef std::deque<M3Event> M3Deque;
93  typedef std::deque<M4Event> M4Deque;
94  typedef std::deque<M5Event> M5Deque;
95  typedef std::deque<M6Event> M6Deque;
96  typedef std::deque<M7Event> M7Deque;
97  typedef std::deque<M8Event> M8Deque;
98  typedef std::vector<M0Event> M0Vector;
99  typedef std::vector<M1Event> M1Vector;
100  typedef std::vector<M2Event> M2Vector;
101  typedef std::vector<M3Event> M3Vector;
102  typedef std::vector<M4Event> M4Vector;
103  typedef std::vector<M5Event> M5Vector;
104  typedef std::vector<M6Event> M6Vector;
105  typedef std::vector<M7Event> M7Vector;
106  typedef std::vector<M8Event> M8Vector;
107  typedef boost::tuple<M0Event, M1Event, M2Event, M3Event, M4Event, M5Event, M6Event, M7Event, M8Event> Tuple;
108  typedef boost::tuple<M0Deque, M1Deque, M2Deque, M3Deque, M4Deque, M5Deque, M6Deque, M7Deque, M8Deque> DequeTuple;
109  typedef boost::tuple<M0Vector, M1Vector, M2Vector, M3Vector, M4Vector, M5Vector, M6Vector, M7Vector, M8Vector> VectorTuple;
110 
111  ApproximateTime(uint32_t queue_size)
112  : parent_(0)
113  , queue_size_(queue_size)
114  , enable_reset_(false)
115  , num_reset_deques_(0)
117  , pivot_(NO_PIVOT)
119  , age_penalty_(0.1)
120  , has_dropped_messages_(9, false)
121  , inter_message_lower_bounds_(9, ros::Duration(0))
123  , last_stamps_(9, ros::Time(0, 0))
124  {
125  ROS_ASSERT(queue_size_ > 0); // The synchronizer will tend to drop many messages with a queue size of 1. At least 2 is recommended.
126  }
127 
129  {
130  *this = e;
131  }
132 
134  {
135  parent_ = rhs.parent_;
136  queue_size_ = rhs.queue_size_;
137  num_non_empty_deques_ = rhs.num_non_empty_deques_;
138  pivot_time_ = rhs.pivot_time_;
139  pivot_ = rhs.pivot_;
140  max_interval_duration_ = rhs.max_interval_duration_;
141  age_penalty_ = rhs.age_penalty_;
142  candidate_start_ = rhs.candidate_start_;
143  candidate_end_ = rhs.candidate_end_;
144  deques_ = rhs.deques_;
145  past_ = rhs.past_;
146  has_dropped_messages_ = rhs.has_dropped_messages_;
147  inter_message_lower_bounds_ = rhs.inter_message_lower_bounds_;
148  warned_about_incorrect_bound_ = rhs.warned_about_incorrect_bound_;
149  last_stamps_ = rhs.last_stamps_;
150  enable_reset_ = rhs.enable_reset_;
151  num_reset_deques_ = rhs.num_reset_deques_;
152 
153  return *this;
154  }
155 
156  void initParent(Sync* parent)
157  {
158  parent_ = parent;
159  }
160 
161  template<int i>
163  {
164  namespace mt = ros::message_traits;
165 
166  std::deque<typename mpl::at_c<Events, i>::type>& deque = boost::get<i>(deques_);
167  std::vector<typename mpl::at_c<Events, i>::type>& v = boost::get<i>(past_);
168  ROS_ASSERT(!deque.empty());
169  const typename mpl::at_c<Messages, i>::type &msg = *(deque.back()).getMessage();
170  ros::Time msg_time = mt::TimeStamp<typename mpl::at_c<Messages, i>::type>::value(msg);
171  ros::Time previous_msg_time;
172  bool check_ok = true;
173  if (deque.size() == (size_t) 1)
174  {
175  if (v.empty())
176  {
177  // We have already published (or have never received) the previous message, we cannot check the bound
178  return check_ok;
179  }
180  const typename mpl::at_c<Messages, i>::type &previous_msg = *(v.back()).getMessage();
181  previous_msg_time = mt::TimeStamp<typename mpl::at_c<Messages, i>::type>::value(previous_msg);
182  }
183  else
184  {
185  // There are at least 2 elements in the deque. Check that the gap respects the bound if it was provided.
186  const typename mpl::at_c<Messages, i>::type &previous_msg = *(deque[deque.size()-2]).getMessage();
187  previous_msg_time = mt::TimeStamp<typename mpl::at_c<Messages, i>::type>::value(previous_msg);
188  }
189  if (msg_time < previous_msg_time)
190  {
192  ROS_WARN_STREAM("Messages of type " << i << " arrived out of order (will print only once)");
194  check_ok = false;
195  }
196  else if ((msg_time - previous_msg_time) < inter_message_lower_bounds_[i])
197  {
199  ROS_WARN_STREAM("Messages of type " << i << " arrived closer (" << (msg_time - previous_msg_time)
200  << ") than the lower bound you provided (" << inter_message_lower_bounds_[i]
201  << ") (will print only once)");
203  check_ok = false;
204  }
205  return check_ok;
206  }
207 
208 
209  template<int i>
210  void add(const typename mpl::at_c<Events, i>::type& evt)
211  {
212  boost::mutex::scoped_lock lock(data_mutex_);
213 
214  // check if time jumped back in simulation time
215  ros::Time now = evt.getReceiptTime();
217  {
218  if (now < last_stamps_[i])
219  {
221  if (num_reset_deques_ == 1)
222  {
223  ROS_WARN("Detected jump back in time. Clearing message filter queues");
224  }
225  clearDeque<i>();
226  if (num_reset_deques_ >= RealTypeCount::value)
227  {
228  num_reset_deques_ = 0;
229  }
230  }
231  }
232  last_stamps_[i] = now;
233 
234  std::deque<typename mpl::at_c<Events, i>::type>& deque = boost::get<i>(deques_);
235  deque.push_back(evt);
236  if (deque.size() == (size_t)1) {
237  // We have just added the first message, so it was empty before
239  if (num_non_empty_deques_ == (uint32_t)RealTypeCount::value)
240  {
241  // All deques have messages
242  process();
243  }
244  }
245  else
246  {
247  if (!checkInterMessageBound<i>())
249  {
250  dequeDeleteFront<i>();
251  }
252  }
253  // Check whether we have more messages than allowed in the queue.
254  // Note that during the above call to process(), queue i may contain queue_size_+1 messages.
255  std::vector<typename mpl::at_c<Events, i>::type>& past = boost::get<i>(past_);
256  if (deque.size() + past.size() > queue_size_)
257  {
258  // Cancel ongoing candidate search, if any:
259  num_non_empty_deques_ = 0; // We will recompute it from scratch
260  recover<0>();
261  recover<1>();
262  recover<2>();
263  recover<3>();
264  recover<4>();
265  recover<5>();
266  recover<6>();
267  recover<7>();
268  recover<8>();
269  // Drop the oldest message in the offending topic
270  ROS_ASSERT(!deque.empty());
271  deque.pop_front();
272  has_dropped_messages_[i] = true;
273  if (pivot_ != NO_PIVOT)
274  {
275  // The candidate is no longer valid. Destroy it.
276  candidate_ = Tuple();
277  pivot_ = NO_PIVOT;
278  // There might still be enough messages to create a new candidate:
279  process();
280  }
281  }
282  }
283 
284  void setAgePenalty(double age_penalty)
285  {
286  // For correctness we only need age_penalty > -1.0, but most likely a negative age_penalty is a mistake.
287  ROS_ASSERT(age_penalty >= 0);
288  age_penalty_ = age_penalty;
289  }
290 
291  void setInterMessageLowerBound(int i, ros::Duration lower_bound) {
292  ROS_ASSERT(lower_bound >= ros::Duration(0,0));
293  inter_message_lower_bounds_[i] = lower_bound;
294  }
295 
296  void setInterMessageLowerBound(ros::Duration lower_bound) {
297  ROS_ASSERT(lower_bound >= ros::Duration(0,0));
298  for (size_t i = 0; i < inter_message_lower_bounds_.size(); i++)
299  {
300  inter_message_lower_bounds_[i] = lower_bound;
301  }
302  }
303 
304  void setMaxIntervalDuration(ros::Duration max_interval_duration) {
305  ROS_ASSERT(max_interval_duration >= ros::Duration(0,0));
306  max_interval_duration_ = max_interval_duration;
307  }
308 
309  void setReset(const bool reset)
310  {
311  // Set this true to reset queue on ROS time jumped back
312  enable_reset_ = reset;
313  }
314 
315 private:
316  template<int i>
317  void clearDeque()
318  {
320  recover<0>();
321  recover<1>();
322  recover<2>();
323  recover<3>();
324  recover<4>();
325  recover<5>();
326  recover<6>();
327  recover<7>();
328  recover<8>();
329  std::deque<typename mpl::at_c<Events, i>::type>& q = boost::get<i>(deques_);
330  if (!q.empty())
331  {
333  }
334  q.clear();
336  candidate_ = Tuple();
337  pivot_ = NO_PIVOT;
338  }
339 
340  // Assumes that deque number <index> is non empty
341  template<int i>
342  void dequeDeleteFront()
343  {
344  std::deque<typename mpl::at_c<Events, i>::type>& deque = boost::get<i>(deques_);
345  ROS_ASSERT(!deque.empty());
346  deque.pop_front();
347  if (deque.empty())
348  {
350  }
351  }
352 
353  // Assumes that deque number <index> is non empty
354  void dequeDeleteFront(uint32_t index)
355  {
356  switch (index)
357  {
358  case 0:
359  dequeDeleteFront<0>();
360  break;
361  case 1:
362  dequeDeleteFront<1>();
363  break;
364  case 2:
365  dequeDeleteFront<2>();
366  break;
367  case 3:
368  dequeDeleteFront<3>();
369  break;
370  case 4:
371  dequeDeleteFront<4>();
372  break;
373  case 5:
374  dequeDeleteFront<5>();
375  break;
376  case 6:
377  dequeDeleteFront<6>();
378  break;
379  case 7:
380  dequeDeleteFront<7>();
381  break;
382  case 8:
383  dequeDeleteFront<8>();
384  break;
385  default:
386  ROS_BREAK();
387  }
388  }
389 
390  // Assumes that deque number <index> is non empty
391  template<int i>
393  {
394  std::deque<typename mpl::at_c<Events, i>::type>& deque = boost::get<i>(deques_);
395  std::vector<typename mpl::at_c<Events, i>::type>& vector = boost::get<i>(past_);
396  ROS_ASSERT(!deque.empty());
397  vector.push_back(deque.front());
398  deque.pop_front();
399  if (deque.empty())
400  {
402  }
403  }
404  // Assumes that deque number <index> is non empty
405  void dequeMoveFrontToPast(uint32_t index)
406  {
407  switch (index)
408  {
409  case 0:
410  dequeMoveFrontToPast<0>();
411  break;
412  case 1:
413  dequeMoveFrontToPast<1>();
414  break;
415  case 2:
416  dequeMoveFrontToPast<2>();
417  break;
418  case 3:
419  dequeMoveFrontToPast<3>();
420  break;
421  case 4:
422  dequeMoveFrontToPast<4>();
423  break;
424  case 5:
425  dequeMoveFrontToPast<5>();
426  break;
427  case 6:
428  dequeMoveFrontToPast<6>();
429  break;
430  case 7:
431  dequeMoveFrontToPast<7>();
432  break;
433  case 8:
434  dequeMoveFrontToPast<8>();
435  break;
436  default:
437  ROS_BREAK();
438  }
439  }
440 
441  void makeCandidate()
442  {
443  //printf("Creating candidate\n");
444  // Create candidate tuple
445  candidate_ = Tuple(); // Discards old one if any
446  boost::get<0>(candidate_) = boost::get<0>(deques_).front();
447  boost::get<1>(candidate_) = boost::get<1>(deques_).front();
448  if (RealTypeCount::value > 2)
449  {
450  boost::get<2>(candidate_) = boost::get<2>(deques_).front();
451  if (RealTypeCount::value > 3)
452  {
453  boost::get<3>(candidate_) = boost::get<3>(deques_).front();
454  if (RealTypeCount::value > 4)
455  {
456  boost::get<4>(candidate_) = boost::get<4>(deques_).front();
457  if (RealTypeCount::value > 5)
458  {
459  boost::get<5>(candidate_) = boost::get<5>(deques_).front();
460  if (RealTypeCount::value > 6)
461  {
462  boost::get<6>(candidate_) = boost::get<6>(deques_).front();
463  if (RealTypeCount::value > 7)
464  {
465  boost::get<7>(candidate_) = boost::get<7>(deques_).front();
466  if (RealTypeCount::value > 8)
467  {
468  boost::get<8>(candidate_) = boost::get<8>(deques_).front();
469  }
470  }
471  }
472  }
473  }
474  }
475  }
476  // Delete all past messages, since we have found a better candidate
477  boost::get<0>(past_).clear();
478  boost::get<1>(past_).clear();
479  boost::get<2>(past_).clear();
480  boost::get<3>(past_).clear();
481  boost::get<4>(past_).clear();
482  boost::get<5>(past_).clear();
483  boost::get<6>(past_).clear();
484  boost::get<7>(past_).clear();
485  boost::get<8>(past_).clear();
486  //printf("Candidate created\n");
487  }
488 
489 
490  // ASSUMES: num_messages <= past_[i].size()
491  template<int i>
492  void recover(size_t num_messages)
493  {
494  if (i >= RealTypeCount::value)
495  {
496  return;
497  }
498 
499  std::vector<typename mpl::at_c<Events, i>::type>& v = boost::get<i>(past_);
500  std::deque<typename mpl::at_c<Events, i>::type>& q = boost::get<i>(deques_);
501  ROS_ASSERT(num_messages <= v.size());
502  while (num_messages > 0)
503  {
504  q.push_front(v.back());
505  v.pop_back();
506  num_messages--;
507  }
508 
509  if (!q.empty())
510  {
512  }
513  }
514 
515 
516  template<int i>
517  void recover()
518  {
519  if (i >= RealTypeCount::value)
520  {
521  return;
522  }
523 
524  std::vector<typename mpl::at_c<Events, i>::type>& v = boost::get<i>(past_);
525  std::deque<typename mpl::at_c<Events, i>::type>& q = boost::get<i>(deques_);
526  while (!v.empty())
527  {
528  q.push_front(v.back());
529  v.pop_back();
530  }
531 
532  if (!q.empty())
533  {
535  }
536  }
537 
538 
539  template<int i>
540  void recoverAndDelete()
541  {
542  if (i >= RealTypeCount::value)
543  {
544  return;
545  }
546 
547  std::vector<typename mpl::at_c<Events, i>::type>& v = boost::get<i>(past_);
548  std::deque<typename mpl::at_c<Events, i>::type>& q = boost::get<i>(deques_);
549  while (!v.empty())
550  {
551  q.push_front(v.back());
552  v.pop_back();
553  }
554 
555  ROS_ASSERT(!q.empty());
556 
557  q.pop_front();
558  if (!q.empty())
559  {
561  }
562  }
563 
564  // Assumes: all deques are non empty, i.e. num_non_empty_deques_ == RealTypeCount::value
565  void publishCandidate()
566  {
567  //printf("Publishing candidate\n");
568  // Publish
569  parent_->signal(boost::get<0>(candidate_), boost::get<1>(candidate_), boost::get<2>(candidate_), boost::get<3>(candidate_),
570  boost::get<4>(candidate_), boost::get<5>(candidate_), boost::get<6>(candidate_), boost::get<7>(candidate_),
571  boost::get<8>(candidate_));
572  // Delete this candidate
573  candidate_ = Tuple();
574  pivot_ = NO_PIVOT;
575 
576  // Recover hidden messages, and delete the ones corresponding to the candidate
577  num_non_empty_deques_ = 0; // We will recompute it from scratch
578  recoverAndDelete<0>();
579  recoverAndDelete<1>();
580  recoverAndDelete<2>();
581  recoverAndDelete<3>();
582  recoverAndDelete<4>();
583  recoverAndDelete<5>();
584  recoverAndDelete<6>();
585  recoverAndDelete<7>();
586  recoverAndDelete<8>();
587  }
588 
589  // Assumes: all deques are non empty, i.e. num_non_empty_deques_ == RealTypeCount::value
590  // Returns: the oldest message on the deques
591  void getCandidateStart(uint32_t &start_index, ros::Time &start_time)
592  {
593  return getCandidateBoundary(start_index, start_time, false);
594  }
595 
596  // Assumes: all deques are non empty, i.e. num_non_empty_deques_ == RealTypeCount::value
597  // Returns: the latest message among the heads of the deques, i.e. the minimum
598  // time to end an interval started at getCandidateStart_index()
599  void getCandidateEnd(uint32_t &end_index, ros::Time &end_time)
600  {
601  return getCandidateBoundary(end_index, end_time, true);
602  }
603 
604  // ASSUMES: all deques are non-empty
605  // end = true: look for the latest head of deque
606  // false: look for the earliest head of deque
607  void getCandidateBoundary(uint32_t &index, ros::Time &time, bool end)
608  {
609  namespace mt = ros::message_traits;
610 
611  M0Event& m0 = boost::get<0>(deques_).front();
612  time = mt::TimeStamp<M0>::value(*m0.getMessage());
613  index = 0;
614  if (RealTypeCount::value > 1)
615  {
616  M1Event& m1 = boost::get<1>(deques_).front();
617  if ((mt::TimeStamp<M1>::value(*m1.getMessage()) < time) ^ end)
618  {
619  time = mt::TimeStamp<M1>::value(*m1.getMessage());
620  index = 1;
621  }
622  }
623  if (RealTypeCount::value > 2)
624  {
625  M2Event& m2 = boost::get<2>(deques_).front();
626  if ((mt::TimeStamp<M2>::value(*m2.getMessage()) < time) ^ end)
627  {
628  time = mt::TimeStamp<M2>::value(*m2.getMessage());
629  index = 2;
630  }
631  }
632  if (RealTypeCount::value > 3)
633  {
634  M3Event& m3 = boost::get<3>(deques_).front();
635  if ((mt::TimeStamp<M3>::value(*m3.getMessage()) < time) ^ end)
636  {
637  time = mt::TimeStamp<M3>::value(*m3.getMessage());
638  index = 3;
639  }
640  }
641  if (RealTypeCount::value > 4)
642  {
643  M4Event& m4 = boost::get<4>(deques_).front();
644  if ((mt::TimeStamp<M4>::value(*m4.getMessage()) < time) ^ end)
645  {
646  time = mt::TimeStamp<M4>::value(*m4.getMessage());
647  index = 4;
648  }
649  }
650  if (RealTypeCount::value > 5)
651  {
652  M5Event& m5 = boost::get<5>(deques_).front();
653  if ((mt::TimeStamp<M5>::value(*m5.getMessage()) < time) ^ end)
654  {
655  time = mt::TimeStamp<M5>::value(*m5.getMessage());
656  index = 5;
657  }
658  }
659  if (RealTypeCount::value > 6)
660  {
661  M6Event& m6 = boost::get<6>(deques_).front();
662  if ((mt::TimeStamp<M6>::value(*m6.getMessage()) < time) ^ end)
663  {
664  time = mt::TimeStamp<M6>::value(*m6.getMessage());
665  index = 6;
666  }
667  }
668  if (RealTypeCount::value > 7)
669  {
670  M7Event& m7 = boost::get<7>(deques_).front();
671  if ((mt::TimeStamp<M7>::value(*m7.getMessage()) < time) ^ end)
672  {
673  time = mt::TimeStamp<M7>::value(*m7.getMessage());
674  index = 7;
675  }
676  }
677  if (RealTypeCount::value > 8)
678  {
679  M8Event& m8 = boost::get<8>(deques_).front();
680  if ((mt::TimeStamp<M8>::value(*m8.getMessage()) < time) ^ end)
681  {
682  time = mt::TimeStamp<M8>::value(*m8.getMessage());
683  index = 8;
684  }
685  }
686  }
687 
688 
689  // ASSUMES: we have a pivot and candidate
690  template<int i>
692  {
693  namespace mt = ros::message_traits;
694 
695  if (i >= RealTypeCount::value)
696  {
697  return ros::Time(0,0); // Dummy return value
698  }
700 
701  std::vector<typename mpl::at_c<Events, i>::type>& v = boost::get<i>(past_);
702  std::deque<typename mpl::at_c<Events, i>::type>& q = boost::get<i>(deques_);
703  if (q.empty())
704  {
705  ROS_ASSERT(!v.empty()); // Because we have a candidate
706  ros::Time last_msg_time = mt::TimeStamp<typename mpl::at_c<Messages, i>::type>::value(*(v.back()).getMessage());
707  ros::Time msg_time_lower_bound = last_msg_time + inter_message_lower_bounds_[i];
708  if (msg_time_lower_bound > pivot_time_) // Take the max
709  {
710  return msg_time_lower_bound;
711  }
712  return pivot_time_;
713  }
714  ros::Time current_msg_time = mt::TimeStamp<typename mpl::at_c<Messages, i>::type>::value(*(q.front()).getMessage());
715  return current_msg_time;
716  }
717 
718 
719  // ASSUMES: we have a pivot and candidate
720  void getVirtualCandidateStart(uint32_t &start_index, ros::Time &start_time)
721  {
722  return getVirtualCandidateBoundary(start_index, start_time, false);
723  }
724 
725  // ASSUMES: we have a pivot and candidate
726  void getVirtualCandidateEnd(uint32_t &end_index, ros::Time &end_time)
727  {
728  return getVirtualCandidateBoundary(end_index, end_time, true);
729  }
730 
731  // ASSUMES: we have a pivot and candidate
732  // end = true: look for the latest head of deque
733  // false: look for the earliest head of deque
734  void getVirtualCandidateBoundary(uint32_t &index, ros::Time &time, bool end)
735  {
736  namespace mt = ros::message_traits;
737 
738  std::vector<ros::Time> virtual_times(9);
739  virtual_times[0] = getVirtualTime<0>();
740  virtual_times[1] = getVirtualTime<1>();
741  virtual_times[2] = getVirtualTime<2>();
742  virtual_times[3] = getVirtualTime<3>();
743  virtual_times[4] = getVirtualTime<4>();
744  virtual_times[5] = getVirtualTime<5>();
745  virtual_times[6] = getVirtualTime<6>();
746  virtual_times[7] = getVirtualTime<7>();
747  virtual_times[8] = getVirtualTime<8>();
748 
749  time = virtual_times[0];
750  index = 0;
751  for (int i = 0; i < RealTypeCount::value; i++)
752  {
753  if ((virtual_times[i] < time) ^ end)
754  {
755  time = virtual_times[i];
756  index = i;
757  }
758  }
759  }
760 
761 
762  // assumes data_mutex_ is already locked
763  void process()
764  {
765  // While no deque is empty
766  while (num_non_empty_deques_ == (uint32_t)RealTypeCount::value)
767  {
768  // Find the start and end of the current interval
769  //printf("Entering while loop in this state [\n");
770  //show_internal_state();
771  //printf("]\n");
772  ros::Time end_time, start_time;
773  uint32_t end_index, start_index;
774  getCandidateEnd(end_index, end_time);
775  getCandidateStart(start_index, start_time);
776  for (uint32_t i = 0; i < (uint32_t)RealTypeCount::value; i++)
777  {
778  if (i != end_index)
779  {
780  // No dropped message could have been better to use than the ones we have,
781  // so it becomes ok to use this topic as pivot in the future
782  has_dropped_messages_[i] = false;
783  }
784  }
785  if (pivot_ == NO_PIVOT)
786  {
787  // We do not have a candidate
788  // INVARIANT: the past_ vectors are empty
789  // INVARIANT: (candidate_ has no filled members)
790  if (end_time - start_time > max_interval_duration_)
791  {
792  // This interval is too big to be a valid candidate, move to the next
793  dequeDeleteFront(start_index);
794  continue;
795  }
796  if (has_dropped_messages_[end_index])
797  {
798  // The topic that would become pivot has dropped messages, so it is not a good pivot
799  dequeDeleteFront(start_index);
800  continue;
801  }
802  // This is a valid candidate, and we don't have any, so take it
803  makeCandidate();
804  candidate_start_ = start_time;
805  candidate_end_ = end_time;
806  pivot_ = end_index;
807  pivot_time_ = end_time;
808  dequeMoveFrontToPast(start_index);
809  }
810  else
811  {
812  // We already have a candidate
813  // Is this one better than the current candidate?
814  // INVARIANT: has_dropped_messages_ is all false
815  if ((end_time - candidate_end_) * (1 + age_penalty_) >= (start_time - candidate_start_))
816  {
817  // This is not a better candidate, move to the next
818  dequeMoveFrontToPast(start_index);
819  }
820  else
821  {
822  // This is a better candidate
823  makeCandidate();
824  candidate_start_ = start_time;
825  candidate_end_ = end_time;
826  dequeMoveFrontToPast(start_index);
827  // Keep the same pivot (and pivot time)
828  }
829  }
830  // INVARIANT: we have a candidate and pivot
832  //printf("start_index == %d, pivot_ == %d\n", start_index, pivot_);
833  if (start_index == pivot_) // TODO: replace with start_time == pivot_time_
834  {
835  // We have exhausted all possible candidates for this pivot, we now can output the best one
837  }
838  else if ((end_time - candidate_end_) * (1 + age_penalty_) >= (pivot_time_ - candidate_start_))
839  {
840  // We have not exhausted all candidates, but this candidate is already provably optimal
841  // Indeed, any future candidate must contain the interval [pivot_time_ end_time], which
842  // is already too big.
843  // Note: this case is subsumed by the next, but it may save some unnecessary work and
844  // it makes things (a little) easier to understand
846  }
847  else if (num_non_empty_deques_ < (uint32_t)RealTypeCount::value)
848  {
849  uint32_t num_non_empty_deques_before_virtual_search = num_non_empty_deques_;
850 
851  // Before giving up, use the rate bounds, if provided, to further try to prove optimality
852  std::vector<int> num_virtual_moves(9,0);
853  while (1)
854  {
855  ros::Time end_time, start_time;
856  uint32_t end_index, start_index;
857  getVirtualCandidateEnd(end_index, end_time);
858  getVirtualCandidateStart(start_index, start_time);
859  if ((end_time - candidate_end_) * (1 + age_penalty_) >= (pivot_time_ - candidate_start_))
860  {
861  // We have proved optimality
862  // As above, any future candidate must contain the interval [pivot_time_ end_time], which
863  // is already too big.
864  publishCandidate(); // This cleans up the virtual moves as a byproduct
865  break; // From the while(1) loop only
866  }
867  if ((end_time - candidate_end_) * (1 + age_penalty_) < (start_time - candidate_start_))
868  {
869  // We cannot prove optimality
870  // Indeed, we have a virtual (i.e. optimistic) candidate that is better than the current
871  // candidate
872  // Cleanup the virtual search:
873  num_non_empty_deques_ = 0; // We will recompute it from scratch
874  recover<0>(num_virtual_moves[0]);
875  recover<1>(num_virtual_moves[1]);
876  recover<2>(num_virtual_moves[2]);
877  recover<3>(num_virtual_moves[3]);
878  recover<4>(num_virtual_moves[4]);
879  recover<5>(num_virtual_moves[5]);
880  recover<6>(num_virtual_moves[6]);
881  recover<7>(num_virtual_moves[7]);
882  recover<8>(num_virtual_moves[8]);
883  (void)num_non_empty_deques_before_virtual_search; // unused variable warning stopper
884  ROS_ASSERT(num_non_empty_deques_before_virtual_search == num_non_empty_deques_);
885  break;
886  }
887  // Note: we cannot reach this point with start_index == pivot_ since in that case we would
888  // have start_time == pivot_time, in which case the two tests above are the negation
889  // of each other, so that one must be true. Therefore the while loop always terminates.
890  ROS_ASSERT(start_index != pivot_);
891  ROS_ASSERT(start_time < pivot_time_);
892  dequeMoveFrontToPast(start_index);
893  num_virtual_moves[start_index]++;
894  } // while(1)
895  }
896  } // while(num_non_empty_deques_ == (uint32_t)RealTypeCount::value)
897  }
898 
899  Sync* parent_;
900  uint32_t queue_size_;
901  bool enable_reset_;
902  uint32_t num_reset_deques_;
903 
904  static const uint32_t NO_PIVOT = 9; // Special value for the pivot indicating that no pivot has been selected
905 
907  uint32_t num_non_empty_deques_;
909  Tuple candidate_; // NULL if there is no candidate, in which case there is no pivot.
913  uint32_t pivot_; // Equal to NO_PIVOT if there is no candidate
914  boost::mutex data_mutex_; // Protects all of the above
915 
916  ros::Duration max_interval_duration_; // TODO: initialize with a parameter
917  double age_penalty_;
918 
919  std::vector<bool> has_dropped_messages_;
920  std::vector<ros::Duration> inter_message_lower_bounds_;
921  std::vector<bool> warned_about_incorrect_bound_;
922  std::vector<ros::Time> last_stamps_;
923 };
924 
925 } // namespace sync
926 } // namespace message_filters
927 
928 #endif // MESSAGE_FILTERS_SYNC_APPROXIMATE_TIME_H
929 
message_filters::sync_policies::ApproximateTime::M4Event
Super::M4Event M4Event
Definition: approximate_time.h:180
ROS_BREAK
#define ROS_BREAK()
message_filters::sync_policies::ApproximateTime::M2Event
Super::M2Event M2Event
Definition: approximate_time.h:178
message_filters::sync_policies::ApproximateTime::dequeDeleteFront
void dequeDeleteFront()
Definition: approximate_time.h:438
message_filters::sync_policies::ApproximateTime::getCandidateStart
void getCandidateStart(uint32_t &start_index, ros::Time &start_time)
Definition: approximate_time.h:687
message_filters::sync_policies::ApproximateTime::dequeMoveFrontToPast
void dequeMoveFrontToPast()
Definition: approximate_time.h:488
message_filters::sync_policies::ApproximateTime::Signal
Super::Signal Signal
Definition: approximate_time.h:173
ros::Time::isSimTime
static bool isSimTime()
message_filters::sync_policies::ApproximateTime::max_interval_duration_
ros::Duration max_interval_duration_
Definition: approximate_time.h:1012
message_filters::PolicyBase::M7Event
mpl::at_c< Events, 7 >::type M7Event
Definition: synchronizer.h:419
message_filters::sync_policies::ApproximateTime::M2Vector
std::vector< M2Event > M2Vector
Definition: approximate_time.h:196
message_filters::sync_policies::ApproximateTime::num_reset_deques_
uint32_t num_reset_deques_
Definition: approximate_time.h:998
message_filters::sync_policies::ApproximateTime::getVirtualCandidateStart
void getVirtualCandidateStart(uint32_t &start_index, ros::Time &start_time)
Definition: approximate_time.h:816
message_filters::sync_policies::ApproximateTime::num_non_empty_deques_
uint32_t num_non_empty_deques_
Definition: approximate_time.h:1003
message_filters::sync_policies::ApproximateTime::M8Vector
std::vector< M8Event > M8Vector
Definition: approximate_time.h:202
message_filters::sync_policies::ApproximateTime::candidate_
Tuple candidate_
Definition: approximate_time.h:1005
synchronizer.h
ros
message_filters::sync_policies::ApproximateTime::candidate_start_
ros::Time candidate_start_
Definition: approximate_time.h:1006
message_filters::sync_policies::ApproximateTime::M0Vector
std::vector< M0Event > M0Vector
Definition: approximate_time.h:194
message_filters::PolicyBase::M8Event
mpl::at_c< Events, 8 >::type M8Event
Definition: synchronizer.h:420
message_filters::sync_policies::ApproximateTime::age_penalty_
double age_penalty_
Definition: approximate_time.h:1013
message_filters::sync_policies::ApproximateTime::Tuple
boost::tuple< M0Event, M1Event, M2Event, M3Event, M4Event, M5Event, M6Event, M7Event, M8Event > Tuple
Definition: approximate_time.h:203
message_filters::PolicyBase::M2Event
mpl::at_c< Events, 2 >::type M2Event
Definition: synchronizer.h:414
message_filters::sync_policies::ApproximateTime::getVirtualCandidateEnd
void getVirtualCandidateEnd(uint32_t &end_index, ros::Time &end_time)
Definition: approximate_time.h:822
message_filters::sync_policies::ApproximateTime::M1Vector
std::vector< M1Event > M1Vector
Definition: approximate_time.h:195
message_filters::sync_policies::ApproximateTime::getCandidateEnd
void getCandidateEnd(uint32_t &end_index, ros::Time &end_time)
Definition: approximate_time.h:695
message_filters::sync_policies::ApproximateTime::pivot_
uint32_t pivot_
Definition: approximate_time.h:1009
message_filters::sync_policies::ApproximateTime::getCandidateBoundary
void getCandidateBoundary(uint32_t &index, ros::Time &time, bool end)
Definition: approximate_time.h:703
message_filters::PolicyBase::RealTypeCount
mpl::fold< Messages, mpl::int_< 0 >, mpl::if_< mpl::not_< boost::is_same< mpl::_2, NullType > >, mpl::next< mpl::_1 >, mpl::_1 > >::type RealTypeCount
Definition: synchronizer.h:411
DURATION_MAX
const ROSTIME_DECL Duration DURATION_MAX
message_filters::sync_policies::ApproximateTime::M6Event
Super::M6Event M6Event
Definition: approximate_time.h:182
message_filters::PolicyBase::Events
mpl::vector< ros::MessageEvent< M0 const >, ros::MessageEvent< M1 const >, ros::MessageEvent< M2 const >, ros::MessageEvent< M3 const >, ros::MessageEvent< M4 const >, ros::MessageEvent< M5 const >, ros::MessageEvent< M6 const >, ros::MessageEvent< M7 const >, ros::MessageEvent< M8 const > > Events
Definition: synchronizer.h:410
message_filters::sync_policies::ApproximateTime::M1Deque
std::deque< M1Event > M1Deque
Definition: approximate_time.h:186
message_filters::sync_policies::ApproximateTime::M8Deque
std::deque< M8Event > M8Deque
Definition: approximate_time.h:193
message_filters::sync_policies::ApproximateTime::recoverAndDelete
void recoverAndDelete()
Definition: approximate_time.h:636
message_filters::sync_policies::ApproximateTime::Super
PolicyBase< M0, M1, M2, M3, M4, M5, M6, M7, M8 > Super
Definition: approximate_time.h:171
ros::message_traits
message_filters::PolicyBase::M1Event
mpl::at_c< Events, 1 >::type M1Event
Definition: synchronizer.h:413
message_filters::sync_policies::ApproximateTime::M3Event
Super::M3Event M3Event
Definition: approximate_time.h:179
ROS_WARN_STREAM
#define ROS_WARN_STREAM(args)
message_filters::sync_policies::ApproximateTime::getVirtualCandidateBoundary
void getVirtualCandidateBoundary(uint32_t &index, ros::Time &time, bool end)
Definition: approximate_time.h:830
signal9.h
message_filters::PolicyBase::M3Event
mpl::at_c< Events, 3 >::type M3Event
Definition: synchronizer.h:415
message_filters::sync_policies::ApproximateTime::NO_PIVOT
static const uint32_t NO_PIVOT
Definition: approximate_time.h:1000
message_filters::sync_policies::ApproximateTime::VectorTuple
boost::tuple< M0Vector, M1Vector, M2Vector, M3Vector, M4Vector, M5Vector, M6Vector, M7Vector, M8Vector > VectorTuple
Definition: approximate_time.h:205
message_filters::sync_policies::ApproximateTime::makeCandidate
void makeCandidate()
Definition: approximate_time.h:537
message_filters::sync_policies::ApproximateTime::M6Deque
std::deque< M6Event > M6Deque
Definition: approximate_time.h:191
message_filters::sync_policies::ApproximateTime::ApproximateTime
ApproximateTime(uint32_t queue_size)
Definition: approximate_time.h:207
message_traits.h
message_filters::sync_policies::ApproximateTime::enable_reset_
bool enable_reset_
Definition: approximate_time.h:997
message_filters::sync_policies::ApproximateTime::data_mutex_
boost::mutex data_mutex_
Definition: approximate_time.h:1010
message_filters::sync_policies::ApproximateTime::M0Deque
std::deque< M0Event > M0Deque
Definition: approximate_time.h:185
message_filters::sync_policies::ApproximateTime::deques_
DequeTuple deques_
Definition: approximate_time.h:1002
message_filters::sync_policies::ApproximateTime::add
void add(const typename mpl::at_c< Events, i >::type &evt)
Definition: approximate_time.h:306
ROS_WARN
#define ROS_WARN(...)
message_filters::sync_policies::ApproximateTime::M6Vector
std::vector< M6Event > M6Vector
Definition: approximate_time.h:200
message_filters::sync_policies::ApproximateTime::process
void process()
Definition: approximate_time.h:859
message_filters::sync_policies::ApproximateTime::M1Event
Super::M1Event M1Event
Definition: approximate_time.h:177
message_filters::PolicyBase::M5Event
mpl::at_c< Events, 5 >::type M5Event
Definition: synchronizer.h:417
message_filters::sync_policies::ApproximateTime::M4Vector
std::vector< M4Event > M4Vector
Definition: approximate_time.h:198
message_filters::sync_policies::ApproximateTime::recover
void recover()
Definition: approximate_time.h:613
connection.h
message_filters::sync_policies::ApproximateTime::pivot_time_
ros::Time pivot_time_
Definition: approximate_time.h:1008
message_filters::sync_policies::ApproximateTime::getVirtualTime
ros::Time getVirtualTime()
Definition: approximate_time.h:787
ros::Time
null_types.h
message_filters::sync_policies::ApproximateTime::inter_message_lower_bounds_
std::vector< ros::Duration > inter_message_lower_bounds_
Definition: approximate_time.h:1016
message_filters::PolicyBase::Messages
mpl::vector< M0, M1, M2, M3, M4, M5, M6, M7, M8 > Messages
Definition: synchronizer.h:406
message_filters::sync_policies::ApproximateTime::checkInterMessageBound
bool checkInterMessageBound()
Definition: approximate_time.h:258
message_filters::sync_policies::ApproximateTime::candidate_end_
ros::Time candidate_end_
Definition: approximate_time.h:1007
message_filters::sync_policies::ApproximateTime::M2Deque
std::deque< M2Event > M2Deque
Definition: approximate_time.h:187
message_filters::sync_policies::ApproximateTime::M7Vector
std::vector< M7Event > M7Vector
Definition: approximate_time.h:201
message_filters::PolicyBase::Signal
Signal9< M0, M1, M2, M3, M4, M5, M6, M7, M8 > Signal
Definition: synchronizer.h:407
message_filters::sync_policies::ApproximateTime::Messages
Super::Messages Messages
Definition: approximate_time.h:172
message_filters::sync_policies::ApproximateTime::setReset
void setReset(const bool reset)
Definition: approximate_time.h:405
message_filters::sync_policies::ApproximateTime::queue_size_
uint32_t queue_size_
Definition: approximate_time.h:996
message_filters::sync_policies::ApproximateTime::M5Event
Super::M5Event M5Event
Definition: approximate_time.h:181
message_filters::sync_policies::ApproximateTime::M8Event
Super::M8Event M8Event
Definition: approximate_time.h:184
message_filters::sync_policies::ApproximateTime::operator=
ApproximateTime & operator=(const ApproximateTime &rhs)
Definition: approximate_time.h:229
message_filters::sync_policies::ApproximateTime::parent_
Sync * parent_
Definition: approximate_time.h:995
message_filters::sync_policies::ApproximateTime::publishCandidate
void publishCandidate()
Definition: approximate_time.h:661
message_filters::sync_policies::ApproximateTime::M5Deque
std::deque< M5Event > M5Deque
Definition: approximate_time.h:190
message_filters::sync_policies::ApproximateTime::last_stamps_
std::vector< ros::Time > last_stamps_
Definition: approximate_time.h:1018
message_filters::sync_policies::ApproximateTime::M3Vector
std::vector< M3Event > M3Vector
Definition: approximate_time.h:197
message_filters
Definition: cache.h:47
message_filters::sync_policies::ApproximateTime::M7Event
Super::M7Event M7Event
Definition: approximate_time.h:183
message_filters::PolicyBase::M0Event
mpl::at_c< Events, 0 >::type M0Event
Definition: synchronizer.h:412
message_filters::sync_policies::ApproximateTime::setAgePenalty
void setAgePenalty(double age_penalty)
Definition: approximate_time.h:380
message_filters::sync_policies::ApproximateTime::RealTypeCount
Super::RealTypeCount RealTypeCount
Definition: approximate_time.h:175
assert.h
message_filters::Synchronizer::signal
void signal(const M0Event &e0, const M1Event &e1, const M2Event &e2, const M3Event &e3, const M4Event &e4, const M5Event &e5, const M6Event &e6, const M7Event &e7, const M8Event &e8)
Definition: synchronizer.h:393
message_filters::sync_policies::ApproximateTime::Sync
Synchronizer< ApproximateTime > Sync
Definition: approximate_time.h:170
ROS_ASSERT
#define ROS_ASSERT(cond)
message_filters::sync_policies::ApproximateTime::setMaxIntervalDuration
void setMaxIntervalDuration(ros::Duration max_interval_duration)
Definition: approximate_time.h:400
message_filters::sync_policies::ApproximateTime::M0Event
Super::M0Event M0Event
Definition: approximate_time.h:176
message_filters::sync_policies::ApproximateTime::M4Deque
std::deque< M4Event > M4Deque
Definition: approximate_time.h:189
message_filters::sync_policies::ApproximateTime::setInterMessageLowerBound
void setInterMessageLowerBound(int i, ros::Duration lower_bound)
Definition: approximate_time.h:387
ros::Duration
message_event.h
message_filters::sync_policies::ApproximateTime::M5Vector
std::vector< M5Event > M5Vector
Definition: approximate_time.h:199
message_filters::PolicyBase::M4Event
mpl::at_c< Events, 4 >::type M4Event
Definition: synchronizer.h:416
message_filters::sync_policies::ApproximateTime::M7Deque
std::deque< M7Event > M7Deque
Definition: approximate_time.h:192
message_filters::sync_policies::ApproximateTime::DequeTuple
boost::tuple< M0Deque, M1Deque, M2Deque, M3Deque, M4Deque, M5Deque, M6Deque, M7Deque, M8Deque > DequeTuple
Definition: approximate_time.h:204
message_filters::sync_policies::ApproximateTime::clearDeque
void clearDeque()
Definition: approximate_time.h:413
message_filters::sync_policies::ApproximateTime::Events
Super::Events Events
Definition: approximate_time.h:174
message_filters::PolicyBase::M6Event
mpl::at_c< Events, 6 >::type M6Event
Definition: synchronizer.h:418
message_filters::sync_policies::ApproximateTime::M3Deque
std::deque< M3Event > M3Deque
Definition: approximate_time.h:188
message_filters::sync_policies::ApproximateTime::initParent
void initParent(Sync *parent)
Definition: approximate_time.h:252
message_filters::sync_policies::ApproximateTime::warned_about_incorrect_bound_
std::vector< bool > warned_about_incorrect_bound_
Definition: approximate_time.h:1017
message_filters::sync_policies::ApproximateTime::has_dropped_messages_
std::vector< bool > has_dropped_messages_
Definition: approximate_time.h:1015
message_filters::sync_policies::ApproximateTime::past_
VectorTuple past_
Definition: approximate_time.h:1004


message_filters
Author(s): Josh Faust, Vijay Pradeep, Dirk Thomas , Jacob Perron
autogenerated on Thu Nov 23 2023 04:01:54