ApproximateTime.h
Go to the documentation of this file.
1 /*********************************************************************
2  * Software License Agreement (BSD License)
3  *
4  * Copyright (c) 2015-2020, Dataspeed Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above
14  * copyright notice, this list of conditions and the following
15  * disclaimer in the documentation and/or other materials provided
16  * with the distribution.
17  * * Neither the name of Dataspeed Inc. nor the names of its
18  * contributors may be used to endorse or promote products derived
19  * from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  *********************************************************************/
34 
35 #ifndef _CAN_APPROXIMATE_TIME_H_
36 #define _CAN_APPROXIMATE_TIME_H_
37 
38 #include <deque>
39 #include <ros/ros.h>
40 #include <can_msgs/Frame.h>
41 
43 {
44 /*
45  * Synchronize several messages that don't have exact matching timestamps
46  *
47  * From the wiki (https://wiki.ros.org/message_filters/ApproximateTime):
48  * This is a policy used by message_filters::sync::Synchronizer to match messages coming on a set of topics.
49  * Contrary to message_filters::sync::ExactTime, it can match messages even if they have different time stamps.
50  * We call size of a set of messages the difference between the latest and earliest time stamp in the set.
51  *
52  * The algorithm is the product of long discussions with Blaise. It does not work like ExactTime except with
53  * matching allowed up to some epsilon time difference. Instead it finds the best match. It satisfies these properties:
54  * - The algorithm is parameter free. No need to specify an epsilon. Some parameters can be provided (see below),
55  * but they are optional.
56  * - Messages are used only once. Two sets cannot share the same message. Some messages can be dropped.
57  * - Sets do not cross. For two sets S and T, their messages satisfy either Si <= Ti for all i, or Ti <= Si for all i,
58  * where i runs over topics.
59  * - Sets are contiguous. There is at least one topic where there is no dropped message between the two sets.
60  * In other words there is no room to form another set with the dropped messages.
61  * - Sets are of minimal size among the sets contiguous to the previous published set.
62  * - The output only depends on the time stamps, not on the arrival time of messages. It does assume that messages arrive
63  * in order on each topic, but not even necessarily across topics (though the queue size must be large enough if there are
64  * big differences or messages will be dropped). This means that ApproximateTime can be safely used on messages that have
65  * suffered arbitrary networking or processing delays.
66  */
68 {
69 public:
70  typedef can_msgs::Frame::ConstPtr Type;
71  typedef boost::function<void(const std::vector<Type> &vec)> Callback;
72 
73  static bool ValidId(uint32_t id)
74  {
75  if (id & 0x80000000) {
76  if (!(id & ~0x9FFFFFFF)) {
77  return true; // Extended ID
78  }
79  } else {
80  if (!(id & ~0x7FF)) {
81  return true; // Standard ID
82  }
83  }
84  return false;
85  }
86  static bool ValidId(uint32_t id, bool extended)
87  {
88  return extended ? !(id & ~0x1FFFFFFF) : !(id & ~0x7FF);
89  }
90  static bool ValidId(const Type &msg) { return ValidId(msg->id, msg->is_extended); }
91  static uint32_t BuildId(uint32_t id, bool extended)
92  {
93  return extended ? ((id & 0x1FFFFFFF) | 0x80000000) : (id & 0x7FF);
94  }
95  static uint32_t BuildId(const Type &msg) { return BuildId(msg->id, msg->is_extended); }
96 
97  ApproximateTime(uint32_t queue_size, Callback callback, uint32_t id1, uint32_t id2)
98 : queue_size_(queue_size)
99 , callback_(callback)
101 , pivot_(NO_PIVOT)
102 , max_interval_duration_(ros::DURATION_MAX)
103 , age_penalty_(0.1)
104  {
105  ROS_ASSERT(queue_size_ > 0); // The synchronizer will tend to drop many messages with a queue size of 1. At least 2 is recommended.
106  ROS_ASSERT(ValidId(id1));
107  ROS_ASSERT(ValidId(id2));
108 
109  std::vector<uint32_t> ids(2);
110  ids[0] = id1;
111  ids[1] = id2;
112 
113  vector_.resize(ids.size());
114  for (size_t i = 0; i < ids.size(); i++) {
115  vector_[i].id = ids[i];
116  vector_[i].has_dropped_messages = false;
117  vector_[i].inter_message_lower_bounds = ros::Duration(0);
118  vector_[i].warned_about_incorrect_bound = false;
119  }
120  }
121  ApproximateTime(uint32_t queue_size, Callback callback, uint32_t id1, uint32_t id2, uint32_t id3)
122 : queue_size_(queue_size)
123 , callback_(callback)
125 , pivot_(NO_PIVOT)
126 , max_interval_duration_(ros::DURATION_MAX)
127 , age_penalty_(0.1)
128  {
129  ROS_ASSERT(queue_size_ > 0); // The synchronizer will tend to drop many messages with a queue size of 1. At least 2 is recommended.
130  ROS_ASSERT(ValidId(id1));
131  ROS_ASSERT(ValidId(id2));
132  ROS_ASSERT(ValidId(id3));
133 
134  std::vector<uint32_t> ids(3);
135  ids[0] = id1;
136  ids[1] = id2;
137  ids[2] = id3;
138 
139  vector_.resize(ids.size());
140  for (size_t i = 0; i < ids.size(); i++) {
141  vector_[i].id = ids[i];
142  vector_[i].has_dropped_messages = false;
143  vector_[i].inter_message_lower_bounds = ros::Duration(0);
144  vector_[i].warned_about_incorrect_bound = false;
145  }
146  }
147  ApproximateTime(uint32_t queue_size, Callback callback, uint32_t id1, uint32_t id2, uint32_t id3, uint32_t id4)
148 : queue_size_(queue_size)
149 , callback_(callback)
151 , pivot_(NO_PIVOT)
152 , max_interval_duration_(ros::DURATION_MAX)
153 , age_penalty_(0.1)
154  {
155  ROS_ASSERT(queue_size_ > 0); // The synchronizer will tend to drop many messages with a queue size of 1. At least 2 is recommended.
156  ROS_ASSERT(ValidId(id1));
157  ROS_ASSERT(ValidId(id2));
158  ROS_ASSERT(ValidId(id3));
159  ROS_ASSERT(ValidId(id4));
160 
161  std::vector<uint32_t> ids(4);
162  ids[0] = id1;
163  ids[1] = id2;
164  ids[2] = id3;
165  ids[3] = id4;
166 
167  vector_.resize(ids.size());
168  for (size_t i = 0; i < ids.size(); i++) {
169  vector_[i].id = ids[i];
170  vector_[i].has_dropped_messages = false;
171  vector_[i].inter_message_lower_bounds = ros::Duration(0);
172  vector_[i].warned_about_incorrect_bound = false;
173  }
174  }
175  ApproximateTime(uint32_t queue_size, Callback callback, uint32_t id1, uint32_t id2, uint32_t id3, uint32_t id4, uint32_t id5)
176 : queue_size_(queue_size)
177 , callback_(callback)
179 , pivot_(NO_PIVOT)
180 , max_interval_duration_(ros::DURATION_MAX)
181 , age_penalty_(0.1)
182  {
183  ROS_ASSERT(queue_size_ > 0); // The synchronizer will tend to drop many messages with a queue size of 1. At least 2 is recommended.
184  ROS_ASSERT(ValidId(id1));
185  ROS_ASSERT(ValidId(id2));
186  ROS_ASSERT(ValidId(id3));
187  ROS_ASSERT(ValidId(id4));
188  ROS_ASSERT(ValidId(id5));
189 
190  std::vector<uint32_t> ids(5);
191  ids[0] = id1;
192  ids[1] = id2;
193  ids[2] = id3;
194  ids[3] = id4;
195  ids[4] = id5;
196 
197  vector_.resize(ids.size());
198  for (size_t i = 0; i < ids.size(); i++) {
199  vector_[i].id = ids[i];
200  vector_[i].has_dropped_messages = false;
201  vector_[i].inter_message_lower_bounds = ros::Duration(0);
202  vector_[i].warned_about_incorrect_bound = false;
203  }
204  }
205  ApproximateTime(uint32_t queue_size, Callback callback, uint32_t id1, uint32_t id2, uint32_t id3, uint32_t id4, uint32_t id5, uint32_t id6)
206 : queue_size_(queue_size)
207 , callback_(callback)
209 , pivot_(NO_PIVOT)
210 , max_interval_duration_(ros::DURATION_MAX)
211 , age_penalty_(0.1)
212  {
213  ROS_ASSERT(queue_size_ > 0); // The synchronizer will tend to drop many messages with a queue size of 1. At least 2 is recommended.
214  ROS_ASSERT(ValidId(id1));
215  ROS_ASSERT(ValidId(id2));
216  ROS_ASSERT(ValidId(id3));
217  ROS_ASSERT(ValidId(id4));
218  ROS_ASSERT(ValidId(id5));
219  ROS_ASSERT(ValidId(id6));
220 
221  std::vector<uint32_t> ids(6);
222  ids[0] = id1;
223  ids[1] = id2;
224  ids[2] = id3;
225  ids[3] = id4;
226  ids[4] = id5;
227  ids[5] = id6;
228 
229  vector_.resize(ids.size());
230  for (size_t i = 0; i < ids.size(); i++) {
231  vector_[i].id = ids[i];
232  vector_[i].has_dropped_messages = false;
233  vector_[i].inter_message_lower_bounds = ros::Duration(0);
234  vector_[i].warned_about_incorrect_bound = false;
235  }
236  }
237  ApproximateTime(uint32_t queue_size, Callback callback, uint32_t id1, uint32_t id2, uint32_t id3, uint32_t id4, uint32_t id5, uint32_t id6, uint32_t id7)
238 : queue_size_(queue_size)
239 , callback_(callback)
241 , pivot_(NO_PIVOT)
242 , max_interval_duration_(ros::DURATION_MAX)
243 , age_penalty_(0.1)
244  {
245  ROS_ASSERT(queue_size_ > 0); // The synchronizer will tend to drop many messages with a queue size of 1. At least 2 is recommended.
246  ROS_ASSERT(ValidId(id1));
247  ROS_ASSERT(ValidId(id2));
248  ROS_ASSERT(ValidId(id3));
249  ROS_ASSERT(ValidId(id4));
250  ROS_ASSERT(ValidId(id5));
251  ROS_ASSERT(ValidId(id6));
252  ROS_ASSERT(ValidId(id7));
253 
254  std::vector<uint32_t> ids(7);
255  ids[0] = id1;
256  ids[1] = id2;
257  ids[2] = id3;
258  ids[3] = id4;
259  ids[4] = id5;
260  ids[5] = id6;
261  ids[6] = id7;
262 
263  vector_.resize(ids.size());
264  for (size_t i = 0; i < ids.size(); i++) {
265  vector_[i].id = ids[i];
266  vector_[i].has_dropped_messages = false;
267  vector_[i].inter_message_lower_bounds = ros::Duration(0);
268  vector_[i].warned_about_incorrect_bound = false;
269  }
270  }
271  ApproximateTime(uint32_t queue_size, Callback callback, uint32_t id1, uint32_t id2, uint32_t id3, uint32_t id4, uint32_t id5, uint32_t id6, uint32_t id7, uint32_t id8)
272 : queue_size_(queue_size)
273 , callback_(callback)
275 , pivot_(NO_PIVOT)
276 , max_interval_duration_(ros::DURATION_MAX)
277 , age_penalty_(0.1)
278  {
279  ROS_ASSERT(queue_size_ > 0); // The synchronizer will tend to drop many messages with a queue size of 1. At least 2 is recommended.
280  ROS_ASSERT(ValidId(id1));
281  ROS_ASSERT(ValidId(id2));
282  ROS_ASSERT(ValidId(id3));
283  ROS_ASSERT(ValidId(id4));
284  ROS_ASSERT(ValidId(id5));
285  ROS_ASSERT(ValidId(id6));
286  ROS_ASSERT(ValidId(id7));
287  ROS_ASSERT(ValidId(id8));
288 
289  std::vector<uint32_t> ids(8);
290  ids[0] = id1;
291  ids[1] = id2;
292  ids[2] = id3;
293  ids[3] = id4;
294  ids[4] = id5;
295  ids[5] = id6;
296  ids[6] = id7;
297  ids[7] = id8;
298 
299  vector_.resize(ids.size());
300  for (size_t i = 0; i < ids.size(); i++) {
301  vector_[i].id = ids[i];
302  vector_[i].has_dropped_messages = false;
303  vector_[i].inter_message_lower_bounds = ros::Duration(0);
304  vector_[i].warned_about_incorrect_bound = false;
305  }
306  }
307  ApproximateTime(uint32_t queue_size, Callback callback, uint32_t id1, uint32_t id2, uint32_t id3, uint32_t id4, uint32_t id5, uint32_t id6, uint32_t id7, uint32_t id8, uint32_t id9)
308 : queue_size_(queue_size)
309 , callback_(callback)
311 , pivot_(NO_PIVOT)
312 , max_interval_duration_(ros::DURATION_MAX)
313 , age_penalty_(0.1)
314  {
315  ROS_ASSERT(queue_size_ > 0); // The synchronizer will tend to drop many messages with a queue size of 1. At least 2 is recommended.
316  ROS_ASSERT(ValidId(id1));
317  ROS_ASSERT(ValidId(id2));
318  ROS_ASSERT(ValidId(id3));
319  ROS_ASSERT(ValidId(id4));
320  ROS_ASSERT(ValidId(id5));
321  ROS_ASSERT(ValidId(id6));
322  ROS_ASSERT(ValidId(id7));
323  ROS_ASSERT(ValidId(id8));
324  ROS_ASSERT(ValidId(id9));
325 
326  std::vector<uint32_t> ids(9);
327  ids[0] = id1;
328  ids[1] = id2;
329  ids[2] = id3;
330  ids[3] = id4;
331  ids[4] = id5;
332  ids[5] = id6;
333  ids[6] = id7;
334  ids[7] = id8;
335  ids[8] = id9;
336 
337  vector_.resize(ids.size());
338  for (size_t i = 0; i < ids.size(); i++) {
339  vector_[i].id = ids[i];
340  vector_[i].has_dropped_messages = false;
341  vector_[i].inter_message_lower_bounds = ros::Duration(0);
342  vector_[i].warned_about_incorrect_bound = false;
343  }
344  }
346 
347  void processMsg(const Type &msg)
348  {
349  if (msg->is_rtr || msg->is_error) return;
350  ROS_WARN_COND(!ValidId(msg), "Processed CAN message with invalid id: 0x%X (%s)", msg->id, msg->is_extended ? "extended" : "standard");
351  for (size_t i = 0; i < vector_.size(); i++) {
352  if (BuildId(msg) == vector_[i].id) {
353 #if 0
354  ROS_INFO("Id 0x%X: %u.%u", BuildId(msg), msg->header.stamp.sec, msg->header.stamp.nsec);
355 #endif
356  std::deque<Type>& deque = vector_[i].deque;
357  deque.push_back(msg);
358  if (deque.size() == (size_t)1) {
359  // We have just added the first message, so it was empty before
361  if (num_non_empty_deques_ == (uint32_t)vector_.size()) {
362  // All deques have messages
363  process();
364  }
365  } else {
367  }
368 
369  // Check whether we have more messages than allowed in the queue.
370  // Note that during the above call to process(), queue i may contain queue_size_+1 messages.
371  std::vector<Type>& past = vector_[i].past;
372  if (deque.size() + past.size() > queue_size_) {
373  // Cancel ongoing candidate search, if any:
374  num_non_empty_deques_ = 0; // We will recompute it from scratch
375  {for (size_t i = 0; i < vector_.size(); i++) {
376  recover(i);
377  }}
378  // Drop the oldest message in the offending topic
379  ROS_ASSERT(!deque.empty());
380  deque.pop_front();
381  vector_[i].has_dropped_messages = true;
382  if (pivot_ != NO_PIVOT) {
383  // The candidate is no longer valid. Destroy it.
384  for (size_t i = 0; i < vector_.size(); i++) {
385  vector_[i].candidate.reset();
386  }
387  pivot_ = NO_PIVOT;
388  // There might still be enough messages to create a new candidate:
389  process();
390  }
391  }
392  return;
393  }
394  }
395  }
396 
397  /*
398  * Set the Age penalty: when comparing the size of sets, later intervals are penalized by a factor (1+AgePenalty).
399  * The default is 0. A non zero penalty can help output sets earlier, or output more sets, at some cost in quality.
400  */
401  void setAgePenalty(double age_penalty) {
402  // For correctness we only need age_penalty > -1.0, but most likely a negative age_penalty is a mistake.
403  ROS_ASSERT(age_penalty >= 0);
404  age_penalty_ = age_penalty;
405  }
406 
407  /*
408  * Set the Inter message lower bound: if messages of a particular topic cannot be closer together than a known interval,
409  * providing this lower bound will not change the output but will allow the algorithm to conclude earlier that a given
410  * set is optimal, reducing delays. With the default value of 0, for messages spaced on average by a duration T,
411  * the algorithm can introduce a delay of about T. With good bounds provided a set can often be published as soon as
412  * the last message of the set is received. An incorrect bound will result in suboptimal sets being selected. A typical
413  * bound is, say, 1/2 the frame rate of a camera.
414  */
416  ROS_ASSERT(lower_bound >= ros::Duration(0,0));
417  for (size_t i = 0; i < vector_.size(); i++) {
418  vector_[i].inter_message_lower_bounds = lower_bound;
419  }
420  }
421 
422  /*
423  * Set the Inter message lower bound for each individual message index
424  */
425  void setInterMessageLowerBound(size_t i, ros::Duration lower_bound) {
426  // For correctness we only need age_penalty > -1.0, but most likely a negative age_penalty is a mistake.
427  ROS_ASSERT(lower_bound >= ros::Duration(0,0));
428  ROS_ASSERT(i < vector_.size());
429  vector_[i].inter_message_lower_bounds = lower_bound;
430  }
431 
432  /*
433  * Set the Max interval duration: sets of more than this size will not be considered (disabled by default). The effect
434  * is similar to throwing away a posteriori output sets that are too large, but it can be a little better.
435  */
436  void setMaxIntervalDuration(ros::Duration max_interval_duration) {
437  // For correctness we only need age_penalty > -1.0, but most likely a negative age_penalty is a mistake.
438  ROS_ASSERT(max_interval_duration >= ros::Duration(0,0));
439  max_interval_duration_ = max_interval_duration;
440  }
441 
442 
443 private:
444  void checkInterMessageBound(size_t i)
445  {
446  namespace mt = ros::message_traits;
447  if (vector_[i].warned_about_incorrect_bound) {
448  return;
449  }
450  std::deque<Type>& deque = vector_[i].deque;
451  std::vector<Type>& v = vector_[i].past;
452  ROS_ASSERT(!deque.empty());
453  ros::Time msg_time = deque.back()->header.stamp;
454  ros::Time previous_msg_time;
455  if (deque.size() == (size_t)1) {
456  if (v.empty()) {
457  // We have already published (or have never received) the previous message, we cannot check the bound
458  return;
459  }
460  previous_msg_time = v.back()->header.stamp;
461  } else {
462  // There are at least 2 elements in the deque. Check that the gap respects the bound if it was provided.
463  previous_msg_time = deque[deque.size()-2]->header.stamp;
464  }
465  if (msg_time < previous_msg_time) {
466  ROS_WARN_STREAM("Messages of type " << i << " arrived out of order (will print only once)");
467  vector_[i].warned_about_incorrect_bound = true;
468  } else if ((msg_time - previous_msg_time) < vector_[i].inter_message_lower_bounds) {
469  ROS_WARN_STREAM("Messages of type " << i << " arrived closer (" << (msg_time - previous_msg_time)
470  << ") than the lower bound you provided (" << vector_[i].inter_message_lower_bounds
471  << ") (will print only once)");
472  vector_[i].warned_about_incorrect_bound = true;
473  }
474  }
475 
476  // Assumes that deque number <index> is non empty
477  void dequeDeleteFront(size_t i)
478  {
479  std::deque<Type>& deque = vector_[i].deque;
480  ROS_ASSERT(!deque.empty());
481  deque.pop_front();
482  if (deque.empty()) {
484  }
485  }
486 
487  // Assumes that deque number <index> is non empty
488  void dequeMoveFrontToPast(size_t i)
489  {
490  std::deque<Type>& deque = vector_[i].deque;
491  std::vector<Type>& vector = vector_[i].past;
492  ROS_ASSERT(!deque.empty());
493  vector.push_back(deque.front());
494  deque.pop_front();
495  if (deque.empty()) {
497  }
498  }
499 
501  {
502  //printf("Creating candidate\n");
503  for (size_t i = 0; i < vector_.size(); i++) {
504  vector_[i].candidate = vector_[i].deque.front(); // Create candidate tuple
505  vector_[i].past.clear(); // Delete all past messages, since we have found a better candidate
506  }
507  //printf("Candidate created\n");
508  }
509 
510  void recover(size_t i, size_t num_messages)
511  {
512  std::vector<Type>& v = vector_[i].past;
513  std::deque<Type>& q = vector_[i].deque;
514  ROS_ASSERT(num_messages <= v.size());
515  while (num_messages > 0) {
516  q.push_front(v.back());
517  v.pop_back();
518  num_messages--;
519  }
520 
521  if (!q.empty()) {
523  }
524  }
525 
526 
527  void recover(size_t i)
528  {
529  std::vector<Type>& v = vector_[i].past;
530  std::deque<Type>& q = vector_[i].deque;
531  while (!v.empty()) {
532  q.push_front(v.back());
533  v.pop_back();
534  }
535 
536  if (!q.empty()) {
538  }
539  }
540 
541  void recoverAndDelete(size_t i)
542  {
543  std::vector<Type>& v = vector_[i].past;
544  std::deque<Type>& q = vector_[i].deque;
545  while (!v.empty()) {
546  q.push_front(v.back());
547  v.pop_back();
548  }
549 
550  ROS_ASSERT(!q.empty());
551 
552  q.pop_front();
553  if (!q.empty()) {
555  }
556  }
557 
558  // Assumes: all deques are non empty, i.e. num_non_empty_deques_ == RealTypeCount::value
560  {
561  //printf("Publishing candidate\n");
562  // Publish
563  std::vector<Type> candidate;
564  for (size_t i = 0; i < vector_.size(); i++) {
565  candidate.push_back(vector_[i].candidate);
566  }
567  callback_(candidate);
568 
569  // Delete this candidate
570  for (size_t i = 0; i < vector_.size(); i++) {
571  vector_[i].candidate.reset();
572  }
573  pivot_ = NO_PIVOT;
574 
575  // Recover hidden messages, and delete the ones corresponding to the candidate
576  num_non_empty_deques_ = 0; // We will recompute it from scratch
577  for (size_t i = 0; i < vector_.size(); i++) {
578  recoverAndDelete(i);
579  }
580  }
581 
582  // Assumes: all deques are non empty, i.e. num_non_empty_deques_ == RealTypeCount::value
583  // Returns: the oldest message on the deques
584  void getCandidateStart(uint32_t &start_index, ros::Time &start_time)
585  {
586  return getCandidateBoundary(start_index, start_time, false);
587  }
588 
589  // Assumes: all deques are non empty, i.e. num_non_empty_deques_ == RealTypeCount::value
590  // Returns: the latest message among the heads of the deques, i.e. the minimum
591  // time to end an interval started at getCandidateStart_index()
592  void getCandidateEnd(uint32_t &end_index, ros::Time &end_time)
593  {
594  return getCandidateBoundary(end_index, end_time, true);
595  }
596 
597  // ASSUMES: all deques are non-empty
598  // end = true: look for the latest head of deque
599  // false: look for the earliest head of deque
600  void getCandidateBoundary(uint32_t &index, ros::Time &time, bool end)
601  {
602  time = vector_[0].deque.front()->header.stamp;
603  index = 0;
604  for (size_t i = 1; i < vector_.size(); i++) {
605  const ros::Time &t = vector_[i].deque.front()->header.stamp;
606  if ((t < time) ^ end) {
607  time = t;
608  index = i;
609  }
610  }
611  }
612 
613  // ASSUMES: we have a pivot and candidate
615  {
616  namespace mt = ros::message_traits;
617 
618  if (i >= vector_.size()) {
619  return ros::Time(0,0); // Dummy return value
620  }
622 
623  std::vector<Type>& v = vector_[i].past;
624  std::deque<Type>& q = vector_[i].deque;
625  if (q.empty()) {
626  ROS_ASSERT(!v.empty()); // Because we have a candidate
627  ros::Time last_msg_time = v.back()->header.stamp;
628  ros::Time msg_time_lower_bound = last_msg_time + vector_[i].inter_message_lower_bounds;
629  if (msg_time_lower_bound > pivot_time_) { // Take the max
630  return msg_time_lower_bound;
631  }
632  return pivot_time_;
633  }
634  ros::Time current_msg_time = q.front()->header.stamp;
635  return current_msg_time;
636  }
637 
638 
639  // ASSUMES: we have a pivot and candidate
640  void getVirtualCandidateStart(uint32_t &start_index, ros::Time &start_time)
641  {
642  return getVirtualCandidateBoundary(start_index, start_time, false);
643  }
644 
645  // ASSUMES: we have a pivot and candidate
646  void getVirtualCandidateEnd(uint32_t &end_index, ros::Time &end_time)
647  {
648  return getVirtualCandidateBoundary(end_index, end_time, true);
649  }
650 
651  // ASSUMES: we have a pivot and candidate
652  // end = true: look for the latest head of deque
653  // false: look for the earliest head of deque
654  void getVirtualCandidateBoundary(uint32_t &index, ros::Time &time, bool end)
655  {
656  namespace mt = ros::message_traits;
657 
658  std::vector<ros::Time> virtual_times(vector_.size());
659  for (size_t i = 0; i < vector_.size(); i++) {
660  virtual_times[i] = getVirtualTime(i);
661  }
662 
663  time = virtual_times[0];
664  index = 0;
665  for (size_t i = 0; i < vector_.size(); i++) {
666  if ((virtual_times[i] < time) ^ end) {
667  time = virtual_times[i];
668  index = i;
669  }
670  }
671  }
672 
673 
674  // assumes data_mutex_ is already locked
675  void process()
676  {
677  // While no deque is empty
678  while (num_non_empty_deques_ == (uint32_t)vector_.size()) {
679  // Find the start and end of the current interval
680  //printf("Entering while loop in this state [\n");
681  //show_internal_state();
682  //printf("]\n");
683  ros::Time end_time, start_time;
684  uint32_t end_index, start_index;
685  getCandidateEnd(end_index, end_time);
686  getCandidateStart(start_index, start_time);
687  for (uint32_t i = 0; i < (uint32_t)vector_.size(); i++) {
688  if (i != end_index) {
689  // No dropped message could have been better to use than the ones we have,
690  // so it becomes ok to use this topic as pivot in the future
691  vector_[i].has_dropped_messages = false;
692  }
693  }
694  if (pivot_ == NO_PIVOT) {
695  // We do not have a candidate
696  // INVARIANT: the past_ vectors are empty
697  // INVARIANT: (candidate_ has no filled members)
698  if (end_time - start_time > max_interval_duration_) {
699  // This interval is too big to be a valid candidate, move to the next
700  dequeDeleteFront(start_index);
701  continue;
702  }
703  if (vector_[end_index].has_dropped_messages) {
704  // The topic that would become pivot has dropped messages, so it is not a good pivot
705  dequeDeleteFront(start_index);
706  continue;
707  }
708  // This is a valid candidate, and we don't have any, so take it
709  makeCandidate();
710  candidate_start_ = start_time;
711  candidate_end_ = end_time;
712  pivot_ = end_index;
713  pivot_time_ = end_time;
714  dequeMoveFrontToPast(start_index);
715  } else {
716  // We already have a candidate
717  // Is this one better than the current candidate?
718  // INVARIANT: has_dropped_messages_ is all false
719  if ((end_time - candidate_end_) * (1 + age_penalty_) >= (start_time - candidate_start_)) {
720  // This is not a better candidate, move to the next
721  dequeMoveFrontToPast(start_index);
722  } else {
723  // This is a better candidate
724  makeCandidate();
725  candidate_start_ = start_time;
726  candidate_end_ = end_time;
727  dequeMoveFrontToPast(start_index);
728  // Keep the same pivot (and pivot time)
729  }
730  }
731  // INVARIANT: we have a candidate and pivot
733  //printf("start_index == %d, pivot_ == %d\n", start_index, pivot_);
734  if (start_index == pivot_) { // TODO: replace with start_time == pivot_time_
735  // We have exhausted all possible candidates for this pivot, we now can output the best one
737  } else if ((end_time - candidate_end_) * (1 + age_penalty_) >= (pivot_time_ - candidate_start_)) {
738  // We have not exhausted all candidates, but this candidate is already provably optimal
739  // Indeed, any future candidate must contain the interval [pivot_time_ end_time], which
740  // is already too big.
741  // Note: this case is subsumed by the next, but it may save some unnecessary work and
742  // it makes things (a little) easier to understand
744  } else if (num_non_empty_deques_ < (uint32_t)vector_.size()) {
745  uint32_t num_non_empty_deques_before_virtual_search = num_non_empty_deques_;
746 
747  // Before giving up, use the rate bounds, if provided, to further try to prove optimality
748  std::vector<int> num_virtual_moves(9,0);
749  while (1) {
750  ros::Time end_time, start_time;
751  uint32_t end_index, start_index;
752  getVirtualCandidateEnd(end_index, end_time);
753  getVirtualCandidateStart(start_index, start_time);
754  if ((end_time - candidate_end_) * (1 + age_penalty_) >= (pivot_time_ - candidate_start_)) {
755  // We have proved optimality
756  // As above, any future candidate must contain the interval [pivot_time_ end_time], which
757  // is already too big.
758  publishCandidate(); // This cleans up the virtual moves as a byproduct
759  break; // From the while(1) loop only
760  }
761  if ((end_time - candidate_end_) * (1 + age_penalty_) < (start_time - candidate_start_)) {
762  // We cannot prove optimality
763  // Indeed, we have a virtual (i.e. optimistic) candidate that is better than the current
764  // candidate
765  // Cleanup the virtual search:
766  num_non_empty_deques_ = 0; // We will recompute it from scratch
767  for (size_t i = 0; i < vector_.size(); i++) {
768  recover(i, num_virtual_moves[i]);
769  }
770  (void)num_non_empty_deques_before_virtual_search; // unused variable warning stopper
771  ROS_ASSERT(num_non_empty_deques_before_virtual_search == num_non_empty_deques_);
772  break;
773  }
774  // Note: we cannot reach this point with start_index == pivot_ since in that case we would
775  // have start_time == pivot_time, in which case the two tests above are the negation
776  // of each other, so that one must be true. Therefore the while loop always terminates.
777  ROS_ASSERT(start_index != pivot_);
778  ROS_ASSERT(start_time < pivot_time_);
779  dequeMoveFrontToPast(start_index);
780  num_virtual_moves[start_index]++;
781  } // while(1)
782  }
783  } // while(num_non_empty_deques_ == (uint32_t)RealTypeCount::value)
784  }
785 
786 
787  uint32_t queue_size_;
789 
790  static const uint32_t NO_PIVOT = 9; // Special value for the pivot indicating that no pivot has been selected
791 
792  typedef struct {
793  uint32_t id;
794  std::deque<Type> deque;
795  std::vector<Type> past;
796  Type candidate; // NULL if there is no candidate, in which case there is no pivot.
800  } VectorData;
801  std::vector<VectorData> vector_;
806  uint32_t pivot_; // Equal to NO_PIVOT if there is no candidate
807 
808  ros::Duration max_interval_duration_; // TODO: initialize with a parameter
809  double age_penalty_;
810 };
811 
812 } // namespace dataspeed_can_msg_filters
813 
814 #endif // _CAN_APPROXIMATE_TIME_H_
815 
void getVirtualCandidateBoundary(uint32_t &index, ros::Time &time, bool end)
void getCandidateStart(uint32_t &start_index, ros::Time &start_time)
void getCandidateEnd(uint32_t &end_index, ros::Time &end_time)
ApproximateTime(uint32_t queue_size, Callback callback, uint32_t id1, uint32_t id2, uint32_t id3, uint32_t id4, uint32_t id5)
static bool ValidId(uint32_t id, bool extended)
void getVirtualCandidateEnd(uint32_t &end_index, ros::Time &end_time)
void getVirtualCandidateStart(uint32_t &start_index, ros::Time &start_time)
static uint32_t BuildId(const Type &msg)
ApproximateTime(uint32_t queue_size, Callback callback, uint32_t id1, uint32_t id2, uint32_t id3)
ApproximateTime(uint32_t queue_size, Callback callback, uint32_t id1, uint32_t id2, uint32_t id3, uint32_t id4)
ApproximateTime(uint32_t queue_size, Callback callback, uint32_t id1, uint32_t id2, uint32_t id3, uint32_t id4, uint32_t id5, uint32_t id6, uint32_t id7, uint32_t id8)
ApproximateTime(uint32_t queue_size, Callback callback, uint32_t id1, uint32_t id2, uint32_t id3, uint32_t id4, uint32_t id5, uint32_t id6)
#define ROS_INFO(...)
void getCandidateBoundary(uint32_t &index, ros::Time &time, bool end)
void setInterMessageLowerBound(size_t i, ros::Duration lower_bound)
#define ROS_WARN_STREAM(args)
static uint32_t BuildId(uint32_t id, bool extended)
void setMaxIntervalDuration(ros::Duration max_interval_duration)
void setInterMessageLowerBound(ros::Duration lower_bound)
#define ROS_WARN_COND(cond,...)
ApproximateTime(uint32_t queue_size, Callback callback, uint32_t id1, uint32_t id2)
ApproximateTime(uint32_t queue_size, Callback callback, uint32_t id1, uint32_t id2, uint32_t id3, uint32_t id4, uint32_t id5, uint32_t id6, uint32_t id7)
static bool ValidId(const Type &msg)
void recover(size_t i, size_t num_messages)
ApproximateTime(uint32_t queue_size, Callback callback, uint32_t id1, uint32_t id2, uint32_t id3, uint32_t id4, uint32_t id5, uint32_t id6, uint32_t id7, uint32_t id8, uint32_t id9)
#define ROS_ASSERT(cond)
boost::function< void(const std::vector< Type > &vec)> Callback


dataspeed_can_msg_filters
Author(s): Kevin Hallenbeck
autogenerated on Thu Jul 9 2020 03:41:58