Program Listing for File latest_time.h
↰ Return to documentation for file (include/message_filters/sync_policies/latest_time.h
)
/*********************************************************************
* Software License Agreement (BSD License)
*
* Copyright (c) 2022, Open Source Robotics Foundation, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided
* with the distribution.
* * Neither the name of the Willow Garage nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*********************************************************************/
#ifndef MESSAGE_FILTERS__SYNC_POLICIES__LATEST_TIME_H_
#define MESSAGE_FILTERS__SYNC_POLICIES__LATEST_TIME_H_
#include <algorithm>
#include <memory>
#include <numeric>
#include <tuple>
#include <vector>
#include <rclcpp/rclcpp.hpp>
#include "message_filters/message_traits.h"
#include "message_filters/null_types.h"
#include "message_filters/signal9.h"
#include "message_filters/synchronizer.h"
namespace message_filters
{
namespace sync_policies
{
template<typename M0, typename M1,
typename M2 = NullType, typename M3 = NullType, typename M4 = NullType,
typename M5 = NullType, typename M6 = NullType, typename M7 = NullType,
typename M8 = NullType>
struct LatestTime : public PolicyBase<M0, M1, M2, M3, M4, M5, M6, M7, M8>
{
typedef Synchronizer<LatestTime> Sync;
typedef PolicyBase<M0, M1, M2, M3, M4, M5, M6, M7, M8> Super;
typedef typename Super::Messages Messages;
typedef typename Super::Signal Signal;
typedef typename Super::Events Events;
typedef typename Super::RealTypeCount RealTypeCount;
typedef std::tuple<double, double, double> RateConfig;
LatestTime()
: LatestTime(rclcpp::Clock::SharedPtr(new rclcpp::Clock(RCL_ROS_TIME)))
{
}
LatestTime(rclcpp::Clock::SharedPtr clock)
: parent_(0),
ros_clock_{clock}
{
}
LatestTime(const LatestTime& e)
{
*this = e;
}
LatestTime & operator=(const LatestTime & rhs)
{
parent_ = rhs.parent_;
events_ = rhs.events_;
rates_ = rhs.rates_;
ros_clock_ = rhs.ros_clock_;
return *this;
}
void initParent(Sync * parent)
{
parent_ = parent;
}
void setRateConfigPerMessage(const std::vector<RateConfig> & configs)
{
rate_configs_.assign(configs.begin(), configs.end());
}
void setRateConfig(const RateConfig & config)
{
rate_configs_.assign(1U, config);
}
void setClock(rclcpp::Clock::SharedPtr clock)
{
ros_clock_ = clock;
}
template<int i>
void add(const typename std::tuple_element<i, Events>::type & evt)
{
RCUTILS_ASSERT(parent_);
std::lock_guard<std::mutex> lock(data_mutex_);
if(!received_msg<i>())
{
initialize_rate<i>();
// wait until we get each message once to publish
// then wait until we got each message twice to compute rates
// NOTE: this will drop a few messages of the faster topics until
// we get one of the slowest so we can sync
std::get<i>(events_) = evt; // adding here ensures we see even the slowest
// message twice before computing rate
return;
}
std::get<i>(events_) = evt;
rclcpp::Time now = ros_clock_->now();
bool valid_rate = rates_[i].compute_hz(now);
if(valid_rate && (i == find_pivot(now)) && is_full())
{
publish();
}
}
private:
// assumed data_mutex_ is locked
template<int i>
void initialize_rate()
{
if (rate_configs_.size() > 0U) {
double rate_ema_alpha{Rate::DEFAULT_RATE_EMA_ALPHA};
double error_ema_alpha{Rate::DEFAULT_ERROR_EMA_ALPHA};
double rate_step_change_margin_factor{Rate::DEFAULT_MARGIN_FACTOR};
if (rate_configs_.size() == RealTypeCount::value) {
std::tie (
rate_ema_alpha,
error_ema_alpha,
rate_step_change_margin_factor) = rate_configs_[i];
} else if (rate_configs_.size() == 1U) {
std::tie (
rate_ema_alpha,
error_ema_alpha,
rate_step_change_margin_factor) = rate_configs_[0U];
}
rates_.push_back(
Rate(
ros_clock_->now(),
rate_ema_alpha,
error_ema_alpha,
rate_step_change_margin_factor));
} else {
rates_.push_back(Rate(ros_clock_->now()));
}
}
// assumed data_mutex_ is locked
void publish()
{
parent_->signal(std::get<0>(events_), std::get<1>(events_), std::get<2>(events_),
std::get<3>(events_), std::get<4>(events_), std::get<5>(events_),
std::get<6>(events_), std::get<7>(events_), std::get<8>(events_));
}
struct Rate
{
static constexpr double DEFAULT_RATE_EMA_ALPHA{0.9};
static constexpr double DEFAULT_ERROR_EMA_ALPHA{0.3};
static constexpr double DEFAULT_MARGIN_FACTOR{10.0};
rclcpp::Time prev;
double hz{0.0};
double error{0.0};
double rate_ema_alpha{DEFAULT_RATE_EMA_ALPHA};
double error_ema_alpha{DEFAULT_ERROR_EMA_ALPHA};
double rate_step_change_margin_factor{DEFAULT_MARGIN_FACTOR};
bool do_hz_init{true};
bool do_error_init{true};
Rate(const rclcpp::Time & start)
: Rate(start, DEFAULT_RATE_EMA_ALPHA, DEFAULT_ERROR_EMA_ALPHA, DEFAULT_MARGIN_FACTOR)
{
}
Rate(const rclcpp::Time & start,
const double & rate_ema_alpha, const double & error_ema_alpha,
const double & rate_step_change_margin_factor)
: prev{start},
rate_ema_alpha{rate_ema_alpha},
error_ema_alpha{error_ema_alpha},
rate_step_change_margin_factor{rate_step_change_margin_factor}
{
}
bool operator>(const Rate & that) const
{
return this->hz > that.hz;
}
bool compute_hz(const rclcpp::Time & now)
{
bool step_change_detected = false;
do {
double period = 0.0;
try {
period = (now-prev).seconds();
} catch (const std::runtime_error & /*e*/) {
// Different time sources that might happen on initialization if the messages are not yet available.
// std::cout << "Exception: " << e.what() << std::endl;
return false;
}
if (period <= 0.0) {
// multiple messages and time isn't updating
return false;
}
if (do_hz_init) {
hz = 1.0/period;
do_hz_init = false;
step_change_detected = false;
} else {
if (do_error_init) {
error = fabs(hz - 1.0/period);
do_error_init = false;
} else {
// check if rate is some multiple of mean error from mean
if (fabs(hz - 1.0/period) > rate_step_change_margin_factor*error) {
// detected step change in rate so reset
do_hz_init = true;
do_error_init = true;
step_change_detected = true;
continue;
}
// on-line mean error from mean
error = error_ema_alpha*fabs(hz - 1.0/period) + (1.0 - error_ema_alpha)*error;
}
hz = rate_ema_alpha/period + (1.0 - rate_ema_alpha)*hz;
}
} while (step_change_detected);
prev = now;
return true;
}
};
// assumed data_mutex_ is locked
template <typename T>
std::vector<std::size_t> sort_indices(const std::vector<T> & v)
{
// initialize original index locations
std::vector<std::size_t> idx(v.size());
std::iota(idx.begin(), idx.end(), 0U);
// sort indexes based on comparing values in v
// using std::stable_sort instead of std::sort
// to avoid unnecessary index re-orderings
// when v contains elements of equal values
std::stable_sort(idx.begin(), idx.end(),
[&v](std::size_t i1, std::size_t i2) {return v[i1] > v[i2];});
return idx;
}
// assumed data_mutex_ is locked
template<int i>
bool received_msg()
{
return (RealTypeCount::value > i ? (bool)std::get<i>(events_).getMessage() : true);
}
// assumed data_mutex_ is locked
bool is_full()
{
bool full = received_msg<0>();
full = full && received_msg<1>();
full = full && received_msg<2>();
full = full && received_msg<3>();
full = full && received_msg<4>();
full = full && received_msg<5>();
full = full && received_msg<6>();
full = full && received_msg<7>();
full = full && received_msg<8>();
return full;
}
// assumed data_mutex_ is locked
int find_pivot(const rclcpp::Time & now)
{
// find arg max rate
std::vector<std::size_t> sorted_idx = sort_indices(rates_);
// use fastest message that isn't late as pivot
for (size_t pivot : sorted_idx) {
double period = (now-rates_[pivot].prev).seconds();
if (period == 0.0) {
if (rates_[pivot].hz > 0.0) {
// we just updated updated this one,
// and it's fastest, so use as pivot
return static_cast<int>(pivot);
} else {
// haven't calculated rate for this message yet
continue;
}
}
if (!rates_[pivot].do_error_init) {
// can now check if new messages are late
double rate_delta = rates_[pivot].hz - 1.0/period;
double margin = rates_[pivot].rate_step_change_margin_factor * rates_[pivot].error;
if (rate_delta > margin) {
// this pivot is late
continue;
}
}
if (rates_[pivot].hz > 0.0) {
// found fastest message with a calculated rate
return static_cast<int>(pivot);
} else {
// haven't calculated rate for this message yet
continue;
}
}
return NO_PIVOT;
}
Sync* parent_;
Events events_;
std::vector<Rate> rates_;
std::mutex data_mutex_; // Protects all of the above
std::vector<RateConfig> rate_configs_;
const int NO_PIVOT{9};
rclcpp::Clock::SharedPtr ros_clock_{nullptr};
};
} // namespace sync
} // namespace message_filters
#endif // MESSAGE_FILTERS__SYNC_POLICIES__LATEST_TIME_H_