Program Listing for File latest_time.hpp

Return to documentation for file (include/message_filters/sync_policies/latest_time.hpp)

// Copyright 2022, Open Source Robotics Foundation, Inc. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
//    * Redistributions of source code must retain the above copyright
//      notice, this list of conditions and the following disclaimer.
//
//    * Redistributions in binary form must reproduce the above copyright
//      notice, this list of conditions and the following disclaimer in the
//      documentation and/or other materials provided with the distribution.
//
//    * Neither the name of the Willow Garage nor the names of its
//      contributors may be used to endorse or promote products derived from
//      this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.

#ifndef MESSAGE_FILTERS__SYNC_POLICIES__LATEST_TIME_HPP_
#define MESSAGE_FILTERS__SYNC_POLICIES__LATEST_TIME_HPP_

#include <algorithm>
#include <cmath>
#include <memory>
#include <numeric>
#include <tuple>
#include <vector>

#include <rclcpp/rclcpp.hpp>

#include "message_filters/message_traits.hpp"
#include "message_filters/null_types.hpp"
#include "message_filters/signal9.hpp"
#include "message_filters/synchronizer.hpp"


namespace message_filters
{
namespace sync_policies
{

template<typename M0, typename M1,
  typename M2 = NullType, typename M3 = NullType, typename M4 = NullType,
  typename M5 = NullType, typename M6 = NullType, typename M7 = NullType,
  typename M8 = NullType>
struct LatestTime : public PolicyBase<M0, M1, M2, M3, M4, M5, M6, M7, M8>
{
  typedef Synchronizer<LatestTime> Sync;
  typedef PolicyBase<M0, M1, M2, M3, M4, M5, M6, M7, M8> Super;
  typedef typename Super::Messages Messages;
  typedef typename Super::Signal Signal;
  typedef typename Super::Events Events;
  typedef typename Super::RealTypeCount RealTypeCount;

  typedef std::tuple<double, double, double> RateConfig;

  LatestTime()
  : LatestTime(rclcpp::Clock::SharedPtr(new rclcpp::Clock(RCL_ROS_TIME)))
  {
  }

  explicit LatestTime(rclcpp::Clock::SharedPtr clock)
  : parent_(0),
    ros_clock_{clock}
  {
  }

  LatestTime(const LatestTime & e)
  {
    *this = e;
  }

  LatestTime & operator=(const LatestTime & rhs)
  {
    parent_ = rhs.parent_;
    events_ = rhs.events_;
    rates_ = rhs.rates_;
    ros_clock_ = rhs.ros_clock_;

    return *this;
  }

  void initParent(Sync * parent)
  {
    parent_ = parent;
  }

  void setRateConfigPerMessage(const std::vector<RateConfig> & configs)
  {
    rate_configs_.assign(configs.begin(), configs.end());
  }

  void setRateConfig(const RateConfig & config)
  {
    rate_configs_.assign(1U, config);
  }

  void setClock(rclcpp::Clock::SharedPtr clock)
  {
    ros_clock_ = clock;
  }

  template<int i>
  void add(const typename std::tuple_element<i, Events>::type & evt)
  {
    assert(parent_);

    std::lock_guard<std::mutex> lock(data_mutex_);

    if (!received_msg<i>()) {
      initialize_rate<i>();
      // wait until we get each message once to publish
      // then wait until we got each message twice to compute rates
      // NOTE: this will drop a few messages of the faster topics until
      //       we get one of the slowest so we can sync
      std::get<i>(events_) = evt;  // adding here ensures we see even the slowest
                                   // message twice before computing rate
      return;
    }

    std::get<i>(events_) = evt;
    rclcpp::Time now = ros_clock_->now();
    bool valid_rate = rates_[i].compute_hz(now);
    if (valid_rate && (i == find_pivot(now)) && is_full()) {
      publish();
    }
  }

private:
  // assumed data_mutex_ is locked
  template<int i>
  void initialize_rate()
  {
    if (rate_configs_.size() > 0U) {
      double rate_ema_alpha{Rate::DEFAULT_RATE_EMA_ALPHA};
      double error_ema_alpha{Rate::DEFAULT_ERROR_EMA_ALPHA};
      double rate_step_change_margin_factor{Rate::DEFAULT_MARGIN_FACTOR};
      if (rate_configs_.size() == RealTypeCount::value) {
        std::tie(
          rate_ema_alpha,
          error_ema_alpha,
          rate_step_change_margin_factor) = rate_configs_[i];
      } else if (rate_configs_.size() == 1U) {
        std::tie(
          rate_ema_alpha,
          error_ema_alpha,
          rate_step_change_margin_factor) = rate_configs_[0U];
      }
      rates_.push_back(
        Rate(
          ros_clock_->now(),
          rate_ema_alpha,
          error_ema_alpha,
          rate_step_change_margin_factor));
    } else {
      rates_.push_back(Rate(ros_clock_->now()));
    }
  }

  // assumed data_mutex_ is locked
  void publish()
  {
    parent_->signal(
      std::get<0>(events_), std::get<1>(events_), std::get<2>(events_),
      std::get<3>(events_), std::get<4>(events_), std::get<5>(events_),
      std::get<6>(events_), std::get<7>(events_), std::get<8>(events_));
  }

  struct Rate
  {
    static constexpr double DEFAULT_RATE_EMA_ALPHA{0.9};
    static constexpr double DEFAULT_ERROR_EMA_ALPHA{0.3};
    static constexpr double DEFAULT_MARGIN_FACTOR{10.0};

    rclcpp::Time prev;
    double hz{0.0};
    double error{0.0};
    double rate_ema_alpha{DEFAULT_RATE_EMA_ALPHA};
    double error_ema_alpha{DEFAULT_ERROR_EMA_ALPHA};
    double rate_step_change_margin_factor{DEFAULT_MARGIN_FACTOR};
    bool do_hz_init{true};
    bool do_error_init{true};

    explicit Rate(const rclcpp::Time & start)
    : Rate(start, DEFAULT_RATE_EMA_ALPHA, DEFAULT_ERROR_EMA_ALPHA, DEFAULT_MARGIN_FACTOR)
    {
    }

    Rate(
      const rclcpp::Time & start,
      const double & rate_ema_alpha, const double & error_ema_alpha,
      const double & rate_step_change_margin_factor)
    : prev{start},
      rate_ema_alpha{rate_ema_alpha},
      error_ema_alpha{error_ema_alpha},
      rate_step_change_margin_factor{rate_step_change_margin_factor}
    {
    }

    bool operator>(const Rate & that) const
    {
      return this->hz > that.hz;
    }

    bool compute_hz(const rclcpp::Time & now)
    {
      bool step_change_detected = false;
      do {
        double period = (now - prev).seconds();
        if (period <= 0.0) {
          // multiple messages and time isn't updating
          return false;
        }

        if (do_hz_init) {
          hz = 1.0 / period;
          do_hz_init = false;
          step_change_detected = false;
        } else {
          if (do_error_init) {
            error = fabs(hz - 1.0 / period);
            do_error_init = false;
          } else {
            // check if rate is some multiple of mean error from mean
            if (fabs(hz - 1.0 / period) > rate_step_change_margin_factor * error) {
              // detected step change in rate so reset
              do_hz_init = true;
              do_error_init = true;
              step_change_detected = true;
              continue;
            }
            // on-line mean error from mean
            error = error_ema_alpha * fabs(hz - 1.0 / period) + (1.0 - error_ema_alpha) * error;
          }
          hz = rate_ema_alpha / period + (1.0 - rate_ema_alpha) * hz;
        }
      } while (step_change_detected);
      prev = now;
      return true;
    }
  };

  // assumed data_mutex_ is locked
  template<typename T>
  std::vector<std::size_t> sort_indices(const std::vector<T> & v)
  {
    // initialize original index locations
    std::vector<std::size_t> idx(v.size());
    std::iota(idx.begin(), idx.end(), 0U);

    // sort indexes based on comparing values in v
    // using std::stable_sort instead of std::sort
    // to avoid unnecessary index re-orderings
    // when v contains elements of equal values
    std::stable_sort(
      idx.begin(), idx.end(),
      [&v](std::size_t i1, std::size_t i2) {return v[i1] > v[i2];});

    return idx;
  }

  // assumed data_mutex_ is locked
  template<int i>
  bool received_msg()
  {
    return RealTypeCount::value > i ? static_cast<bool>(std::get<i>(events_).getMessage()) : true;
  }

  // assumed data_mutex_ is locked
  bool is_full()
  {
    bool full = received_msg<0>();
    full = full && received_msg<1>();
    full = full && received_msg<2>();
    full = full && received_msg<3>();
    full = full && received_msg<4>();
    full = full && received_msg<5>();
    full = full && received_msg<6>();
    full = full && received_msg<7>();
    full = full && received_msg<8>();

    return full;
  }

  // assumed data_mutex_ is locked
  int find_pivot(const rclcpp::Time & now)
  {
    // find arg max rate
    std::vector<std::size_t> sorted_idx = sort_indices(rates_);

    // use fastest message that isn't late as pivot
    for (size_t pivot : sorted_idx) {
      double period = (now - rates_[pivot].prev).seconds();
      if (period == 0.0) {
        if (rates_[pivot].hz > 0.0) {
          // we just updated updated this one,
          // and it's fastest, so use as pivot
          return static_cast<int>(pivot);
        } else {
          // haven't calculated rate for this message yet
          continue;
        }
      }

      if (!rates_[pivot].do_error_init) {
        // can now check if new messages are late
        double rate_delta = rates_[pivot].hz - 1.0 / period;
        double margin = rates_[pivot].rate_step_change_margin_factor * rates_[pivot].error;
        if (rate_delta > margin) {
          // this pivot is late
          continue;
        }
      }

      if (rates_[pivot].hz > 0.0) {
        // found fastest message with a calculated rate
        return static_cast<int>(pivot);
      } else {
        // haven't calculated rate for this message yet
        continue;
      }
    }
    return NO_PIVOT;
  }

  Sync * parent_;
  Events events_;
  std::vector<Rate> rates_;
  std::mutex data_mutex_;  // Protects all of the above

  std::vector<RateConfig> rate_configs_;

  const int NO_PIVOT{9};

  rclcpp::Clock::SharedPtr ros_clock_{nullptr};
};

}  // namespace sync_policies
}  // namespace message_filters

#endif  // MESSAGE_FILTERS__SYNC_POLICIES__LATEST_TIME_HPP_