approximate_time.h
Go to the documentation of this file.
1 /*********************************************************************
2 * Software License Agreement (BSD License)
3 *
4 * Copyright (c) 2009, Willow Garage, Inc.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above
14 * copyright notice, this list of conditions and the following
15 * disclaimer in the documentation and/or other materials provided
16 * with the distribution.
17 * * Neither the name of the Willow Garage nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 * POSSIBILITY OF SUCH DAMAGE.
33 *********************************************************************/
34 
35 #ifndef MESSAGE_FILTERS_SYNC_APPROXIMATE_TIME_H
36 #define MESSAGE_FILTERS_SYNC_APPROXIMATE_TIME_H
37 
42 
43 #include <boost/tuple/tuple.hpp>
44 #include <boost/shared_ptr.hpp>
45 #include <boost/function.hpp>
46 #include <boost/thread/mutex.hpp>
47 
48 #include <boost/bind.hpp>
49 #include <boost/type_traits/is_same.hpp>
50 #include <boost/noncopyable.hpp>
51 #include <boost/mpl/or.hpp>
52 #include <boost/mpl/at.hpp>
53 #include <boost/mpl/vector.hpp>
54 
55 #include <ros/assert.h>
56 #include <ros/message_traits.h>
57 #include <ros/message_event.h>
58 
59 #include <deque>
60 #include <vector>
61 #include <string>
62 
63 namespace message_filters
64 {
65 namespace sync_policies
66 {
67 
68 namespace mpl = boost::mpl;
69 
70 template<typename M0, typename M1, typename M2 = NullType, typename M3 = NullType, typename M4 = NullType,
71  typename M5 = NullType, typename M6 = NullType, typename M7 = NullType, typename M8 = NullType>
72 struct ApproximateTime : public PolicyBase<M0, M1, M2, M3, M4, M5, M6, M7, M8>
73 {
76  typedef typename Super::Messages Messages;
77  typedef typename Super::Signal Signal;
78  typedef typename Super::Events Events;
80  typedef typename Super::M0Event M0Event;
81  typedef typename Super::M1Event M1Event;
82  typedef typename Super::M2Event M2Event;
83  typedef typename Super::M3Event M3Event;
84  typedef typename Super::M4Event M4Event;
85  typedef typename Super::M5Event M5Event;
86  typedef typename Super::M6Event M6Event;
87  typedef typename Super::M7Event M7Event;
88  typedef typename Super::M8Event M8Event;
89  typedef std::deque<M0Event> M0Deque;
90  typedef std::deque<M1Event> M1Deque;
91  typedef std::deque<M2Event> M2Deque;
92  typedef std::deque<M3Event> M3Deque;
93  typedef std::deque<M4Event> M4Deque;
94  typedef std::deque<M5Event> M5Deque;
95  typedef std::deque<M6Event> M6Deque;
96  typedef std::deque<M7Event> M7Deque;
97  typedef std::deque<M8Event> M8Deque;
98  typedef std::vector<M0Event> M0Vector;
99  typedef std::vector<M1Event> M1Vector;
100  typedef std::vector<M2Event> M2Vector;
101  typedef std::vector<M3Event> M3Vector;
102  typedef std::vector<M4Event> M4Vector;
103  typedef std::vector<M5Event> M5Vector;
104  typedef std::vector<M6Event> M6Vector;
105  typedef std::vector<M7Event> M7Vector;
106  typedef std::vector<M8Event> M8Vector;
107  typedef boost::tuple<M0Event, M1Event, M2Event, M3Event, M4Event, M5Event, M6Event, M7Event, M8Event> Tuple;
108  typedef boost::tuple<M0Deque, M1Deque, M2Deque, M3Deque, M4Deque, M5Deque, M6Deque, M7Deque, M8Deque> DequeTuple;
109  typedef boost::tuple<M0Vector, M1Vector, M2Vector, M3Vector, M4Vector, M5Vector, M6Vector, M7Vector, M8Vector> VectorTuple;
110 
111  ApproximateTime(uint32_t queue_size)
112  : parent_(0)
113  , queue_size_(queue_size)
115  , pivot_(NO_PIVOT)
117  , age_penalty_(0.1)
118  , has_dropped_messages_(9, false)
119  , inter_message_lower_bounds_(9, ros::Duration(0))
121  {
122  ROS_ASSERT(queue_size_ > 0); // The synchronizer will tend to drop many messages with a queue size of 1. At least 2 is recommended.
123  }
124 
126  {
127  *this = e;
128  }
129 
131  {
132  parent_ = rhs.parent_;
133  queue_size_ = rhs.queue_size_;
135  pivot_time_ = rhs.pivot_time_;
136  pivot_ = rhs.pivot_;
141  deques_ = rhs.deques_;
142  past_ = rhs.past_;
146 
147  return *this;
148  }
149 
150  void initParent(Sync* parent)
151  {
152  parent_ = parent;
153  }
154 
155  template<int i>
157  {
158  namespace mt = ros::message_traits;
160  {
161  return;
162  }
163  std::deque<typename mpl::at_c<Events, i>::type>& deque = boost::get<i>(deques_);
164  std::vector<typename mpl::at_c<Events, i>::type>& v = boost::get<i>(past_);
165  ROS_ASSERT(!deque.empty());
166  const typename mpl::at_c<Messages, i>::type &msg = *(deque.back()).getMessage();
167  ros::Time msg_time = mt::TimeStamp<typename mpl::at_c<Messages, i>::type>::value(msg);
168  ros::Time previous_msg_time;
169  if (deque.size() == (size_t) 1)
170  {
171  if (v.empty())
172  {
173  // We have already published (or have never received) the previous message, we cannot check the bound
174  return;
175  }
176  const typename mpl::at_c<Messages, i>::type &previous_msg = *(v.back()).getMessage();
177  previous_msg_time = mt::TimeStamp<typename mpl::at_c<Messages, i>::type>::value(previous_msg);
178  }
179  else
180  {
181  // There are at least 2 elements in the deque. Check that the gap respects the bound if it was provided.
182  const typename mpl::at_c<Messages, i>::type &previous_msg = *(deque[deque.size()-2]).getMessage();
183  previous_msg_time = mt::TimeStamp<typename mpl::at_c<Messages, i>::type>::value(previous_msg);
184  }
185  if (msg_time < previous_msg_time)
186  {
187  ROS_WARN_STREAM("Messages of type " << i << " arrived out of order (will print only once)");
189  }
190  else if ((msg_time - previous_msg_time) < inter_message_lower_bounds_[i])
191  {
192  ROS_WARN_STREAM("Messages of type " << i << " arrived closer (" << (msg_time - previous_msg_time)
193  << ") than the lower bound you provided (" << inter_message_lower_bounds_[i]
194  << ") (will print only once)");
196  }
197  }
198 
199 
200  template<int i>
201  void add(const typename mpl::at_c<Events, i>::type& evt)
202  {
203  boost::mutex::scoped_lock lock(data_mutex_);
204 
205  std::deque<typename mpl::at_c<Events, i>::type>& deque = boost::get<i>(deques_);
206  deque.push_back(evt);
207  if (deque.size() == (size_t)1) {
208  // We have just added the first message, so it was empty before
210  if (num_non_empty_deques_ == (uint32_t)RealTypeCount::value)
211  {
212  // All deques have messages
213  process();
214  }
215  }
216  else
217  {
218  checkInterMessageBound<i>();
219  }
220  // Check whether we have more messages than allowed in the queue.
221  // Note that during the above call to process(), queue i may contain queue_size_+1 messages.
222  std::vector<typename mpl::at_c<Events, i>::type>& past = boost::get<i>(past_);
223  if (deque.size() + past.size() > queue_size_)
224  {
225  // Cancel ongoing candidate search, if any:
226  num_non_empty_deques_ = 0; // We will recompute it from scratch
227  recover<0>();
228  recover<1>();
229  recover<2>();
230  recover<3>();
231  recover<4>();
232  recover<5>();
233  recover<6>();
234  recover<7>();
235  recover<8>();
236  // Drop the oldest message in the offending topic
237  ROS_ASSERT(!deque.empty());
238  deque.pop_front();
239  has_dropped_messages_[i] = true;
240  if (pivot_ != NO_PIVOT)
241  {
242  // The candidate is no longer valid. Destroy it.
243  candidate_ = Tuple();
244  pivot_ = NO_PIVOT;
245  // There might still be enough messages to create a new candidate:
246  process();
247  }
248  }
249  }
250 
251  void setAgePenalty(double age_penalty)
252  {
253  // For correctness we only need age_penalty > -1.0, but most likely a negative age_penalty is a mistake.
254  ROS_ASSERT(age_penalty >= 0);
255  age_penalty_ = age_penalty;
256  }
257 
258  void setInterMessageLowerBound(int i, ros::Duration lower_bound) {
259  ROS_ASSERT(lower_bound >= ros::Duration(0,0));
260  inter_message_lower_bounds_[i] = lower_bound;
261  }
262 
264  ROS_ASSERT(lower_bound >= ros::Duration(0,0));
265  for (size_t i = 0; i < inter_message_lower_bounds_.size(); i++)
266  {
267  inter_message_lower_bounds_[i] = lower_bound;
268  }
269  }
270 
271  void setMaxIntervalDuration(ros::Duration max_interval_duration) {
272  ROS_ASSERT(max_interval_duration >= ros::Duration(0,0));
273  max_interval_duration_ = max_interval_duration;
274  }
275 
276 private:
277  // Assumes that deque number <index> is non empty
278  template<int i>
280  {
281  std::deque<typename mpl::at_c<Events, i>::type>& deque = boost::get<i>(deques_);
282  ROS_ASSERT(!deque.empty());
283  deque.pop_front();
284  if (deque.empty())
285  {
287  }
288  }
289 
290  // Assumes that deque number <index> is non empty
291  void dequeDeleteFront(uint32_t index)
292  {
293  switch (index)
294  {
295  case 0:
296  dequeDeleteFront<0>();
297  break;
298  case 1:
299  dequeDeleteFront<1>();
300  break;
301  case 2:
302  dequeDeleteFront<2>();
303  break;
304  case 3:
305  dequeDeleteFront<3>();
306  break;
307  case 4:
308  dequeDeleteFront<4>();
309  break;
310  case 5:
311  dequeDeleteFront<5>();
312  break;
313  case 6:
314  dequeDeleteFront<6>();
315  break;
316  case 7:
317  dequeDeleteFront<7>();
318  break;
319  case 8:
320  dequeDeleteFront<8>();
321  break;
322  default:
323  ROS_BREAK();
324  }
325  }
326 
327  // Assumes that deque number <index> is non empty
328  template<int i>
330  {
331  std::deque<typename mpl::at_c<Events, i>::type>& deque = boost::get<i>(deques_);
332  std::vector<typename mpl::at_c<Events, i>::type>& vector = boost::get<i>(past_);
333  ROS_ASSERT(!deque.empty());
334  vector.push_back(deque.front());
335  deque.pop_front();
336  if (deque.empty())
337  {
339  }
340  }
341  // Assumes that deque number <index> is non empty
342  void dequeMoveFrontToPast(uint32_t index)
343  {
344  switch (index)
345  {
346  case 0:
347  dequeMoveFrontToPast<0>();
348  break;
349  case 1:
350  dequeMoveFrontToPast<1>();
351  break;
352  case 2:
353  dequeMoveFrontToPast<2>();
354  break;
355  case 3:
356  dequeMoveFrontToPast<3>();
357  break;
358  case 4:
359  dequeMoveFrontToPast<4>();
360  break;
361  case 5:
362  dequeMoveFrontToPast<5>();
363  break;
364  case 6:
365  dequeMoveFrontToPast<6>();
366  break;
367  case 7:
368  dequeMoveFrontToPast<7>();
369  break;
370  case 8:
371  dequeMoveFrontToPast<8>();
372  break;
373  default:
374  ROS_BREAK();
375  }
376  }
377 
379  {
380  //printf("Creating candidate\n");
381  // Create candidate tuple
382  candidate_ = Tuple(); // Discards old one if any
383  boost::get<0>(candidate_) = boost::get<0>(deques_).front();
384  boost::get<1>(candidate_) = boost::get<1>(deques_).front();
385  if (RealTypeCount::value > 2)
386  {
387  boost::get<2>(candidate_) = boost::get<2>(deques_).front();
388  if (RealTypeCount::value > 3)
389  {
390  boost::get<3>(candidate_) = boost::get<3>(deques_).front();
391  if (RealTypeCount::value > 4)
392  {
393  boost::get<4>(candidate_) = boost::get<4>(deques_).front();
394  if (RealTypeCount::value > 5)
395  {
396  boost::get<5>(candidate_) = boost::get<5>(deques_).front();
397  if (RealTypeCount::value > 6)
398  {
399  boost::get<6>(candidate_) = boost::get<6>(deques_).front();
400  if (RealTypeCount::value > 7)
401  {
402  boost::get<7>(candidate_) = boost::get<7>(deques_).front();
403  if (RealTypeCount::value > 8)
404  {
405  boost::get<8>(candidate_) = boost::get<8>(deques_).front();
406  }
407  }
408  }
409  }
410  }
411  }
412  }
413  // Delete all past messages, since we have found a better candidate
414  boost::get<0>(past_).clear();
415  boost::get<1>(past_).clear();
416  boost::get<2>(past_).clear();
417  boost::get<3>(past_).clear();
418  boost::get<4>(past_).clear();
419  boost::get<5>(past_).clear();
420  boost::get<6>(past_).clear();
421  boost::get<7>(past_).clear();
422  boost::get<8>(past_).clear();
423  //printf("Candidate created\n");
424  }
425 
426 
427  // ASSUMES: num_messages <= past_[i].size()
428  template<int i>
429  void recover(size_t num_messages)
430  {
431  if (i >= RealTypeCount::value)
432  {
433  return;
434  }
435 
436  std::vector<typename mpl::at_c<Events, i>::type>& v = boost::get<i>(past_);
437  std::deque<typename mpl::at_c<Events, i>::type>& q = boost::get<i>(deques_);
438  ROS_ASSERT(num_messages <= v.size());
439  while (num_messages > 0)
440  {
441  q.push_front(v.back());
442  v.pop_back();
443  num_messages--;
444  }
445 
446  if (!q.empty())
447  {
449  }
450  }
451 
452 
453  template<int i>
454  void recover()
455  {
456  if (i >= RealTypeCount::value)
457  {
458  return;
459  }
460 
461  std::vector<typename mpl::at_c<Events, i>::type>& v = boost::get<i>(past_);
462  std::deque<typename mpl::at_c<Events, i>::type>& q = boost::get<i>(deques_);
463  while (!v.empty())
464  {
465  q.push_front(v.back());
466  v.pop_back();
467  }
468 
469  if (!q.empty())
470  {
472  }
473  }
474 
475 
476  template<int i>
478  {
479  if (i >= RealTypeCount::value)
480  {
481  return;
482  }
483 
484  std::vector<typename mpl::at_c<Events, i>::type>& v = boost::get<i>(past_);
485  std::deque<typename mpl::at_c<Events, i>::type>& q = boost::get<i>(deques_);
486  while (!v.empty())
487  {
488  q.push_front(v.back());
489  v.pop_back();
490  }
491 
492  ROS_ASSERT(!q.empty());
493 
494  q.pop_front();
495  if (!q.empty())
496  {
498  }
499  }
500 
501  // Assumes: all deques are non empty, i.e. num_non_empty_deques_ == RealTypeCount::value
503  {
504  //printf("Publishing candidate\n");
505  // Publish
506  parent_->signal(boost::get<0>(candidate_), boost::get<1>(candidate_), boost::get<2>(candidate_), boost::get<3>(candidate_),
507  boost::get<4>(candidate_), boost::get<5>(candidate_), boost::get<6>(candidate_), boost::get<7>(candidate_),
508  boost::get<8>(candidate_));
509  // Delete this candidate
510  candidate_ = Tuple();
511  pivot_ = NO_PIVOT;
512 
513  // Recover hidden messages, and delete the ones corresponding to the candidate
514  num_non_empty_deques_ = 0; // We will recompute it from scratch
515  recoverAndDelete<0>();
516  recoverAndDelete<1>();
517  recoverAndDelete<2>();
518  recoverAndDelete<3>();
519  recoverAndDelete<4>();
520  recoverAndDelete<5>();
521  recoverAndDelete<6>();
522  recoverAndDelete<7>();
523  recoverAndDelete<8>();
524  }
525 
526  // Assumes: all deques are non empty, i.e. num_non_empty_deques_ == RealTypeCount::value
527  // Returns: the oldest message on the deques
528  void getCandidateStart(uint32_t &start_index, ros::Time &start_time)
529  {
530  return getCandidateBoundary(start_index, start_time, false);
531  }
532 
533  // Assumes: all deques are non empty, i.e. num_non_empty_deques_ == RealTypeCount::value
534  // Returns: the latest message among the heads of the deques, i.e. the minimum
535  // time to end an interval started at getCandidateStart_index()
536  void getCandidateEnd(uint32_t &end_index, ros::Time &end_time)
537  {
538  return getCandidateBoundary(end_index, end_time, true);
539  }
540 
541  // ASSUMES: all deques are non-empty
542  // end = true: look for the latest head of deque
543  // false: look for the earliest head of deque
544  void getCandidateBoundary(uint32_t &index, ros::Time &time, bool end)
545  {
546  namespace mt = ros::message_traits;
547 
548  M0Event& m0 = boost::get<0>(deques_).front();
549  time = mt::TimeStamp<M0>::value(*m0.getMessage());
550  index = 0;
551  if (RealTypeCount::value > 1)
552  {
553  M1Event& m1 = boost::get<1>(deques_).front();
554  if ((mt::TimeStamp<M1>::value(*m1.getMessage()) < time) ^ end)
555  {
556  time = mt::TimeStamp<M1>::value(*m1.getMessage());
557  index = 1;
558  }
559  }
560  if (RealTypeCount::value > 2)
561  {
562  M2Event& m2 = boost::get<2>(deques_).front();
563  if ((mt::TimeStamp<M2>::value(*m2.getMessage()) < time) ^ end)
564  {
565  time = mt::TimeStamp<M2>::value(*m2.getMessage());
566  index = 2;
567  }
568  }
569  if (RealTypeCount::value > 3)
570  {
571  M3Event& m3 = boost::get<3>(deques_).front();
572  if ((mt::TimeStamp<M3>::value(*m3.getMessage()) < time) ^ end)
573  {
574  time = mt::TimeStamp<M3>::value(*m3.getMessage());
575  index = 3;
576  }
577  }
578  if (RealTypeCount::value > 4)
579  {
580  M4Event& m4 = boost::get<4>(deques_).front();
581  if ((mt::TimeStamp<M4>::value(*m4.getMessage()) < time) ^ end)
582  {
583  time = mt::TimeStamp<M4>::value(*m4.getMessage());
584  index = 4;
585  }
586  }
587  if (RealTypeCount::value > 5)
588  {
589  M5Event& m5 = boost::get<5>(deques_).front();
590  if ((mt::TimeStamp<M5>::value(*m5.getMessage()) < time) ^ end)
591  {
592  time = mt::TimeStamp<M5>::value(*m5.getMessage());
593  index = 5;
594  }
595  }
596  if (RealTypeCount::value > 6)
597  {
598  M6Event& m6 = boost::get<6>(deques_).front();
599  if ((mt::TimeStamp<M6>::value(*m6.getMessage()) < time) ^ end)
600  {
601  time = mt::TimeStamp<M6>::value(*m6.getMessage());
602  index = 6;
603  }
604  }
605  if (RealTypeCount::value > 7)
606  {
607  M7Event& m7 = boost::get<7>(deques_).front();
608  if ((mt::TimeStamp<M7>::value(*m7.getMessage()) < time) ^ end)
609  {
610  time = mt::TimeStamp<M7>::value(*m7.getMessage());
611  index = 7;
612  }
613  }
614  if (RealTypeCount::value > 8)
615  {
616  M8Event& m8 = boost::get<8>(deques_).front();
617  if ((mt::TimeStamp<M8>::value(*m8.getMessage()) < time) ^ end)
618  {
619  time = mt::TimeStamp<M8>::value(*m8.getMessage());
620  index = 8;
621  }
622  }
623  }
624 
625 
626  // ASSUMES: we have a pivot and candidate
627  template<int i>
629  {
630  namespace mt = ros::message_traits;
631 
632  if (i >= RealTypeCount::value)
633  {
634  return ros::Time(0,0); // Dummy return value
635  }
637 
638  std::vector<typename mpl::at_c<Events, i>::type>& v = boost::get<i>(past_);
639  std::deque<typename mpl::at_c<Events, i>::type>& q = boost::get<i>(deques_);
640  if (q.empty())
641  {
642  ROS_ASSERT(!v.empty()); // Because we have a candidate
643  ros::Time last_msg_time = mt::TimeStamp<typename mpl::at_c<Messages, i>::type>::value(*(v.back()).getMessage());
644  ros::Time msg_time_lower_bound = last_msg_time + inter_message_lower_bounds_[i];
645  if (msg_time_lower_bound > pivot_time_) // Take the max
646  {
647  return msg_time_lower_bound;
648  }
649  return pivot_time_;
650  }
651  ros::Time current_msg_time = mt::TimeStamp<typename mpl::at_c<Messages, i>::type>::value(*(q.front()).getMessage());
652  return current_msg_time;
653  }
654 
655 
656  // ASSUMES: we have a pivot and candidate
657  void getVirtualCandidateStart(uint32_t &start_index, ros::Time &start_time)
658  {
659  return getVirtualCandidateBoundary(start_index, start_time, false);
660  }
661 
662  // ASSUMES: we have a pivot and candidate
663  void getVirtualCandidateEnd(uint32_t &end_index, ros::Time &end_time)
664  {
665  return getVirtualCandidateBoundary(end_index, end_time, true);
666  }
667 
668  // ASSUMES: we have a pivot and candidate
669  // end = true: look for the latest head of deque
670  // false: look for the earliest head of deque
671  void getVirtualCandidateBoundary(uint32_t &index, ros::Time &time, bool end)
672  {
673  namespace mt = ros::message_traits;
674 
675  std::vector<ros::Time> virtual_times(9);
676  virtual_times[0] = getVirtualTime<0>();
677  virtual_times[1] = getVirtualTime<1>();
678  virtual_times[2] = getVirtualTime<2>();
679  virtual_times[3] = getVirtualTime<3>();
680  virtual_times[4] = getVirtualTime<4>();
681  virtual_times[5] = getVirtualTime<5>();
682  virtual_times[6] = getVirtualTime<6>();
683  virtual_times[7] = getVirtualTime<7>();
684  virtual_times[8] = getVirtualTime<8>();
685 
686  time = virtual_times[0];
687  index = 0;
688  for (int i = 0; i < RealTypeCount::value; i++)
689  {
690  if ((virtual_times[i] < time) ^ end)
691  {
692  time = virtual_times[i];
693  index = i;
694  }
695  }
696  }
697 
698 
699  // assumes data_mutex_ is already locked
700  void process()
701  {
702  // While no deque is empty
703  while (num_non_empty_deques_ == (uint32_t)RealTypeCount::value)
704  {
705  // Find the start and end of the current interval
706  //printf("Entering while loop in this state [\n");
707  //show_internal_state();
708  //printf("]\n");
709  ros::Time end_time, start_time;
710  uint32_t end_index, start_index;
711  getCandidateEnd(end_index, end_time);
712  getCandidateStart(start_index, start_time);
713  for (uint32_t i = 0; i < (uint32_t)RealTypeCount::value; i++)
714  {
715  if (i != end_index)
716  {
717  // No dropped message could have been better to use than the ones we have,
718  // so it becomes ok to use this topic as pivot in the future
719  has_dropped_messages_[i] = false;
720  }
721  }
722  if (pivot_ == NO_PIVOT)
723  {
724  // We do not have a candidate
725  // INVARIANT: the past_ vectors are empty
726  // INVARIANT: (candidate_ has no filled members)
727  if (end_time - start_time > max_interval_duration_)
728  {
729  // This interval is too big to be a valid candidate, move to the next
730  dequeDeleteFront(start_index);
731  continue;
732  }
733  if (has_dropped_messages_[end_index])
734  {
735  // The topic that would become pivot has dropped messages, so it is not a good pivot
736  dequeDeleteFront(start_index);
737  continue;
738  }
739  // This is a valid candidate, and we don't have any, so take it
740  makeCandidate();
741  candidate_start_ = start_time;
742  candidate_end_ = end_time;
743  pivot_ = end_index;
744  pivot_time_ = end_time;
745  dequeMoveFrontToPast(start_index);
746  }
747  else
748  {
749  // We already have a candidate
750  // Is this one better than the current candidate?
751  // INVARIANT: has_dropped_messages_ is all false
752  if ((end_time - candidate_end_) * (1 + age_penalty_) >= (start_time - candidate_start_))
753  {
754  // This is not a better candidate, move to the next
755  dequeMoveFrontToPast(start_index);
756  }
757  else
758  {
759  // This is a better candidate
760  makeCandidate();
761  candidate_start_ = start_time;
762  candidate_end_ = end_time;
763  dequeMoveFrontToPast(start_index);
764  // Keep the same pivot (and pivot time)
765  }
766  }
767  // INVARIANT: we have a candidate and pivot
769  //printf("start_index == %d, pivot_ == %d\n", start_index, pivot_);
770  if (start_index == pivot_) // TODO: replace with start_time == pivot_time_
771  {
772  // We have exhausted all possible candidates for this pivot, we now can output the best one
774  }
775  else if ((end_time - candidate_end_) * (1 + age_penalty_) >= (pivot_time_ - candidate_start_))
776  {
777  // We have not exhausted all candidates, but this candidate is already provably optimal
778  // Indeed, any future candidate must contain the interval [pivot_time_ end_time], which
779  // is already too big.
780  // Note: this case is subsumed by the next, but it may save some unnecessary work and
781  // it makes things (a little) easier to understand
783  }
784  else if (num_non_empty_deques_ < (uint32_t)RealTypeCount::value)
785  {
786  uint32_t num_non_empty_deques_before_virtual_search = num_non_empty_deques_;
787 
788  // Before giving up, use the rate bounds, if provided, to further try to prove optimality
789  std::vector<int> num_virtual_moves(9,0);
790  while (1)
791  {
792  ros::Time end_time, start_time;
793  uint32_t end_index, start_index;
794  getVirtualCandidateEnd(end_index, end_time);
795  getVirtualCandidateStart(start_index, start_time);
796  if ((end_time - candidate_end_) * (1 + age_penalty_) >= (pivot_time_ - candidate_start_))
797  {
798  // We have proved optimality
799  // As above, any future candidate must contain the interval [pivot_time_ end_time], which
800  // is already too big.
801  publishCandidate(); // This cleans up the virtual moves as a byproduct
802  break; // From the while(1) loop only
803  }
804  if ((end_time - candidate_end_) * (1 + age_penalty_) < (start_time - candidate_start_))
805  {
806  // We cannot prove optimality
807  // Indeed, we have a virtual (i.e. optimistic) candidate that is better than the current
808  // candidate
809  // Cleanup the virtual search:
810  num_non_empty_deques_ = 0; // We will recompute it from scratch
811  recover<0>(num_virtual_moves[0]);
812  recover<1>(num_virtual_moves[1]);
813  recover<2>(num_virtual_moves[2]);
814  recover<3>(num_virtual_moves[3]);
815  recover<4>(num_virtual_moves[4]);
816  recover<5>(num_virtual_moves[5]);
817  recover<6>(num_virtual_moves[6]);
818  recover<7>(num_virtual_moves[7]);
819  recover<8>(num_virtual_moves[8]);
820  (void)num_non_empty_deques_before_virtual_search; // unused variable warning stopper
821  ROS_ASSERT(num_non_empty_deques_before_virtual_search == num_non_empty_deques_);
822  break;
823  }
824  // Note: we cannot reach this point with start_index == pivot_ since in that case we would
825  // have start_time == pivot_time, in which case the two tests above are the negation
826  // of each other, so that one must be true. Therefore the while loop always terminates.
827  ROS_ASSERT(start_index != pivot_);
828  ROS_ASSERT(start_time < pivot_time_);
829  dequeMoveFrontToPast(start_index);
830  num_virtual_moves[start_index]++;
831  } // while(1)
832  }
833  } // while(num_non_empty_deques_ == (uint32_t)RealTypeCount::value)
834  }
835 
836  Sync* parent_;
837  uint32_t queue_size_;
838 
839  static const uint32_t NO_PIVOT = 9; // Special value for the pivot indicating that no pivot has been selected
840 
841  DequeTuple deques_;
843  VectorTuple past_;
844  Tuple candidate_; // NULL if there is no candidate, in which case there is no pivot.
848  uint32_t pivot_; // Equal to NO_PIVOT if there is no candidate
849  boost::mutex data_mutex_; // Protects all of the above
850 
851  ros::Duration max_interval_duration_; // TODO: initialize with a parameter
852  double age_penalty_;
853 
854  std::vector<bool> has_dropped_messages_;
855  std::vector<ros::Duration> inter_message_lower_bounds_;
856  std::vector<bool> warned_about_incorrect_bound_;
857 };
858 
859 } // namespace sync
860 } // namespace message_filters
861 
862 #endif // MESSAGE_FILTERS_SYNC_APPROXIMATE_TIME_H
863 
void setInterMessageLowerBound(int i, ros::Duration lower_bound)
boost::tuple< M0Event, M1Event, M2Event, M3Event, M4Event, M5Event, M6Event, M7Event, M8Event > Tuple
mpl::vector< ros::MessageEvent< M0 const >, ros::MessageEvent< M1 const >, ros::MessageEvent< M2 const >, ros::MessageEvent< M3 const >, ros::MessageEvent< M4 const >, ros::MessageEvent< M5 const >, ros::MessageEvent< M6 const >, ros::MessageEvent< M7 const >, ros::MessageEvent< M8 const > > Events
Definition: synchronizer.h:378
void getCandidateEnd(uint32_t &end_index, ros::Time &end_time)
std::vector< ros::Duration > inter_message_lower_bounds_
void signal(const M0Event &e0, const M1Event &e1, const M2Event &e2, const M3Event &e3, const M4Event &e4, const M5Event &e5, const M6Event &e6, const M7Event &e7, const M8Event &e8)
Definition: synchronizer.h:329
void getVirtualCandidateStart(uint32_t &start_index, ros::Time &start_time)
mpl::at_c< Events, 1 >::type M1Event
Definition: synchronizer.h:381
mpl::fold< Messages, mpl::int_< 0 >, mpl::if_< mpl::not_< boost::is_same< mpl::_2, NullType > >, mpl::next< mpl::_1 >, mpl::_1 > >::type RealTypeCount
Definition: synchronizer.h:379
void getCandidateStart(uint32_t &start_index, ros::Time &start_time)
mpl::at_c< Events, 5 >::type M5Event
Definition: synchronizer.h:385
mpl::at_c< Events, 2 >::type M2Event
Definition: synchronizer.h:382
void setInterMessageLowerBound(ros::Duration lower_bound)
mpl::vector< M0, M1, M2, M3, M4, M5, M6, M7, M8 > Messages
Definition: synchronizer.h:374
void setMaxIntervalDuration(ros::Duration max_interval_duration)
void getVirtualCandidateEnd(uint32_t &end_index, ros::Time &end_time)
mpl::at_c< Events, 0 >::type M0Event
Definition: synchronizer.h:380
#define ROS_WARN_STREAM(args)
mpl::at_c< Events, 3 >::type M3Event
Definition: synchronizer.h:383
boost::tuple< M0Deque, M1Deque, M2Deque, M3Deque, M4Deque, M5Deque, M6Deque, M7Deque, M8Deque > DequeTuple
void add(const typename mpl::at_c< Events, i >::type &evt)
void getCandidateBoundary(uint32_t &index, ros::Time &time, bool end)
PolicyBase< M0, M1, M2, M3, M4, M5, M6, M7, M8 > Super
#define ROS_ASSERT(cond)
mpl::at_c< Events, 7 >::type M7Event
Definition: synchronizer.h:387
mpl::at_c< Events, 4 >::type M4Event
Definition: synchronizer.h:384
#define ROS_BREAK()
ROSTIME_DECL const Duration DURATION_MAX
void getVirtualCandidateBoundary(uint32_t &index, ros::Time &time, bool end)
boost::tuple< M0Vector, M1Vector, M2Vector, M3Vector, M4Vector, M5Vector, M6Vector, M7Vector, M8Vector > VectorTuple
ApproximateTime & operator=(const ApproximateTime &rhs)
mpl::at_c< Events, 8 >::type M8Event
Definition: synchronizer.h:388
mpl::at_c< Events, 6 >::type M6Event
Definition: synchronizer.h:386


message_filters
Author(s): Josh Faust, Vijay Pradeep, Dirk Thomas
autogenerated on Mon Feb 28 2022 23:33:48