approximate_time.h
Go to the documentation of this file.
1 /*********************************************************************
2 * Software License Agreement (BSD License)
3 *
4 * Copyright (c) 2009, Willow Garage, Inc.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above
14 * copyright notice, this list of conditions and the following
15 * disclaimer in the documentation and/or other materials provided
16 * with the distribution.
17 * * Neither the name of the Willow Garage nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 * POSSIBILITY OF SUCH DAMAGE.
33 *********************************************************************/
34 
35 #ifndef MESSAGE_FILTERS_SYNC_APPROXIMATE_TIME_H
36 #define MESSAGE_FILTERS_SYNC_APPROXIMATE_TIME_H
37 
42 
43 #include <boost/tuple/tuple.hpp>
44 #include <boost/shared_ptr.hpp>
45 #include <boost/function.hpp>
46 #include <boost/thread/mutex.hpp>
47 
48 #include <boost/bind.hpp>
49 #include <boost/type_traits/is_same.hpp>
50 #include <boost/noncopyable.hpp>
51 #include <boost/mpl/or.hpp>
52 #include <boost/mpl/at.hpp>
53 #include <boost/mpl/vector.hpp>
54 
55 #include <ros/assert.h>
56 #include <ros/message_traits.h>
57 #include <ros/message_event.h>
58 
59 #include <deque>
60 #include <vector>
61 #include <string>
62 
63 namespace message_filters
64 {
65 namespace sync_policies
66 {
67 
68 namespace mpl = boost::mpl;
69 
70 template<typename M0, typename M1, typename M2 = NullType, typename M3 = NullType, typename M4 = NullType,
71  typename M5 = NullType, typename M6 = NullType, typename M7 = NullType, typename M8 = NullType>
72 struct ApproximateTime : public PolicyBase<M0, M1, M2, M3, M4, M5, M6, M7, M8>
73 {
76  typedef typename Super::Messages Messages;
77  typedef typename Super::Signal Signal;
78  typedef typename Super::Events Events;
80  typedef typename Super::M0Event M0Event;
81  typedef typename Super::M1Event M1Event;
82  typedef typename Super::M2Event M2Event;
83  typedef typename Super::M3Event M3Event;
84  typedef typename Super::M4Event M4Event;
85  typedef typename Super::M5Event M5Event;
86  typedef typename Super::M6Event M6Event;
87  typedef typename Super::M7Event M7Event;
88  typedef typename Super::M8Event M8Event;
89  typedef std::deque<M0Event> M0Deque;
90  typedef std::deque<M1Event> M1Deque;
91  typedef std::deque<M2Event> M2Deque;
92  typedef std::deque<M3Event> M3Deque;
93  typedef std::deque<M4Event> M4Deque;
94  typedef std::deque<M5Event> M5Deque;
95  typedef std::deque<M6Event> M6Deque;
96  typedef std::deque<M7Event> M7Deque;
97  typedef std::deque<M8Event> M8Deque;
98  typedef std::vector<M0Event> M0Vector;
99  typedef std::vector<M1Event> M1Vector;
100  typedef std::vector<M2Event> M2Vector;
101  typedef std::vector<M3Event> M3Vector;
102  typedef std::vector<M4Event> M4Vector;
103  typedef std::vector<M5Event> M5Vector;
104  typedef std::vector<M6Event> M6Vector;
105  typedef std::vector<M7Event> M7Vector;
106  typedef std::vector<M8Event> M8Vector;
107  typedef boost::tuple<M0Event, M1Event, M2Event, M3Event, M4Event, M5Event, M6Event, M7Event, M8Event> Tuple;
108  typedef boost::tuple<M0Deque, M1Deque, M2Deque, M3Deque, M4Deque, M5Deque, M6Deque, M7Deque, M8Deque> DequeTuple;
109  typedef boost::tuple<M0Vector, M1Vector, M2Vector, M3Vector, M4Vector, M5Vector, M6Vector, M7Vector, M8Vector> VectorTuple;
110 
111  ApproximateTime(uint32_t queue_size)
112  : parent_(0)
113  , queue_size_(queue_size)
115  , pivot_(NO_PIVOT)
116  , max_interval_duration_(ros::DURATION_MAX)
117  , age_penalty_(0.1)
118  , has_dropped_messages_(9, false)
119  , inter_message_lower_bounds_(9, ros::Duration(0))
121  {
122  ROS_ASSERT(queue_size_ > 0); // The synchronizer will tend to drop many messages with a queue size of 1. At least 2 is recommended.
123  }
124 
126  {
127  *this = e;
128  }
129 
131  {
132  parent_ = rhs.parent_;
133  queue_size_ = rhs.queue_size_;
135  pivot_time_ = rhs.pivot_time_;
136  pivot_ = rhs.pivot_;
141  deques_ = rhs.deques_;
142  past_ = rhs.past_;
146 
147  return *this;
148  }
149 
150  void initParent(Sync* parent)
151  {
152  parent_ = parent;
153  }
154 
155  template<int i>
157  {
158  namespace mt = ros::message_traits;
160  {
161  return;
162  }
163  std::deque<typename mpl::at_c<Events, i>::type>& deque = boost::get<i>(deques_);
164  std::vector<typename mpl::at_c<Events, i>::type>& v = boost::get<i>(past_);
165  ROS_ASSERT(!deque.empty());
166  const typename mpl::at_c<Messages, i>::type &msg = *(deque.back()).getMessage();
167  ros::Time msg_time = mt::TimeStamp<typename mpl::at_c<Messages, i>::type>::value(msg);
168  ros::Time previous_msg_time;
169  if (deque.size() == (size_t) 1)
170  {
171  if (v.empty())
172  {
173  // We have already published (or have never received) the previous message, we cannot check the bound
174  return;
175  }
176  const typename mpl::at_c<Messages, i>::type &previous_msg = *(v.back()).getMessage();
177  previous_msg_time = mt::TimeStamp<typename mpl::at_c<Messages, i>::type>::value(previous_msg);
178  }
179  else
180  {
181  // There are at least 2 elements in the deque. Check that the gap respects the bound if it was provided.
182  const typename mpl::at_c<Messages, i>::type &previous_msg = *(deque[deque.size()-2]).getMessage();
183  previous_msg_time = mt::TimeStamp<typename mpl::at_c<Messages, i>::type>::value(previous_msg);
184  }
185  if (msg_time < previous_msg_time)
186  {
187  ROS_WARN_STREAM("Messages of type " << i << " arrived out of order (will print only once)");
189  }
190  else if ((msg_time - previous_msg_time) < inter_message_lower_bounds_[i])
191  {
192  ROS_WARN_STREAM("Messages of type " << i << " arrived closer (" << (msg_time - previous_msg_time)
193  << ") than the lower bound you provided (" << inter_message_lower_bounds_[i]
194  << ") (will print only once)");
196  }
197  }
198 
199 
200  template<int i>
201  void add(const typename mpl::at_c<Events, i>::type& evt)
202  {
203  boost::mutex::scoped_lock lock(data_mutex_);
204 
205  std::deque<typename mpl::at_c<Events, i>::type>& deque = boost::get<i>(deques_);
206  deque.push_back(evt);
207  if (deque.size() == (size_t)1) {
208  // We have just added the first message, so it was empty before
210  if (num_non_empty_deques_ == (uint32_t)RealTypeCount::value)
211  {
212  // All deques have messages
213  process();
214  }
215  }
216  else
217  {
218  checkInterMessageBound<i>();
219  }
220  // Check whether we have more messages than allowed in the queue.
221  // Note that during the above call to process(), queue i may contain queue_size_+1 messages.
222  std::vector<typename mpl::at_c<Events, i>::type>& past = boost::get<i>(past_);
223  if (deque.size() + past.size() > queue_size_)
224  {
225  // Cancel ongoing candidate search, if any:
226  num_non_empty_deques_ = 0; // We will recompute it from scratch
227  recover<0>();
228  recover<1>();
229  recover<2>();
230  recover<3>();
231  recover<4>();
232  recover<5>();
233  recover<6>();
234  recover<7>();
235  recover<8>();
236  // Drop the oldest message in the offending topic
237  ROS_ASSERT(!deque.empty());
238  deque.pop_front();
239  has_dropped_messages_[i] = true;
240  if (pivot_ != NO_PIVOT)
241  {
242  // The candidate is no longer valid. Destroy it.
243  candidate_ = Tuple();
244  pivot_ = NO_PIVOT;
245  // There might still be enough messages to create a new candidate:
246  process();
247  }
248  }
249  }
250 
251  void setAgePenalty(double age_penalty)
252  {
253  // For correctness we only need age_penalty > -1.0, but most likely a negative age_penalty is a mistake.
254  ROS_ASSERT(age_penalty >= 0);
255  age_penalty_ = age_penalty;
256  }
257 
258  void setInterMessageLowerBound(int i, ros::Duration lower_bound) {
259  // For correctness we only need age_penalty > -1.0, but most likely a negative age_penalty is a mistake.
260  ROS_ASSERT(lower_bound >= ros::Duration(0,0));
261  inter_message_lower_bounds_[i] = lower_bound;
262  }
263 
264  void setMaxIntervalDuration(ros::Duration max_interval_duration) {
265  // For correctness we only need age_penalty > -1.0, but most likely a negative age_penalty is a mistake.
266  ROS_ASSERT(max_interval_duration >= ros::Duration(0,0));
267  max_interval_duration_ = max_interval_duration;
268  }
269 
270 private:
271  // Assumes that deque number <index> is non empty
272  template<int i>
274  {
275  std::deque<typename mpl::at_c<Events, i>::type>& deque = boost::get<i>(deques_);
276  ROS_ASSERT(!deque.empty());
277  deque.pop_front();
278  if (deque.empty())
279  {
281  }
282  }
283 
284  // Assumes that deque number <index> is non empty
285  void dequeDeleteFront(uint32_t index)
286  {
287  switch (index)
288  {
289  case 0:
290  dequeDeleteFront<0>();
291  break;
292  case 1:
293  dequeDeleteFront<1>();
294  break;
295  case 2:
296  dequeDeleteFront<2>();
297  break;
298  case 3:
299  dequeDeleteFront<3>();
300  break;
301  case 4:
302  dequeDeleteFront<4>();
303  break;
304  case 5:
305  dequeDeleteFront<5>();
306  break;
307  case 6:
308  dequeDeleteFront<6>();
309  break;
310  case 7:
311  dequeDeleteFront<7>();
312  break;
313  case 8:
314  dequeDeleteFront<8>();
315  break;
316  default:
317  ROS_BREAK();
318  }
319  }
320 
321  // Assumes that deque number <index> is non empty
322  template<int i>
324  {
325  std::deque<typename mpl::at_c<Events, i>::type>& deque = boost::get<i>(deques_);
326  std::vector<typename mpl::at_c<Events, i>::type>& vector = boost::get<i>(past_);
327  ROS_ASSERT(!deque.empty());
328  vector.push_back(deque.front());
329  deque.pop_front();
330  if (deque.empty())
331  {
333  }
334  }
335  // Assumes that deque number <index> is non empty
336  void dequeMoveFrontToPast(uint32_t index)
337  {
338  switch (index)
339  {
340  case 0:
341  dequeMoveFrontToPast<0>();
342  break;
343  case 1:
344  dequeMoveFrontToPast<1>();
345  break;
346  case 2:
347  dequeMoveFrontToPast<2>();
348  break;
349  case 3:
350  dequeMoveFrontToPast<3>();
351  break;
352  case 4:
353  dequeMoveFrontToPast<4>();
354  break;
355  case 5:
356  dequeMoveFrontToPast<5>();
357  break;
358  case 6:
359  dequeMoveFrontToPast<6>();
360  break;
361  case 7:
362  dequeMoveFrontToPast<7>();
363  break;
364  case 8:
365  dequeMoveFrontToPast<8>();
366  break;
367  default:
368  ROS_BREAK();
369  }
370  }
371 
373  {
374  //printf("Creating candidate\n");
375  // Create candidate tuple
376  candidate_ = Tuple(); // Discards old one if any
377  boost::get<0>(candidate_) = boost::get<0>(deques_).front();
378  boost::get<1>(candidate_) = boost::get<1>(deques_).front();
379  if (RealTypeCount::value > 2)
380  {
381  boost::get<2>(candidate_) = boost::get<2>(deques_).front();
382  if (RealTypeCount::value > 3)
383  {
384  boost::get<3>(candidate_) = boost::get<3>(deques_).front();
385  if (RealTypeCount::value > 4)
386  {
387  boost::get<4>(candidate_) = boost::get<4>(deques_).front();
388  if (RealTypeCount::value > 5)
389  {
390  boost::get<5>(candidate_) = boost::get<5>(deques_).front();
391  if (RealTypeCount::value > 6)
392  {
393  boost::get<6>(candidate_) = boost::get<6>(deques_).front();
394  if (RealTypeCount::value > 7)
395  {
396  boost::get<7>(candidate_) = boost::get<7>(deques_).front();
397  if (RealTypeCount::value > 8)
398  {
399  boost::get<8>(candidate_) = boost::get<8>(deques_).front();
400  }
401  }
402  }
403  }
404  }
405  }
406  }
407  // Delete all past messages, since we have found a better candidate
408  boost::get<0>(past_).clear();
409  boost::get<1>(past_).clear();
410  boost::get<2>(past_).clear();
411  boost::get<3>(past_).clear();
412  boost::get<4>(past_).clear();
413  boost::get<5>(past_).clear();
414  boost::get<6>(past_).clear();
415  boost::get<7>(past_).clear();
416  boost::get<8>(past_).clear();
417  //printf("Candidate created\n");
418  }
419 
420 
421  // ASSUMES: num_messages <= past_[i].size()
422  template<int i>
423  void recover(size_t num_messages)
424  {
425  if (i >= RealTypeCount::value)
426  {
427  return;
428  }
429 
430  std::vector<typename mpl::at_c<Events, i>::type>& v = boost::get<i>(past_);
431  std::deque<typename mpl::at_c<Events, i>::type>& q = boost::get<i>(deques_);
432  ROS_ASSERT(num_messages <= v.size());
433  while (num_messages > 0)
434  {
435  q.push_front(v.back());
436  v.pop_back();
437  num_messages--;
438  }
439 
440  if (!q.empty())
441  {
443  }
444  }
445 
446 
447  template<int i>
448  void recover()
449  {
450  if (i >= RealTypeCount::value)
451  {
452  return;
453  }
454 
455  std::vector<typename mpl::at_c<Events, i>::type>& v = boost::get<i>(past_);
456  std::deque<typename mpl::at_c<Events, i>::type>& q = boost::get<i>(deques_);
457  while (!v.empty())
458  {
459  q.push_front(v.back());
460  v.pop_back();
461  }
462 
463  if (!q.empty())
464  {
466  }
467  }
468 
469 
470  template<int i>
472  {
473  if (i >= RealTypeCount::value)
474  {
475  return;
476  }
477 
478  std::vector<typename mpl::at_c<Events, i>::type>& v = boost::get<i>(past_);
479  std::deque<typename mpl::at_c<Events, i>::type>& q = boost::get<i>(deques_);
480  while (!v.empty())
481  {
482  q.push_front(v.back());
483  v.pop_back();
484  }
485 
486  ROS_ASSERT(!q.empty());
487 
488  q.pop_front();
489  if (!q.empty())
490  {
492  }
493  }
494 
495  // Assumes: all deques are non empty, i.e. num_non_empty_deques_ == RealTypeCount::value
497  {
498  //printf("Publishing candidate\n");
499  // Publish
500  parent_->signal(boost::get<0>(candidate_), boost::get<1>(candidate_), boost::get<2>(candidate_), boost::get<3>(candidate_),
501  boost::get<4>(candidate_), boost::get<5>(candidate_), boost::get<6>(candidate_), boost::get<7>(candidate_),
502  boost::get<8>(candidate_));
503  // Delete this candidate
504  candidate_ = Tuple();
505  pivot_ = NO_PIVOT;
506 
507  // Recover hidden messages, and delete the ones corresponding to the candidate
508  num_non_empty_deques_ = 0; // We will recompute it from scratch
509  recoverAndDelete<0>();
510  recoverAndDelete<1>();
511  recoverAndDelete<2>();
512  recoverAndDelete<3>();
513  recoverAndDelete<4>();
514  recoverAndDelete<5>();
515  recoverAndDelete<6>();
516  recoverAndDelete<7>();
517  recoverAndDelete<8>();
518  }
519 
520  // Assumes: all deques are non empty, i.e. num_non_empty_deques_ == RealTypeCount::value
521  // Returns: the oldest message on the deques
522  void getCandidateStart(uint32_t &start_index, ros::Time &start_time)
523  {
524  return getCandidateBoundary(start_index, start_time, false);
525  }
526 
527  // Assumes: all deques are non empty, i.e. num_non_empty_deques_ == RealTypeCount::value
528  // Returns: the latest message among the heads of the deques, i.e. the minimum
529  // time to end an interval started at getCandidateStart_index()
530  void getCandidateEnd(uint32_t &end_index, ros::Time &end_time)
531  {
532  return getCandidateBoundary(end_index, end_time, true);
533  }
534 
535  // ASSUMES: all deques are non-empty
536  // end = true: look for the latest head of deque
537  // false: look for the earliest head of deque
538  void getCandidateBoundary(uint32_t &index, ros::Time &time, bool end)
539  {
540  namespace mt = ros::message_traits;
541 
542  M0Event& m0 = boost::get<0>(deques_).front();
543  time = mt::TimeStamp<M0>::value(*m0.getMessage());
544  index = 0;
545  if (RealTypeCount::value > 1)
546  {
547  M1Event& m1 = boost::get<1>(deques_).front();
548  if ((mt::TimeStamp<M1>::value(*m1.getMessage()) < time) ^ end)
549  {
550  time = mt::TimeStamp<M1>::value(*m1.getMessage());
551  index = 1;
552  }
553  }
554  if (RealTypeCount::value > 2)
555  {
556  M2Event& m2 = boost::get<2>(deques_).front();
557  if ((mt::TimeStamp<M2>::value(*m2.getMessage()) < time) ^ end)
558  {
559  time = mt::TimeStamp<M2>::value(*m2.getMessage());
560  index = 2;
561  }
562  }
563  if (RealTypeCount::value > 3)
564  {
565  M3Event& m3 = boost::get<3>(deques_).front();
566  if ((mt::TimeStamp<M3>::value(*m3.getMessage()) < time) ^ end)
567  {
568  time = mt::TimeStamp<M3>::value(*m3.getMessage());
569  index = 3;
570  }
571  }
572  if (RealTypeCount::value > 4)
573  {
574  M4Event& m4 = boost::get<4>(deques_).front();
575  if ((mt::TimeStamp<M4>::value(*m4.getMessage()) < time) ^ end)
576  {
577  time = mt::TimeStamp<M4>::value(*m4.getMessage());
578  index = 4;
579  }
580  }
581  if (RealTypeCount::value > 5)
582  {
583  M5Event& m5 = boost::get<5>(deques_).front();
584  if ((mt::TimeStamp<M5>::value(*m5.getMessage()) < time) ^ end)
585  {
586  time = mt::TimeStamp<M5>::value(*m5.getMessage());
587  index = 5;
588  }
589  }
590  if (RealTypeCount::value > 6)
591  {
592  M6Event& m6 = boost::get<6>(deques_).front();
593  if ((mt::TimeStamp<M6>::value(*m6.getMessage()) < time) ^ end)
594  {
595  time = mt::TimeStamp<M6>::value(*m6.getMessage());
596  index = 6;
597  }
598  }
599  if (RealTypeCount::value > 7)
600  {
601  M7Event& m7 = boost::get<7>(deques_).front();
602  if ((mt::TimeStamp<M7>::value(*m7.getMessage()) < time) ^ end)
603  {
604  time = mt::TimeStamp<M7>::value(*m7.getMessage());
605  index = 7;
606  }
607  }
608  if (RealTypeCount::value > 8)
609  {
610  M8Event& m8 = boost::get<8>(deques_).front();
611  if ((mt::TimeStamp<M8>::value(*m8.getMessage()) < time) ^ end)
612  {
613  time = mt::TimeStamp<M8>::value(*m8.getMessage());
614  index = 8;
615  }
616  }
617  }
618 
619 
620  // ASSUMES: we have a pivot and candidate
621  template<int i>
623  {
624  namespace mt = ros::message_traits;
625 
626  if (i >= RealTypeCount::value)
627  {
628  return ros::Time(0,0); // Dummy return value
629  }
631 
632  std::vector<typename mpl::at_c<Events, i>::type>& v = boost::get<i>(past_);
633  std::deque<typename mpl::at_c<Events, i>::type>& q = boost::get<i>(deques_);
634  if (q.empty())
635  {
636  ROS_ASSERT(!v.empty()); // Because we have a candidate
637  ros::Time last_msg_time = mt::TimeStamp<typename mpl::at_c<Messages, i>::type>::value(*(v.back()).getMessage());
638  ros::Time msg_time_lower_bound = last_msg_time + inter_message_lower_bounds_[i];
639  if (msg_time_lower_bound > pivot_time_) // Take the max
640  {
641  return msg_time_lower_bound;
642  }
643  return pivot_time_;
644  }
645  ros::Time current_msg_time = mt::TimeStamp<typename mpl::at_c<Messages, i>::type>::value(*(q.front()).getMessage());
646  return current_msg_time;
647  }
648 
649 
650  // ASSUMES: we have a pivot and candidate
651  void getVirtualCandidateStart(uint32_t &start_index, ros::Time &start_time)
652  {
653  return getVirtualCandidateBoundary(start_index, start_time, false);
654  }
655 
656  // ASSUMES: we have a pivot and candidate
657  void getVirtualCandidateEnd(uint32_t &end_index, ros::Time &end_time)
658  {
659  return getVirtualCandidateBoundary(end_index, end_time, true);
660  }
661 
662  // ASSUMES: we have a pivot and candidate
663  // end = true: look for the latest head of deque
664  // false: look for the earliest head of deque
665  void getVirtualCandidateBoundary(uint32_t &index, ros::Time &time, bool end)
666  {
667  namespace mt = ros::message_traits;
668 
669  std::vector<ros::Time> virtual_times(9);
670  virtual_times[0] = getVirtualTime<0>();
671  virtual_times[1] = getVirtualTime<1>();
672  virtual_times[2] = getVirtualTime<2>();
673  virtual_times[3] = getVirtualTime<3>();
674  virtual_times[4] = getVirtualTime<4>();
675  virtual_times[5] = getVirtualTime<5>();
676  virtual_times[6] = getVirtualTime<6>();
677  virtual_times[7] = getVirtualTime<7>();
678  virtual_times[8] = getVirtualTime<8>();
679 
680  time = virtual_times[0];
681  index = 0;
682  for (int i = 0; i < RealTypeCount::value; i++)
683  {
684  if ((virtual_times[i] < time) ^ end)
685  {
686  time = virtual_times[i];
687  index = i;
688  }
689  }
690  }
691 
692 
693  // assumes data_mutex_ is already locked
694  void process()
695  {
696  // While no deque is empty
697  while (num_non_empty_deques_ == (uint32_t)RealTypeCount::value)
698  {
699  // Find the start and end of the current interval
700  //printf("Entering while loop in this state [\n");
701  //show_internal_state();
702  //printf("]\n");
703  ros::Time end_time, start_time;
704  uint32_t end_index, start_index;
705  getCandidateEnd(end_index, end_time);
706  getCandidateStart(start_index, start_time);
707  for (uint32_t i = 0; i < (uint32_t)RealTypeCount::value; i++)
708  {
709  if (i != end_index)
710  {
711  // No dropped message could have been better to use than the ones we have,
712  // so it becomes ok to use this topic as pivot in the future
713  has_dropped_messages_[i] = false;
714  }
715  }
716  if (pivot_ == NO_PIVOT)
717  {
718  // We do not have a candidate
719  // INVARIANT: the past_ vectors are empty
720  // INVARIANT: (candidate_ has no filled members)
721  if (end_time - start_time > max_interval_duration_)
722  {
723  // This interval is too big to be a valid candidate, move to the next
724  dequeDeleteFront(start_index);
725  continue;
726  }
727  if (has_dropped_messages_[end_index])
728  {
729  // The topic that would become pivot has dropped messages, so it is not a good pivot
730  dequeDeleteFront(start_index);
731  continue;
732  }
733  // This is a valid candidate, and we don't have any, so take it
734  makeCandidate();
735  candidate_start_ = start_time;
736  candidate_end_ = end_time;
737  pivot_ = end_index;
738  pivot_time_ = end_time;
739  dequeMoveFrontToPast(start_index);
740  }
741  else
742  {
743  // We already have a candidate
744  // Is this one better than the current candidate?
745  // INVARIANT: has_dropped_messages_ is all false
746  if ((end_time - candidate_end_) * (1 + age_penalty_) >= (start_time - candidate_start_))
747  {
748  // This is not a better candidate, move to the next
749  dequeMoveFrontToPast(start_index);
750  }
751  else
752  {
753  // This is a better candidate
754  makeCandidate();
755  candidate_start_ = start_time;
756  candidate_end_ = end_time;
757  dequeMoveFrontToPast(start_index);
758  // Keep the same pivot (and pivot time)
759  }
760  }
761  // INVARIANT: we have a candidate and pivot
763  //printf("start_index == %d, pivot_ == %d\n", start_index, pivot_);
764  if (start_index == pivot_) // TODO: replace with start_time == pivot_time_
765  {
766  // We have exhausted all possible candidates for this pivot, we now can output the best one
768  }
769  else if ((end_time - candidate_end_) * (1 + age_penalty_) >= (pivot_time_ - candidate_start_))
770  {
771  // We have not exhausted all candidates, but this candidate is already provably optimal
772  // Indeed, any future candidate must contain the interval [pivot_time_ end_time], which
773  // is already too big.
774  // Note: this case is subsumed by the next, but it may save some unnecessary work and
775  // it makes things (a little) easier to understand
777  }
778  else if (num_non_empty_deques_ < (uint32_t)RealTypeCount::value)
779  {
780  uint32_t num_non_empty_deques_before_virtual_search = num_non_empty_deques_;
781 
782  // Before giving up, use the rate bounds, if provided, to further try to prove optimality
783  std::vector<int> num_virtual_moves(9,0);
784  while (1)
785  {
786  ros::Time end_time, start_time;
787  uint32_t end_index, start_index;
788  getVirtualCandidateEnd(end_index, end_time);
789  getVirtualCandidateStart(start_index, start_time);
790  if ((end_time - candidate_end_) * (1 + age_penalty_) >= (pivot_time_ - candidate_start_))
791  {
792  // We have proved optimality
793  // As above, any future candidate must contain the interval [pivot_time_ end_time], which
794  // is already too big.
795  publishCandidate(); // This cleans up the virtual moves as a byproduct
796  break; // From the while(1) loop only
797  }
798  if ((end_time - candidate_end_) * (1 + age_penalty_) < (start_time - candidate_start_))
799  {
800  // We cannot prove optimality
801  // Indeed, we have a virtual (i.e. optimistic) candidate that is better than the current
802  // candidate
803  // Cleanup the virtual search:
804  num_non_empty_deques_ = 0; // We will recompute it from scratch
805  recover<0>(num_virtual_moves[0]);
806  recover<1>(num_virtual_moves[1]);
807  recover<2>(num_virtual_moves[2]);
808  recover<3>(num_virtual_moves[3]);
809  recover<4>(num_virtual_moves[4]);
810  recover<5>(num_virtual_moves[5]);
811  recover<6>(num_virtual_moves[6]);
812  recover<7>(num_virtual_moves[7]);
813  recover<8>(num_virtual_moves[8]);
814  (void)num_non_empty_deques_before_virtual_search; // unused variable warning stopper
815  ROS_ASSERT(num_non_empty_deques_before_virtual_search == num_non_empty_deques_);
816  break;
817  }
818  // Note: we cannot reach this point with start_index == pivot_ since in that case we would
819  // have start_time == pivot_time, in which case the two tests above are the negation
820  // of each other, so that one must be true. Therefore the while loop always terminates.
821  ROS_ASSERT(start_index != pivot_);
822  ROS_ASSERT(start_time < pivot_time_);
823  dequeMoveFrontToPast(start_index);
824  num_virtual_moves[start_index]++;
825  } // while(1)
826  }
827  } // while(num_non_empty_deques_ == (uint32_t)RealTypeCount::value)
828  }
829 
830  Sync* parent_;
831  uint32_t queue_size_;
832 
833  static const uint32_t NO_PIVOT = 9; // Special value for the pivot indicating that no pivot has been selected
834 
835  DequeTuple deques_;
837  VectorTuple past_;
838  Tuple candidate_; // NULL if there is no candidate, in which case there is no pivot.
842  uint32_t pivot_; // Equal to NO_PIVOT if there is no candidate
843  boost::mutex data_mutex_; // Protects all of the above
844 
845  ros::Duration max_interval_duration_; // TODO: initialize with a parameter
846  double age_penalty_;
847 
848  std::vector<bool> has_dropped_messages_;
849  std::vector<ros::Duration> inter_message_lower_bounds_;
850  std::vector<bool> warned_about_incorrect_bound_;
851 };
852 
853 } // namespace sync
854 } // namespace message_filters
855 
856 #endif // MESSAGE_FILTERS_SYNC_APPROXIMATE_TIME_H
857 
void setInterMessageLowerBound(int i, ros::Duration lower_bound)
boost::tuple< M0Event, M1Event, M2Event, M3Event, M4Event, M5Event, M6Event, M7Event, M8Event > Tuple
mpl::vector< ros::MessageEvent< M0 const >, ros::MessageEvent< M1 const >, ros::MessageEvent< M2 const >, ros::MessageEvent< M3 const >, ros::MessageEvent< M4 const >, ros::MessageEvent< M5 const >, ros::MessageEvent< M6 const >, ros::MessageEvent< M7 const >, ros::MessageEvent< M8 const > > Events
Definition: synchronizer.h:378
void getCandidateEnd(uint32_t &end_index, ros::Time &end_time)
std::vector< ros::Duration > inter_message_lower_bounds_
void signal(const M0Event &e0, const M1Event &e1, const M2Event &e2, const M3Event &e3, const M4Event &e4, const M5Event &e5, const M6Event &e6, const M7Event &e7, const M8Event &e8)
Definition: synchronizer.h:329
void getVirtualCandidateStart(uint32_t &start_index, ros::Time &start_time)
mpl::at_c< Events, 1 >::type M1Event
Definition: synchronizer.h:381
mpl::fold< Messages, mpl::int_< 0 >, mpl::if_< mpl::not_< boost::is_same< mpl::_2, NullType > >, mpl::next< mpl::_1 >, mpl::_1 > >::type RealTypeCount
Definition: synchronizer.h:379
void getCandidateStart(uint32_t &start_index, ros::Time &start_time)
mpl::at_c< Events, 5 >::type M5Event
Definition: synchronizer.h:385
mpl::at_c< Events, 2 >::type M2Event
Definition: synchronizer.h:382
mpl::vector< M0, M1, M2, M3, M4, M5, M6, M7, M8 > Messages
Definition: synchronizer.h:374
void setMaxIntervalDuration(ros::Duration max_interval_duration)
void getVirtualCandidateEnd(uint32_t &end_index, ros::Time &end_time)
mpl::at_c< Events, 0 >::type M0Event
Definition: synchronizer.h:380
#define ROS_WARN_STREAM(args)
mpl::at_c< Events, 3 >::type M3Event
Definition: synchronizer.h:383
boost::tuple< M0Deque, M1Deque, M2Deque, M3Deque, M4Deque, M5Deque, M6Deque, M7Deque, M8Deque > DequeTuple
void add(const typename mpl::at_c< Events, i >::type &evt)
void getCandidateBoundary(uint32_t &index, ros::Time &time, bool end)
PolicyBase< M0, M1, M2, M3, M4, M5, M6, M7, M8 > Super
#define ROS_ASSERT(cond)
mpl::at_c< Events, 7 >::type M7Event
Definition: synchronizer.h:387
mpl::at_c< Events, 4 >::type M4Event
Definition: synchronizer.h:384
#define ROS_BREAK()
void getVirtualCandidateBoundary(uint32_t &index, ros::Time &time, bool end)
boost::tuple< M0Vector, M1Vector, M2Vector, M3Vector, M4Vector, M5Vector, M6Vector, M7Vector, M8Vector > VectorTuple
ApproximateTime & operator=(const ApproximateTime &rhs)
mpl::at_c< Events, 8 >::type M8Event
Definition: synchronizer.h:388
mpl::at_c< Events, 6 >::type M6Event
Definition: synchronizer.h:386


message_filters
Author(s): Josh Faust, Vijay Pradeep, Dirk Thomas
autogenerated on Mon Nov 2 2020 03:52:41