Program Listing for File ApproximateTime.hpp
↰ Return to documentation for file (/tmp/ws/src/dataspeed_can/dataspeed_can_msg_filters/include/dataspeed_can_msg_filters/ApproximateTime.hpp
)
/*********************************************************************
* Software License Agreement (BSD License)
*
* Copyright (c) 2015-2021, Dataspeed Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided
* with the distribution.
* * Neither the name of Dataspeed Inc. nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*********************************************************************/
#pragma once
#include <deque>
#include <can_msgs/msg/frame.hpp>
#include <rclcpp/time.hpp>
#include <rcutils/logging_macros.h>
#include <inttypes.h> //for logging
#include <chrono>
#ifndef RCUTILS_ASSERT
#define RCUTILS_ASSERT assert
#endif
namespace dataspeed_can_msg_filters
{
/*
* Synchronize several messages that don't have exact matching timestamps
*
* From the wiki (https://wiki.ros.org/message_filters/ApproximateTime):
* This is a policy used by message_filters::sync::Synchronizer to match messages coming on a set of topics.
* Contrary to message_filters::sync::ExactTime, it can match messages even if they have different time stamps.
* We call size of a set of messages the difference between the latest and earliest time stamp in the set.
*
* The algorithm is the product of long discussions with Blaise. It does not work like ExactTime except with
* matching allowed up to some epsilon time difference. Instead it finds the best match. It satisfies these properties:
* - The algorithm is parameter free. No need to specify an epsilon. Some parameters can be provided (see below),
* but they are optional.
* - Messages are used only once. Two sets cannot share the same message. Some messages can be dropped.
* - Sets do not cross. For two sets S and T, their messages satisfy either Si <= Ti for all i, or Ti <= Si for all i,
* where i runs over topics.
* - Sets are contiguous. There is at least one topic where there is no dropped message between the two sets.
* In other words there is no room to form another set with the dropped messages.
* - Sets are of minimal size among the sets contiguous to the previous published set.
* - The output only depends on the time stamps, not on the arrival time of messages. It does assume that messages arrive
* in order on each topic, but not even necessarily across topics (though the queue size must be large enough if there are
* big differences or messages will be dropped). This means that ApproximateTime can be safely used on messages that have
* suffered arbitrary networking or processing delays.
*/
class ApproximateTime
{
public:
typedef can_msgs::msg::Frame::ConstSharedPtr Type;
typedef std::function<void(const std::vector<Type> &vec)> Callback;
static bool ValidId(uint32_t id)
{
if (id & 0x80000000) {
if (!(id & ~0x9FFFFFFF)) {
return true; // Extended ID
}
} else {
if (!(id & ~0x7FF)) {
return true; // Standard ID
}
}
return false;
}
static bool ValidId(uint32_t id, bool extended)
{
return extended ? !(id & ~0x1FFFFFFF) : !(id & ~0x7FF);
}
static bool ValidId(const Type &msg) { return ValidId(msg->id, msg->is_extended); }
static uint32_t BuildId(uint32_t id, bool extended)
{
return extended ? ((id & 0x1FFFFFFF) | 0x80000000) : (id & 0x7FF);
}
static uint32_t BuildId(const Type &msg) { return BuildId(msg->id, msg->is_extended); }
ApproximateTime(uint32_t queue_size, Callback callback, uint32_t id1, uint32_t id2)
: queue_size_(queue_size)
, callback_(callback)
, num_non_empty_deques_(0)
, pivot_(NO_PIVOT)
, max_interval_duration_(rclcpp::Duration::max())
, age_penalty_(0.1)
{
RCUTILS_ASSERT(queue_size_ > 0); // The synchronizer will tend to drop many messages with a queue size of 1. At least 2 is recommended.
RCUTILS_ASSERT(ValidId(id1));
RCUTILS_ASSERT(ValidId(id2));
std::vector<uint32_t> ids(2);
ids[0] = id1;
ids[1] = id2;
vector_.resize(ids.size());
for (size_t i = 0; i < ids.size(); i++) {
vector_[i].id = ids[i];
vector_[i].has_dropped_messages = false;
vector_[i].inter_message_lower_bounds = rclcpp::Duration(std::chrono::nanoseconds(0));
vector_[i].warned_about_incorrect_bound = false;
}
}
ApproximateTime(uint32_t queue_size, Callback callback, uint32_t id1, uint32_t id2, uint32_t id3)
: queue_size_(queue_size)
, callback_(callback)
, num_non_empty_deques_(0)
, pivot_(NO_PIVOT)
, max_interval_duration_(rclcpp::Duration::max())
, age_penalty_(0.1)
{
RCUTILS_ASSERT(queue_size_ > 0); // The synchronizer will tend to drop many messages with a queue size of 1. At least 2 is recommended.
RCUTILS_ASSERT(ValidId(id1));
RCUTILS_ASSERT(ValidId(id2));
RCUTILS_ASSERT(ValidId(id3));
std::vector<uint32_t> ids(3);
ids[0] = id1;
ids[1] = id2;
ids[2] = id3;
vector_.resize(ids.size());
for (size_t i = 0; i < ids.size(); i++) {
vector_[i].id = ids[i];
vector_[i].has_dropped_messages = false;
vector_[i].inter_message_lower_bounds = rclcpp::Duration(std::chrono::nanoseconds(0));
vector_[i].warned_about_incorrect_bound = false;
}
}
ApproximateTime(uint32_t queue_size, Callback callback, uint32_t id1, uint32_t id2, uint32_t id3, uint32_t id4)
: queue_size_(queue_size)
, callback_(callback)
, num_non_empty_deques_(0)
, pivot_(NO_PIVOT)
, max_interval_duration_(rclcpp::Duration::max())
, age_penalty_(0.1)
{
RCUTILS_ASSERT(queue_size_ > 0); // The synchronizer will tend to drop many messages with a queue size of 1. At least 2 is recommended.
RCUTILS_ASSERT(ValidId(id1));
RCUTILS_ASSERT(ValidId(id2));
RCUTILS_ASSERT(ValidId(id3));
RCUTILS_ASSERT(ValidId(id4));
std::vector<uint32_t> ids(4);
ids[0] = id1;
ids[1] = id2;
ids[2] = id3;
ids[3] = id4;
vector_.resize(ids.size());
for (size_t i = 0; i < ids.size(); i++) {
vector_[i].id = ids[i];
vector_[i].has_dropped_messages = false;
vector_[i].inter_message_lower_bounds = rclcpp::Duration(std::chrono::nanoseconds(0));
vector_[i].warned_about_incorrect_bound = false;
}
}
ApproximateTime(uint32_t queue_size, Callback callback, uint32_t id1, uint32_t id2, uint32_t id3, uint32_t id4, uint32_t id5)
: queue_size_(queue_size)
, callback_(callback)
, num_non_empty_deques_(0)
, pivot_(NO_PIVOT)
, max_interval_duration_(rclcpp::Duration::max())
, age_penalty_(0.1)
{
RCUTILS_ASSERT(queue_size_ > 0); // The synchronizer will tend to drop many messages with a queue size of 1. At least 2 is recommended.
RCUTILS_ASSERT(ValidId(id1));
RCUTILS_ASSERT(ValidId(id2));
RCUTILS_ASSERT(ValidId(id3));
RCUTILS_ASSERT(ValidId(id4));
RCUTILS_ASSERT(ValidId(id5));
std::vector<uint32_t> ids(5);
ids[0] = id1;
ids[1] = id2;
ids[2] = id3;
ids[3] = id4;
ids[4] = id5;
vector_.resize(ids.size());
for (size_t i = 0; i < ids.size(); i++) {
vector_[i].id = ids[i];
vector_[i].has_dropped_messages = false;
vector_[i].inter_message_lower_bounds = rclcpp::Duration(std::chrono::nanoseconds(0));
vector_[i].warned_about_incorrect_bound = false;
}
}
ApproximateTime(uint32_t queue_size, Callback callback, uint32_t id1, uint32_t id2, uint32_t id3, uint32_t id4, uint32_t id5, uint32_t id6)
: queue_size_(queue_size)
, callback_(callback)
, num_non_empty_deques_(0)
, pivot_(NO_PIVOT)
, max_interval_duration_(rclcpp::Duration::max())
, age_penalty_(0.1)
{
RCUTILS_ASSERT(queue_size_ > 0); // The synchronizer will tend to drop many messages with a queue size of 1. At least 2 is recommended.
RCUTILS_ASSERT(ValidId(id1));
RCUTILS_ASSERT(ValidId(id2));
RCUTILS_ASSERT(ValidId(id3));
RCUTILS_ASSERT(ValidId(id4));
RCUTILS_ASSERT(ValidId(id5));
RCUTILS_ASSERT(ValidId(id6));
std::vector<uint32_t> ids(6);
ids[0] = id1;
ids[1] = id2;
ids[2] = id3;
ids[3] = id4;
ids[4] = id5;
ids[5] = id6;
vector_.resize(ids.size());
for (size_t i = 0; i < ids.size(); i++) {
vector_[i].id = ids[i];
vector_[i].has_dropped_messages = false;
vector_[i].inter_message_lower_bounds = rclcpp::Duration(std::chrono::nanoseconds(0));
vector_[i].warned_about_incorrect_bound = false;
}
}
ApproximateTime(uint32_t queue_size, Callback callback, uint32_t id1, uint32_t id2, uint32_t id3, uint32_t id4, uint32_t id5, uint32_t id6, uint32_t id7)
: queue_size_(queue_size)
, callback_(callback)
, num_non_empty_deques_(0)
, pivot_(NO_PIVOT)
, max_interval_duration_(rclcpp::Duration::max())
, age_penalty_(0.1)
{
RCUTILS_ASSERT(queue_size_ > 0); // The synchronizer will tend to drop many messages with a queue size of 1. At least 2 is recommended.
RCUTILS_ASSERT(ValidId(id1));
RCUTILS_ASSERT(ValidId(id2));
RCUTILS_ASSERT(ValidId(id3));
RCUTILS_ASSERT(ValidId(id4));
RCUTILS_ASSERT(ValidId(id5));
RCUTILS_ASSERT(ValidId(id6));
RCUTILS_ASSERT(ValidId(id7));
std::vector<uint32_t> ids(7);
ids[0] = id1;
ids[1] = id2;
ids[2] = id3;
ids[3] = id4;
ids[4] = id5;
ids[5] = id6;
ids[6] = id7;
vector_.resize(ids.size());
for (size_t i = 0; i < ids.size(); i++) {
vector_[i].id = ids[i];
vector_[i].has_dropped_messages = false;
vector_[i].inter_message_lower_bounds = rclcpp::Duration(std::chrono::nanoseconds(0));
vector_[i].warned_about_incorrect_bound = false;
}
}
ApproximateTime(uint32_t queue_size, Callback callback, uint32_t id1, uint32_t id2, uint32_t id3, uint32_t id4, uint32_t id5, uint32_t id6, uint32_t id7, uint32_t id8)
: queue_size_(queue_size)
, callback_(callback)
, num_non_empty_deques_(0)
, pivot_(NO_PIVOT)
, max_interval_duration_(rclcpp::Duration::max())
, age_penalty_(0.1)
{
RCUTILS_ASSERT(queue_size_ > 0); // The synchronizer will tend to drop many messages with a queue size of 1. At least 2 is recommended.
RCUTILS_ASSERT(ValidId(id1));
RCUTILS_ASSERT(ValidId(id2));
RCUTILS_ASSERT(ValidId(id3));
RCUTILS_ASSERT(ValidId(id4));
RCUTILS_ASSERT(ValidId(id5));
RCUTILS_ASSERT(ValidId(id6));
RCUTILS_ASSERT(ValidId(id7));
RCUTILS_ASSERT(ValidId(id8));
std::vector<uint32_t> ids(8);
ids[0] = id1;
ids[1] = id2;
ids[2] = id3;
ids[3] = id4;
ids[4] = id5;
ids[5] = id6;
ids[6] = id7;
ids[7] = id8;
vector_.resize(ids.size());
for (size_t i = 0; i < ids.size(); i++) {
vector_[i].id = ids[i];
vector_[i].has_dropped_messages = false;
vector_[i].inter_message_lower_bounds = rclcpp::Duration(std::chrono::nanoseconds(0));
vector_[i].warned_about_incorrect_bound = false;
}
}
ApproximateTime(uint32_t queue_size, Callback callback, uint32_t id1, uint32_t id2, uint32_t id3, uint32_t id4, uint32_t id5, uint32_t id6, uint32_t id7, uint32_t id8, uint32_t id9)
: queue_size_(queue_size)
, callback_(callback)
, num_non_empty_deques_(0)
, pivot_(NO_PIVOT)
, max_interval_duration_(rclcpp::Duration::max())
, age_penalty_(0.1)
{
RCUTILS_ASSERT(queue_size_ > 0); // The synchronizer will tend to drop many messages with a queue size of 1. At least 2 is recommended.
RCUTILS_ASSERT(ValidId(id1));
RCUTILS_ASSERT(ValidId(id2));
RCUTILS_ASSERT(ValidId(id3));
RCUTILS_ASSERT(ValidId(id4));
RCUTILS_ASSERT(ValidId(id5));
RCUTILS_ASSERT(ValidId(id6));
RCUTILS_ASSERT(ValidId(id7));
RCUTILS_ASSERT(ValidId(id8));
RCUTILS_ASSERT(ValidId(id9));
std::vector<uint32_t> ids(9);
ids[0] = id1;
ids[1] = id2;
ids[2] = id3;
ids[3] = id4;
ids[4] = id5;
ids[5] = id6;
ids[6] = id7;
ids[7] = id8;
ids[8] = id9;
vector_.resize(ids.size());
for (size_t i = 0; i < ids.size(); i++) {
vector_[i].id = ids[i];
vector_[i].has_dropped_messages = false;
vector_[i].inter_message_lower_bounds = rclcpp::Duration(std::chrono::nanoseconds(0));
vector_[i].warned_about_incorrect_bound = false;
}
}
~ApproximateTime() {}
void processMsg(const Type &msg)
{
if (msg->is_rtr || msg->is_error) return;
RCUTILS_LOG_WARN_EXPRESSION(!ValidId(msg), "Processed CAN message with invalid id: 0x%X (%s)", msg->id, msg->is_extended ? "extended" : "standard");
for (size_t i = 0; i < vector_.size(); i++) {
if (BuildId(msg) == vector_[i].id) {
#if 0
ROS_INFO("Id 0x%X: %u.%u", BuildId(msg), msg->header.stamp.sec, msg->header.stamp.nsec);
#endif
std::deque<Type>& deque = vector_[i].deque;
deque.push_back(msg);
if (deque.size() == (size_t)1) {
// We have just added the first message, so it was empty before
++num_non_empty_deques_;
if (num_non_empty_deques_ == (uint32_t)vector_.size()) {
// All deques have messages
process();
}
} else {
checkInterMessageBound(i);
}
// Check whether we have more messages than allowed in the queue.
// Note that during the above call to process(), queue i may contain queue_size_+1 messages.
std::vector<Type>& past = vector_[i].past;
if (deque.size() + past.size() > queue_size_) {
// Cancel ongoing candidate search, if any:
num_non_empty_deques_ = 0; // We will recompute it from scratch
{for (size_t i = 0; i < vector_.size(); i++) {
recover(i);
}}
// Drop the oldest message in the offending topic
RCUTILS_ASSERT(!deque.empty());
deque.pop_front();
vector_[i].has_dropped_messages = true;
if (pivot_ != NO_PIVOT) {
// The candidate is no longer valid. Destroy it.
for (size_t i = 0; i < vector_.size(); i++) {
vector_[i].candidate.reset();
}
pivot_ = NO_PIVOT;
// There might still be enough messages to create a new candidate:
process();
}
}
return;
}
}
}
/*
* Set the Age penalty: when comparing the size of sets, later intervals are penalized by a factor (1+AgePenalty).
* The default is 0. A non zero penalty can help output sets earlier, or output more sets, at some cost in quality.
*/
void setAgePenalty(double age_penalty) {
// For correctness we only need age_penalty > -1.0, but most likely a negative age_penalty is a mistake.
RCUTILS_ASSERT(age_penalty >= 0);
age_penalty_ = age_penalty;
}
/*
* Set the Inter message lower bound: if messages of a particular topic cannot be closer together than a known interval,
* providing this lower bound will not change the output but will allow the algorithm to conclude earlier that a given
* set is optimal, reducing delays. With the default value of 0, for messages spaced on average by a duration T,
* the algorithm can introduce a delay of about T. With good bounds provided a set can often be published as soon as
* the last message of the set is received. An incorrect bound will result in suboptimal sets being selected. A typical
* bound is, say, 1/2 the frame rate of a camera.
*/
void setInterMessageLowerBound(rclcpp::Duration lower_bound) {
RCUTILS_ASSERT(lower_bound >= rclcpp::Duration(0,0));
for (size_t i = 0; i < vector_.size(); i++) {
vector_[i].inter_message_lower_bounds = lower_bound;
}
}
/*
* Set the Inter message lower bound for each individual message index
*/
void setInterMessageLowerBound(size_t i, rclcpp::Duration lower_bound) {
// For correctness we only need age_penalty > -1.0, but most likely a negative age_penalty is a mistake.
RCUTILS_ASSERT(lower_bound >= rclcpp::Duration(0,0));
RCUTILS_ASSERT(i < vector_.size());
vector_[i].inter_message_lower_bounds = lower_bound;
}
/*
* Set the Max interval duration: sets of more than this size will not be considered (disabled by default). The effect
* is similar to throwing away a posteriori output sets that are too large, but it can be a little better.
*/
void setMaxIntervalDuration(rclcpp::Duration max_interval_duration) {
// For correctness we only need age_penalty > -1.0, but most likely a negative age_penalty is a mistake.
RCUTILS_ASSERT(max_interval_duration >= rclcpp::Duration(0,0));
max_interval_duration_ = max_interval_duration;
}
private:
void checkInterMessageBound(size_t i)
{
if (vector_[i].warned_about_incorrect_bound) {
return;
}
std::deque<Type>& deque = vector_[i].deque;
std::vector<Type>& v = vector_[i].past;
RCUTILS_ASSERT(!deque.empty());
rclcpp::Time msg_time = deque.back()->header.stamp;
rclcpp::Time previous_msg_time;
if (deque.size() == (size_t)1) {
if (v.empty()) {
// We have already published (or have never received) the previous message, we cannot check the bound
return;
}
previous_msg_time = v.back()->header.stamp;
} else {
// There are at least 2 elements in the deque. Check that the gap respects the bound if it was provided.
previous_msg_time = deque[deque.size()-2]->header.stamp;
}
if (msg_time < previous_msg_time) {
//RDM don't use this in normal code
RCUTILS_LOG_WARN_ONCE("Messages of type %" PRIu64 " arrived out of order (will print only once)", i);
vector_[i].warned_about_incorrect_bound = true;
} else if ((msg_time - previous_msg_time) < vector_[i].inter_message_lower_bounds) {
RCUTILS_LOG_WARN_ONCE("Messages of type %" PRIu64 " arrived closer ("
"%" PRId64 ") than the lower bound you provided ("
"%" PRId64 ") (will print only once)",
i,
(msg_time - previous_msg_time).nanoseconds(),
vector_[i].inter_message_lower_bounds.nanoseconds());
vector_[i].warned_about_incorrect_bound = true;
}
}
// Assumes that deque number <index> is non empty
void dequeDeleteFront(size_t i)
{
std::deque<Type>& deque = vector_[i].deque;
RCUTILS_ASSERT(!deque.empty());
deque.pop_front();
if (deque.empty()) {
--num_non_empty_deques_;
}
}
// Assumes that deque number <index> is non empty
void dequeMoveFrontToPast(size_t i)
{
std::deque<Type>& deque = vector_[i].deque;
std::vector<Type>& vector = vector_[i].past;
RCUTILS_ASSERT(!deque.empty());
vector.push_back(deque.front());
deque.pop_front();
if (deque.empty()) {
--num_non_empty_deques_;
}
}
void makeCandidate()
{
//printf("Creating candidate\n");
for (size_t i = 0; i < vector_.size(); i++) {
vector_[i].candidate = vector_[i].deque.front(); // Create candidate tuple
vector_[i].past.clear(); // Delete all past messages, since we have found a better candidate
}
//printf("Candidate created\n");
}
void recover(size_t i, size_t num_messages)
{
std::vector<Type>& v = vector_[i].past;
std::deque<Type>& q = vector_[i].deque;
RCUTILS_ASSERT(num_messages <= v.size());
while (num_messages > 0) {
q.push_front(v.back());
v.pop_back();
num_messages--;
}
if (!q.empty()) {
++num_non_empty_deques_;
}
}
void recover(size_t i)
{
std::vector<Type>& v = vector_[i].past;
std::deque<Type>& q = vector_[i].deque;
while (!v.empty()) {
q.push_front(v.back());
v.pop_back();
}
if (!q.empty()) {
++num_non_empty_deques_;
}
}
void recoverAndDelete(size_t i)
{
std::vector<Type>& v = vector_[i].past;
std::deque<Type>& q = vector_[i].deque;
while (!v.empty()) {
q.push_front(v.back());
v.pop_back();
}
RCUTILS_ASSERT(!q.empty());
q.pop_front();
if (!q.empty()) {
++num_non_empty_deques_;
}
}
// Assumes: all deques are non empty, i.e. num_non_empty_deques_ == RealTypeCount::value
void publishCandidate()
{
//printf("Publishing candidate\n");
// Publish
std::vector<Type> candidate;
for (size_t i = 0; i < vector_.size(); i++) {
candidate.push_back(vector_[i].candidate);
}
callback_(candidate);
// Delete this candidate
for (size_t i = 0; i < vector_.size(); i++) {
vector_[i].candidate.reset();
}
pivot_ = NO_PIVOT;
// Recover hidden messages, and delete the ones corresponding to the candidate
num_non_empty_deques_ = 0; // We will recompute it from scratch
for (size_t i = 0; i < vector_.size(); i++) {
recoverAndDelete(i);
}
}
// Assumes: all deques are non empty, i.e. num_non_empty_deques_ == RealTypeCount::value
// Returns: the oldest message on the deques
void getCandidateStart(uint32_t &start_index, rclcpp::Time &start_time)
{
return getCandidateBoundary(start_index, start_time, false);
}
// Assumes: all deques are non empty, i.e. num_non_empty_deques_ == RealTypeCount::value
// Returns: the latest message among the heads of the deques, i.e. the minimum
// time to end an interval started at getCandidateStart_index()
void getCandidateEnd(uint32_t &end_index, rclcpp::Time &end_time)
{
return getCandidateBoundary(end_index, end_time, true);
}
// ASSUMES: all deques are non-empty
// end = true: look for the latest head of deque
// false: look for the earliest head of deque
void getCandidateBoundary(uint32_t &index, rclcpp::Time &time, bool end)
{
time = vector_[0].deque.front()->header.stamp;
index = 0;
for (size_t i = 1; i < vector_.size(); i++) {
const rclcpp::Time &t = vector_[i].deque.front()->header.stamp;
if ((t < time) ^ end) {
time = t;
index = i;
}
}
}
// ASSUMES: we have a pivot and candidate
rclcpp::Time getVirtualTime(size_t i)
{
if (i >= vector_.size()) {
return rclcpp::Time(0,0); // Dummy return value
}
RCUTILS_ASSERT(pivot_ != NO_PIVOT);
std::vector<Type>& v = vector_[i].past;
std::deque<Type>& q = vector_[i].deque;
if (q.empty()) {
RCUTILS_ASSERT(!v.empty()); // Because we have a candidate
rclcpp::Time last_msg_time = v.back()->header.stamp;
rclcpp::Time msg_time_lower_bound = last_msg_time + vector_[i].inter_message_lower_bounds;
if (msg_time_lower_bound > pivot_time_) { // Take the max
return msg_time_lower_bound;
}
return pivot_time_;
}
rclcpp::Time current_msg_time = q.front()->header.stamp;
return current_msg_time;
}
// ASSUMES: we have a pivot and candidate
void getVirtualCandidateStart(uint32_t &start_index, rclcpp::Time &start_time)
{
return getVirtualCandidateBoundary(start_index, start_time, false);
}
// ASSUMES: we have a pivot and candidate
void getVirtualCandidateEnd(uint32_t &end_index, rclcpp::Time &end_time)
{
return getVirtualCandidateBoundary(end_index, end_time, true);
}
// ASSUMES: we have a pivot and candidate
// end = true: look for the latest head of deque
// false: look for the earliest head of deque
void getVirtualCandidateBoundary(uint32_t &index, rclcpp::Time &time, bool end)
{
std::vector<rclcpp::Time> virtual_times(vector_.size());
for (size_t i = 0; i < vector_.size(); i++) {
virtual_times[i] = getVirtualTime(i);
}
time = virtual_times[0];
index = 0;
for (size_t i = 0; i < vector_.size(); i++) {
if ((virtual_times[i] < time) ^ end) {
time = virtual_times[i];
index = i;
}
}
}
// assumes data_mutex_ is already locked
void process()
{
// While no deque is empty
while (num_non_empty_deques_ == (uint32_t)vector_.size()) {
// Find the start and end of the current interval
//printf("Entering while loop in this state [\n");
//show_internal_state();
//printf("]\n");
rclcpp::Time end_time, start_time;
uint32_t end_index, start_index;
getCandidateEnd(end_index, end_time);
getCandidateStart(start_index, start_time);
for (uint32_t i = 0; i < (uint32_t)vector_.size(); i++) {
if (i != end_index) {
// No dropped message could have been better to use than the ones we have,
// so it becomes ok to use this topic as pivot in the future
vector_[i].has_dropped_messages = false;
}
}
if (pivot_ == NO_PIVOT) {
// We do not have a candidate
// INVARIANT: the past_ vectors are empty
// INVARIANT: (candidate_ has no filled members)
if (end_time - start_time > max_interval_duration_) {
// This interval is too big to be a valid candidate, move to the next
dequeDeleteFront(start_index);
continue;
}
if (vector_[end_index].has_dropped_messages) {
// The topic that would become pivot has dropped messages, so it is not a good pivot
dequeDeleteFront(start_index);
continue;
}
// This is a valid candidate, and we don't have any, so take it
makeCandidate();
candidate_start_ = start_time;
candidate_end_ = end_time;
pivot_ = end_index;
pivot_time_ = end_time;
dequeMoveFrontToPast(start_index);
} else {
// We already have a candidate
// Is this one better than the current candidate?
// INVARIANT: has_dropped_messages_ is all false
if ((end_time - candidate_end_) * (1 + age_penalty_) >= (start_time - candidate_start_)) {
// This is not a better candidate, move to the next
dequeMoveFrontToPast(start_index);
} else {
// This is a better candidate
makeCandidate();
candidate_start_ = start_time;
candidate_end_ = end_time;
dequeMoveFrontToPast(start_index);
// Keep the same pivot (and pivot time)
}
}
// INVARIANT: we have a candidate and pivot
RCUTILS_ASSERT(pivot_ != NO_PIVOT);
//printf("start_index == %d, pivot_ == %d\n", start_index, pivot_);
if (start_index == pivot_) { // TODO: replace with start_time == pivot_time_
// We have exhausted all possible candidates for this pivot, we now can output the best one
publishCandidate();
} else if ((end_time - candidate_end_) * (1 + age_penalty_) >= (pivot_time_ - candidate_start_)) {
// We have not exhausted all candidates, but this candidate is already provably optimal
// Indeed, any future candidate must contain the interval [pivot_time_ end_time], which
// is already too big.
// Note: this case is subsumed by the next, but it may save some unnecessary work and
// it makes things (a little) easier to understand
publishCandidate();
} else if (num_non_empty_deques_ < (uint32_t)vector_.size()) {
uint32_t num_non_empty_deques_before_virtual_search = num_non_empty_deques_;
// Before giving up, use the rate bounds, if provided, to further try to prove optimality
std::vector<int> num_virtual_moves(9,0);
while (1) {
rclcpp::Time end_time, start_time;
uint32_t end_index, start_index;
getVirtualCandidateEnd(end_index, end_time);
getVirtualCandidateStart(start_index, start_time);
if ((end_time - candidate_end_) * (1 + age_penalty_) >= (pivot_time_ - candidate_start_)) {
// We have proved optimality
// As above, any future candidate must contain the interval [pivot_time_ end_time], which
// is already too big.
publishCandidate(); // This cleans up the virtual moves as a byproduct
break; // From the while(1) loop only
}
if ((end_time - candidate_end_) * (1 + age_penalty_) < (start_time - candidate_start_)) {
// We cannot prove optimality
// Indeed, we have a virtual (i.e. optimistic) candidate that is better than the current
// candidate
// Cleanup the virtual search:
num_non_empty_deques_ = 0; // We will recompute it from scratch
for (size_t i = 0; i < vector_.size(); i++) {
recover(i, num_virtual_moves[i]);
}
(void)num_non_empty_deques_before_virtual_search; // unused variable warning stopper
RCUTILS_ASSERT(num_non_empty_deques_before_virtual_search == num_non_empty_deques_);
break;
}
// Note: we cannot reach this point with start_index == pivot_ since in that case we would
// have start_time == pivot_time, in which case the two tests above are the negation
// of each other, so that one must be true. Therefore the while loop always terminates.
RCUTILS_ASSERT(start_index != pivot_);
RCUTILS_ASSERT(start_time < pivot_time_);
dequeMoveFrontToPast(start_index);
num_virtual_moves[start_index]++;
} // while(1)
}
} // while(num_non_empty_deques_ == (uint32_t)RealTypeCount::value)
}
uint32_t queue_size_;
Callback callback_;
static const uint32_t NO_PIVOT = 9; // Special value for the pivot indicating that no pivot has been selected
struct VectorData {
uint32_t id;
std::deque<Type> deque;
std::vector<Type> past;
Type candidate; // NULL if there is no candidate, in which case there is no pivot.
bool has_dropped_messages;
rclcpp::Duration inter_message_lower_bounds;
bool warned_about_incorrect_bound;
VectorData() : inter_message_lower_bounds(rclcpp::Duration::max()) {}
};
std::vector<VectorData> vector_;
uint32_t num_non_empty_deques_;
rclcpp::Time candidate_start_;
rclcpp::Time candidate_end_;
rclcpp::Time pivot_time_;
uint32_t pivot_; // Equal to NO_PIVOT if there is no candidate
rclcpp::Duration max_interval_duration_; // TODO: initialize with a parameter
double age_penalty_;
};
} // namespace dataspeed_can_msg_filters