Program Listing for File approximate_sync.hpp
↰ Return to documentation for file (include/flex_sync/approximate_sync.hpp
)
// -*-c++-*---------------------------------------------------------------------------------------
// Copyright 2024 Bernd Pfrommer <bernd.pfrommer@gmail.com>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef FLEX_SYNC__APPROXIMATE_SYNC_HPP_
#define FLEX_SYNC__APPROXIMATE_SYNC_HPP_
#include <cassert>
#include <deque>
#include <flex_sync/msg_pack.hpp>
#include <functional>
#include <iostream>
#include <map>
#include <mutex>
#include <rclcpp/rclcpp.hpp>
#include <string>
#include <tuple>
#include <utility>
#include <vector>
/*
* Class for approximate synchronizing across variable number of messages
*/
namespace flex_sync
{
template <typename... MsgTypes>
class ApproximateSync
{
public:
using message_types = MsgPack<MsgTypes...>;
using Callback = std::function<void(
const std::vector<std::shared_ptr<const MsgTypes>> &...)>;
ApproximateSync(
const std::vector<std::vector<std::string>> & topics, Callback cb,
size_t queue_size)
: topics_(topics), cb_(cb), queue_size_(queue_size)
{
const size_t ntypes = sizeof...(MsgTypes);
if (ntypes != topics.size()) {
RCLCPP_ERROR_STREAM(
rclcpp::get_logger("ApproximateSync"),
"number of topic vectors: "
<< topics.size()
<< " does not match number of message types: " << ntypes);
throw(std::runtime_error("num topic vectors != num msg types"));
}
TopicInfoInitializer tii;
(void)for_each(type_infos_, &tii);
}
size_t getNumberDropped() const { return (num_dropped_); }
void clearNumberDropped() { num_dropped_ = 0; }
const std::vector<std::vector<std::string>> & getTopics() const
{
return (topics_);
}
size_t getQueueSize() const { return (queue_size_); }
template <typename MsgPtrT>
void process(const std::string & topic, const MsgPtrT & msg)
{
typedef TypeInfo<typename MsgPtrT::element_type const> TypeInfoT;
typedef TopicInfo<typename MsgPtrT::element_type const> TopicInfoT;
// find correct topic info array via lookup by type
TypeInfoT & ti = std::get<TypeInfoT>(type_infos_);
auto topic_it = ti.topic_to_index.find(topic);
if (topic_it == ti.topic_to_index.end()) {
RCLCPP_ERROR_STREAM(
rclcpp::get_logger("approx_sync"),
"flex_sync: invalid topic " << topic << " for message type");
throw std::runtime_error("invalid topic: " + topic);
}
TopicInfoT & topic_info = ti.topic_info[topic_it->second];
// add new message to the deque for this topic
topic_info.deque.push_back(msg);
if (topic_info.deque.size() == 1ul) {
++num_non_empty_deques_;
if (num_non_empty_deques_ == tot_num_deques_) {
update(); // all deques have messages, go for it
}
} else {
checkInterMessageBound<TopicInfoT>(&topic_info, topic);
}
// check for queue overflow and handle if necesary
if (topic_info.deque.size() + topic_info.past.size() > queue_size_) {
// Cancel ongoing candidate search, if any:
num_non_empty_deques_ = 0; // We will recompute it from scratch
Recover rcv;
(void)for_each(type_infos_, &rcv);
// Drop the oldest message in the offending topic
assert(!topic_info.deque.empty());
topic_info.deque.pop_front();
num_dropped_++;
topic_info.has_dropped_messages = true;
if (pivot_.isValid()) {
// The candidate is no longer valid. Destroy it.
pivot_ = FullIndex(); // reset to invalid
// There might still be enough messages to create a new candidate:
update();
}
}
}
private:
typedef std::tuple<std::vector<std::shared_ptr<const MsgTypes>>...>
CallbackArg;
// define some simple collections
template <typename MsgType>
using TopicDeque = std::deque<std::shared_ptr<MsgType>>;
template <typename MsgType>
using TopicVec = std::vector<std::shared_ptr<MsgType>>;
// TopicInfo maintains per-topic data: a deque, past messages
// etc
template <typename MsgType>
struct TopicInfo
{
TopicDeque<MsgType> deque;
TopicVec<MsgType> past;
bool has_dropped_messages{false};
rclcpp::Duration inter_message_lower_bound{rclcpp::Duration(0, 0)};
uint32_t num_virtual_moves{0};
bool warned_about_incorrect_bound{false};
};
// TypeInfo holds all data for a particular message type
template <typename MsgType>
struct TypeInfo
{
std::vector<TopicInfo<MsgType>> topic_info;
std::map<std::string, int> topic_to_index;
};
typedef std::tuple<TypeInfo<const MsgTypes>...> TupleOfTypeInfo;
// FullIndex has two components: which type, and which topic within
// that type
struct FullIndex
{
explicit FullIndex(int32_t tp = -1, int32_t tc = -1) : type(tp), topic(tc)
{
}
bool operator==(const FullIndex & a) const
{
return (type == a.type && topic == a.topic);
}
bool operator!=(const FullIndex & a) const { return (!(a == *this)); }
bool isValid() const { return (type != -1 && topic != -1); }
int32_t type;
int32_t topic;
};
struct TopicInfoInitializer
{
template <std::size_t I>
int operate(ApproximateSync<MsgTypes...> * sync) const
{
const size_t num_topics = sync->topics_[I].size();
std::get<I>(sync->candidate_).resize(num_topics);
auto & type_info = std::get<I>(sync->type_infos_);
type_info.topic_info.resize(num_topics);
sync->tot_num_deques_ += num_topics;
// make map between topic string and index for
// lookup when data arrives
for (int t_idx = 0; t_idx < static_cast<int>(sync->topics_[I].size());
t_idx++) {
type_info.topic_to_index[sync->topics_[I][t_idx]] = t_idx;
}
return (num_topics);
}
};
struct CandidateMaker
{
template <std::size_t I>
int operate(ApproximateSync<MsgTypes...> * sync) const
{
int num_cb_vals_found = 0;
auto & type_info = std::get<I>(sync->type_infos_);
for (size_t i = 0; i < type_info.topic_info.size(); i++) {
auto & ti = type_info.topic_info[i];
const auto & deque = ti.deque;
std::get<I>(sync->candidate_)[i] = deque.front();
// Delete all past messages, since we have found a better candidate
ti.past.clear();
num_cb_vals_found++;
}
return (num_cb_vals_found);
}
};
void makeCandidate()
{
CandidateMaker cm;
(void)for_each(type_infos_, &cm);
}
class DroppedMessageUpdater
{
public:
explicit DroppedMessageUpdater(const FullIndex & end)
: end_index_(end), has_dropped_messages_(false)
{
}
template <std::size_t I>
int operate(ApproximateSync<MsgTypes...> * sync)
{
auto & type_info = std::get<I>(sync->type_infos_);
for (size_t j = 0; j < type_info.topic_info.size(); j++) {
auto & topic_info = type_info.topic_info[j];
if (!((static_cast<int>(I) == end_index_.type) &&
(static_cast<int>(j) == end_index_.topic))) {
// No dropped message could have been better to use than
// the ones we have, so it becomes ok to use this topic
// as pivot in the future
topic_info.has_dropped_messages = false;
} else {
// capture whether the end_index has dropped messages
has_dropped_messages_ = topic_info.has_dropped_messages;
}
}
return (0);
}
bool endIndexHasDroppedMessages() const { return (has_dropped_messages_); }
private:
FullIndex end_index_;
bool has_dropped_messages_{false};
};
class CandidateBoundaryFinder
{
public:
CandidateBoundaryFinder()
: start_time_(rclcpp::Time::max(), RCL_ROS_TIME),
end_time_(rclcpp::Time(0, 0), RCL_ROS_TIME)
{
}
template <std::size_t I>
int operate(ApproximateSync<MsgTypes...> * sync)
{
int num_deques_found = 0;
const auto & type_info = std::get<I>(sync->type_infos_);
for (size_t topicIdx = 0; topicIdx < type_info.topic_info.size();
topicIdx++) {
const auto & ti = type_info.topic_info[topicIdx];
const auto & deque = ti.deque;
if (deque.empty()) {
RCLCPP_ERROR_STREAM(
rclcpp::get_logger("approx_sync"),
"ERROR: deque " << I << "," << topicIdx << " cannot be empty!");
throw std::runtime_error("empty deque found!");
}
const auto & m = deque.front();
const auto t = rclcpp::Time(m->header.stamp);
if (t < start_time_) {
start_time_ = t;
start_index_ = FullIndex(I, topicIdx);
}
if (!end_time_valid_ || t > end_time_) {
end_time_ = t;
end_time_valid_ = true;
end_index_ = FullIndex(I, topicIdx);
}
num_deques_found++;
}
return (num_deques_found);
}
rclcpp::Time getStartTime() const { return (start_time_); }
rclcpp::Time getEndTime() const { return (end_time_); }
FullIndex getStartIndex() const { return (start_index_); }
FullIndex getEndIndex() const { return (end_index_); }
private:
FullIndex start_index_;
rclcpp::Time start_time_;
FullIndex end_index_;
rclcpp::Time end_time_;
bool end_time_valid_{false};
};
// Assumes: all deques are non empty
// Returns in arguments:
// - the oldest message on the deques + time
// - the newest message on the deques + time
void getCandidateBoundary(
FullIndex * start_index, rclcpp::Time * start_time, FullIndex * end_index,
rclcpp::Time * end_time)
{
CandidateBoundaryFinder cbf;
(void)for_each(type_infos_, &cbf);
*start_index = cbf.getStartIndex();
*start_time = cbf.getStartTime();
*end_index = cbf.getEndIndex();
*end_time = cbf.getEndTime();
}
class VirtualCandidateBoundaryFinder
{
public:
VirtualCandidateBoundaryFinder()
: start_time_(rclcpp::Time::max(), RCL_ROS_TIME),
end_time_(rclcpp::Time(0, 0), RCL_ROS_TIME)
{
}
template <std::size_t I>
int operate(ApproximateSync<MsgTypes...> * sync)
{
int num_deques_found = 0;
const auto & type_info = std::get<I>(sync->type_infos_);
assert(sync->pivot_.isValid());
for (size_t topicIdx = 0; topicIdx < type_info.topic_info.size();
topicIdx++) {
const auto & ti = type_info.topic_info[topicIdx];
const auto & deque = ti.deque;
const auto & past = ti.past;
// get virtual time
rclcpp::Time virtual_time;
if (deque.empty()) {
assert(!past.empty()); // Because we have a candidate
const rclcpp::Time last_msg_time(past.back()->header.stamp);
const rclcpp::Time msg_time_lower_bound =
last_msg_time + ti.inter_message_lower_bound;
virtual_time = std::max(msg_time_lower_bound, sync->pivot_time_);
} else {
virtual_time = rclcpp::Time(deque.front()->header.stamp);
}
if (virtual_time < start_time_) {
start_time_ = virtual_time;
start_index_ = FullIndex(I, topicIdx);
}
if (virtual_time > end_time_) {
end_time_ = virtual_time;
end_index_ = FullIndex(I, topicIdx);
}
num_deques_found++;
}
return (num_deques_found);
}
rclcpp::Time getStartTime() const { return (start_time_); }
rclcpp::Time getEndTime() const { return (end_time_); }
FullIndex getStartIndex() const { return (start_index_); }
FullIndex getEndIndex() const { return (end_index_); }
private:
FullIndex start_index_;
rclcpp::Time start_time_;
FullIndex end_index_;
rclcpp::Time end_time_;
};
void getVirtualCandidateBoundary(
FullIndex * start_index, rclcpp::Time * start_time, FullIndex * end_index,
rclcpp::Time * end_time)
{
VirtualCandidateBoundaryFinder vcbf;
(void)for_each(type_infos_, &vcbf);
*start_index = vcbf.getStartIndex();
*start_time = vcbf.getStartTime();
*end_index = vcbf.getEndIndex();
*end_time = vcbf.getEndTime();
}
class DequeFrontDeleter
{
public:
explicit DequeFrontDeleter(const FullIndex & index) : index_(index) {}
template <std::size_t I>
int operate(ApproximateSync<MsgTypes...> * sync)
{
auto & type_info = std::get<I>(sync->type_infos_);
if (I == index_.type) {
auto & deque = type_info.topic_info[index_.topic].deque;
assert(!deque.empty());
deque.pop_front();
if (deque.empty()) {
--sync->num_non_empty_deques_;
}
}
return (0);
}
private:
FullIndex index_;
};
void dequeDeleteFront(const FullIndex & index)
{
DequeFrontDeleter dfd(index);
(void)for_each(type_infos_, &dfd);
}
class DequeMoverFrontToPast
{
public:
DequeMoverFrontToPast(const FullIndex & index, bool updateVirtualMoves)
: index_(index), update_virtual_moves_(updateVirtualMoves)
{
}
template <std::size_t I>
int operate(ApproximateSync<MsgTypes...> * sync)
{
auto & type_info = std::get<I>(sync->type_infos_);
if (I == index_.type) {
auto & ti = type_info.topic_info[index_.topic];
auto & deque = ti.deque;
auto & past = ti.past;
assert(!deque.empty());
past.push_back(deque.front());
deque.pop_front();
if (deque.empty()) {
--(sync->num_non_empty_deques_);
}
if (update_virtual_moves_) {
ti.num_virtual_moves++;
}
}
return (0);
}
private:
FullIndex index_;
bool update_virtual_moves_{false};
};
// Assumes that deque number <index> is non empty
void dequeMoveFrontToPast(const FullIndex & index)
{
DequeMoverFrontToPast dmfp(index, false);
(void)for_each(type_infos_, &dmfp);
}
class RecoverAndDelete
{
public:
RecoverAndDelete() {}
template <std::size_t I>
int operate(ApproximateSync<MsgTypes...> * sync)
{
auto & type_info = std::get<I>(sync->type_infos_);
for (auto & ti : type_info.topic_info) {
auto & deque = ti.deque;
auto & past = ti.past;
while (!past.empty()) {
deque.push_front(past.back());
past.pop_back();
}
assert(!deque.empty());
deque.pop_front();
if (!deque.empty()) {
++(sync->num_non_empty_deques_);
}
}
return (0);
}
};
void recoverAndDelete()
{
RecoverAndDelete rnd;
(void)for_each(type_infos_, &rnd);
}
class ResetNumVirtualMoves
{
public:
ResetNumVirtualMoves() {}
template <std::size_t I>
int operate(ApproximateSync<MsgTypes...> * sync)
{
auto & type_info = std::get<I>(sync->type_infos_);
for (auto & ti : type_info.topic_info) {
ti.num_virtual_moves = 0;
}
return (0);
}
};
class RecoverWithVirtualMoves
{
public:
RecoverWithVirtualMoves() {}
template <std::size_t I>
int operate(ApproximateSync<MsgTypes...> * sync)
{
auto & type_info = std::get<I>(sync->type_infos_);
for (auto & ti : type_info.topic_info) {
for (uint32_t n = ti.num_virtual_moves; n != 0; n--) {
ti.deque.push_front(ti.past.back());
ti.past.pop_back();
}
if (!ti.deque.empty()) {
sync->num_non_empty_deques_++;
}
}
return (0);
}
};
class Recover
{
public:
Recover() {}
template <std::size_t I>
int operate(ApproximateSync<MsgTypes...> * sync)
{
auto & type_info = std::get<I>(sync->type_infos_);
for (auto & ti : type_info.topic_info) {
while (!ti.past.empty()) {
ti.deque.push_front(ti.past.back());
ti.past.pop_back();
}
if (!ti.deque.empty()) {
sync->num_non_empty_deques_++;
}
}
return (0);
}
};
// Assumes: all deques are non empty now
void publishCandidate()
{
// std::cout << "publishing candidate" << std::endl;
// printCandidate();
std::apply([this](auto &&... args) { cb_(args...); }, candidate_);
// candidate_ = Tuple(); no needed
pivot_ = FullIndex(); // reset to invalid
// Recover hidden messages, and delete the ones
// corresponding to the candidate
num_non_empty_deques_ = 0; // We will recompute it from scratch
recoverAndDelete();
}
void update()
{
// While no deque is empty
while (num_non_empty_deques_ == tot_num_deques_) {
// Find the start and end of the current interval
FullIndex end_index, start_index;
rclcpp::Time start_time, end_time;
getCandidateBoundary(&start_index, &start_time, &end_index, &end_time);
DroppedMessageUpdater dmu(end_index);
(void)for_each(type_infos_, &dmu); // update dropped messages
if (!pivot_.isValid()) {
// We do not have a candidate
// INVARIANT: the past_ vectors are empty
// INVARIANT: (candidate_ has no filled members)
if (end_time - start_time > max_interval_duration_) {
// This interval is too big to be a valid candidate,
// move to the next
dequeDeleteFront(start_index);
continue;
}
if (dmu.endIndexHasDroppedMessages()) {
// The topic that would become pivot has dropped messages,
// so it is not a good pivot
dequeDeleteFront(start_index);
continue;
}
// This is a valid candidate, and we don't have any, so take it
makeCandidate();
candidate_start_ = start_time;
candidate_end_ = end_time;
pivot_ = end_index;
pivot_time_ = end_time;
dequeMoveFrontToPast(start_index);
} else {
// We already have a candidate
// Is this one better than the current candidate?
// INVARIANT: has_dropped_messages is all false
if (
(end_time - candidate_end_) * (1 + age_penalty_) >=
(start_time - candidate_start_)) {
// This is not a better candidate, move to the next
dequeMoveFrontToPast(start_index);
} else {
// This is a better candidate
makeCandidate();
candidate_start_ = start_time;
candidate_end_ = end_time;
dequeMoveFrontToPast(start_index);
}
}
// INVARIANT: we have a candidate and pivot
assert(pivot_.isValid());
if (start_index == pivot_) {
// TODO(Bernd): replace with start_time == pivot_time_
// We have exhausted all possible candidates for this pivot,
// we now can output the best one
publishCandidate();
} else if (
(end_time - candidate_end_) * (1 + age_penalty_) >=
(pivot_time_ - candidate_start_)) {
// We have not exhausted all candidates,
// but this candidate is already provably optimal
// Indeed, any future candidate must contain the interval
// [pivot_time_ end_time], which is already too big.
// Note: this case is subsumed by the next, but it may
// save some unnecessary work and
// it makes things (a little) easier to understand
publishCandidate();
} else if (num_non_empty_deques_ < tot_num_deques_) {
uint32_t num_non_empty_deques_before_virtual_search =
num_non_empty_deques_;
ResetNumVirtualMoves rnvm;
(void)for_each(type_infos_, &rnvm);
while (1) {
// printNVM();
rclcpp::Time end_time, start_time;
FullIndex end_index, start_index;
getVirtualCandidateBoundary(
&start_index, &start_time, &end_index, &end_time);
if (
(end_time - candidate_end_) * (1 + age_penalty_) >=
(pivot_time_ - candidate_start_)) {
// We have proved optimality
// As above, any future candidate must contain the interval
// [pivot_time_ end_time], which is already too big.
publishCandidate(); // This cleans virtual moves as a byproduct
break; // From the while(1) loop only
}
if (
(end_time - candidate_end_) * (1 + age_penalty_) <
(start_time - candidate_start_)) {
// We cannot prove optimality
// Indeed, we have a virtual (i.e. optimistic) candidate
// that is better than the current candidate
// Cleanup the virtual search:
num_non_empty_deques_ = 0; // We will recompute it from scratch
RecoverWithVirtualMoves rvvm;
(void)for_each(type_infos_, &rvvm);
// unused variable warning stopper
(void)num_non_empty_deques_before_virtual_search;
assert(
static_cast<int>(num_non_empty_deques_before_virtual_search) ==
num_non_empty_deques_);
break;
}
// Note: we cannot reach this point with
// start_index == pivot_ since in that case we would have
// start_time == pivot_time, in which case the two tests
// above are the negation of each other, so that one must be true.
// Therefore the while loop always terminates.
assert(start_index != pivot_);
assert(start_time < pivot_time_);
// move front to past and update num_virtual_moves
DequeMoverFrontToPast dmfp(start_index, true);
(void)for_each(type_infos_, &dmfp);
} // while(1)
}
} // while(num_non_empty_deques_ == (uint32_t)RealTypeCount::value)
}
template <typename TopicInfoT>
void checkInterMessageBound(TopicInfoT * tinfo, const std::string & topic)
{
auto & topic_info = *tinfo;
if (topic_info.warned_about_incorrect_bound) {
return;
}
const auto & deque = topic_info.deque;
const auto & v = topic_info.past;
assert(!deque.empty());
const rclcpp::Time msg_time(deque.back()->header.stamp);
rclcpp::Time previous_msg_time;
if (deque.size() == static_cast<size_t>(1)) {
if (v.empty()) {
// We have already published (or have never received)
// the previous message, we cannot check the bound
return;
}
previous_msg_time = rclcpp::Time(v.back()->header.stamp);
} else {
// There are at least 2 elements in the deque. Check that the gap
// respects the bound if it was provided.
previous_msg_time = deque[deque.size() - 2]->header.stamp;
}
// now we have msg_time and previous_msg_time, do the check
if (msg_time < previous_msg_time) {
RCLCPP_WARN_STREAM(
rclcpp::get_logger("approx_sync"),
"Messages for " << topic << " arrived out of order (print only once)");
topic_info.warned_about_incorrect_bound = true;
} else if (
(msg_time - previous_msg_time) < topic_info.inter_message_lower_bound) {
const rclcpp::Duration diff = msg_time - previous_msg_time;
RCLCPP_WARN_STREAM(
rclcpp::get_logger("approx_sync"),
"Messages for " << topic << " arrived closer (" << diff.nanoseconds()
<< ") than the lower bound you provided ("
<< topic_info.inter_message_lower_bound.nanoseconds()
<< ") (will print only once)");
topic_info.warned_about_incorrect_bound = true;
}
}
#ifdef DEBUG_PRINTING
class StatePrinter
{
public:
StatePrinter() {}
template <std::size_t I>
int operate(ApproximateSync<MsgTypes...> * sync)
{
int num_topics = 0;
auto & type_info = std::get<I>(sync->type_infos_);
for (auto & ti : type_info.topic_info) {
auto & deque = ti.deque;
auto & past = ti.past;
rclcpp::Time dt, pt;
if (!deque.empty()) {
dt = rclcpp::Time(deque.back()->header.stamp);
}
if (!past.empty()) {
pt = rclcpp::Time(past.back()->header.stamp);
}
const int n = I * 3 + num_topics; // XXX only right for 3 topics/type
std::cout << n << " deque: " << deque.size() << " " << dt << std::endl;
std::cout << n << " past: " << past.size() << " " << pt << std::endl;
num_topics++;
}
return (num_topics);
;
}
};
void printState()
{
StatePrinter sp;
(void)for_each(type_infos_, &sp);
}
class NVMPrinter
{
public:
NVMPrinter() {}
template <std::size_t I>
int operate(ApproximateSync<MsgTypes...> * sync)
{
int num_topics = 0;
auto & type_info = std::get<I>(sync->type_infos_);
for (auto & ti : type_info.topic_info) {
const int n = I * 3 + num_topics; // XXX only right for 3 topics/type
std::cout << n << " nvm: " << ti.num_virtual_moves << " "
<< " wb: " << ti.warned_about_incorrect_bound << std::endl;
num_topics++;
}
return (num_topics);
;
}
};
void printNVM()
{
NVMPrinter nvmp;
(void)for_each(type_infos_, &nvmp);
}
class CandidatePrinter
{
public:
CandidatePrinter() {}
template <std::size_t I>
int operate(ApproximateSync<MsgTypes...> * sync)
{
const auto & cand = std::get<I>(sync->candidate_);
for (auto & msg : cand) {
std::cout << "cand: " << I << " "
<< rclcpp::Time(msg->header.stamp).nanoseconds() << std::endl;
}
return (0);
}
};
void printCandidate()
{
CandidatePrinter cp;
(void)for_each(type_infos_, &cp);
}
#endif
// some neat template tricks picked up here:
// https://stackoverflow.com/questions/18063451/get-index-of-a-tuple-elements -type
// This template terminates the recursion
template <std::size_t I = 0, typename FuncT, typename... Tp>
inline typename std::enable_if<I == sizeof...(Tp), int>::type for_each(
std::tuple<Tp...> &, FuncT *) // Unused arg needs no name
{
return 0;
} // do nothing
// This template recursively calls itself, thereby iterating
template <std::size_t I = 0, typename FuncT, typename... Tp>
inline typename std::enable_if <
I<sizeof...(Tp), int>::type for_each(std::tuple<Tp...> & t, FuncT * f)
{
const int rv = (*f).template operate<I>(this);
return (rv + for_each<I + 1, FuncT, Tp...>(t, f));
}
// ----------- variables -----------------------
inline static const FullIndex NO_PIVOT;
TupleOfTypeInfo type_infos_; // tuple with data
std::vector<std::vector<std::string>> topics_; // topics to be synced
Callback cb_; // pointer to the callee
CallbackArg candidate_; // holds the potential callback
int num_non_empty_deques_{0};
int tot_num_deques_{0};
size_t queue_size_;
FullIndex pivot_;
rclcpp::Time pivot_time_;
rclcpp::Time candidate_start_;
rclcpp::Time candidate_end_;
rclcpp::Duration max_interval_duration_{rclcpp::Duration::max()};
double age_penalty_{0.1};
size_t num_dropped_{0}; // total number of dropped messages
};
} // namespace flex_sync
#endif // FLEX_SYNC__APPROXIMATE_SYNC_HPP_