Program Listing for File exact_sync.hpp

Return to documentation for file (include/flex_sync/exact_sync.hpp)

// -*-c++-*---------------------------------------------------------------------------------------
// Copyright 2024 Bernd Pfrommer <bernd.pfrommer@gmail.com>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef FLEX_SYNC__EXACT_SYNC_HPP_
#define FLEX_SYNC__EXACT_SYNC_HPP_

#include <deque>
#include <flex_sync/msg_pack.hpp>
#include <functional>
#include <iostream>
#include <map>
#include <mutex>
#include <rclcpp/rclcpp.hpp>
#include <string>
#include <tuple>
#include <utility>
#include <vector>

/*
 * Class for exact synchronizing across variable number of messages
 */

namespace flex_sync
{
template <typename... MsgTypes>
class ExactSync
{
private:
  // CallbackArg has arguments of the callback as a tuple
  typedef std::tuple<std::vector<std::shared_ptr<const MsgTypes>>...>
    CallbackArg;
  // A time slot aggregates all messages for that time
  struct TimeSlot
  {
    int num_valid_messages{0};
    CallbackArg candidate;
  };
  // TypeInfo holds all data for a particular message type
  template <typename MsgType>
  struct TypeInfo
  {
    std::map<std::string, int> topic_to_index;
  };
  typedef std::map<rclcpp::Time, std::shared_ptr<TimeSlot>> TimeToSlot;
  typedef std::tuple<TypeInfo<const MsgTypes>...> TupleOfTypeInfo;
public:
  using Callback = std::function<void(
    const std::vector<std::shared_ptr<const MsgTypes>> &...)>;

  ExactSync(
    const std::vector<std::vector<std::string>> & topics, Callback cb,
    size_t queue_size)
  : topics_(topics), cb_(cb), queue_size_(queue_size)
  {
    const size_t ntypes = sizeof...(MsgTypes);
    if (ntypes != topics.size()) {
      RCLCPP_ERROR_STREAM(
        rclcpp::get_logger("exact_sync"),
        "exact sync: number of topic vectors: "
          << topics.size()
          << " does not match number of message types: " << ntypes);
      throw(std::runtime_error("num topic vectors != num msg types"));
    }
    TopicIndexInitializer tii;
    (void)for_each(type_infos_, &tii);
  }

  size_t getNumberDropped() const { return (num_dropped_); }

  void clearNumberDropped() { num_dropped_ = 0; }

  const std::vector<std::vector<std::string>> & getTopics() const
  {
    return (topics_);
  }

  size_t getQueueSize() const { return (queue_size_); }

  template <typename MsgPtrT>
  void process(const std::string & topic, const MsgPtrT msg)
  {
    using TypeInfoT = TypeInfo<typename MsgPtrT::element_type const>;
    using VecT =
      std::vector<std::shared_ptr<typename MsgPtrT::element_type const>>;
    const rclcpp::Time & t = msg->header.stamp;
    auto it = time_to_slot_.find(t);
    if (it == time_to_slot_.end()) {
      std::shared_ptr<TimeSlot> slot = makeTimeSlot();
      it = time_to_slot_.insert({t, slot}).first;
    }
    // find correct topic info via lookup by type
    TypeInfoT & ti = std::get<TypeInfoT>(type_infos_);
    auto topic_it = ti.topic_to_index.find(topic);
    if (topic_it == ti.topic_to_index.end()) {
      RCLCPP_ERROR_STREAM(
        rclcpp::get_logger("exact_sync"),
        "flex_sync: invalid topic " << topic << " for message type");
      throw std::runtime_error("invalid topic: " + topic);
    }
    // from looked-up tuple, grab the right type
    TimeSlot & slot = *(it->second);
    VecT & msg_vec = std::get<VecT>(slot.candidate);
    if (!msg_vec[topic_it->second]) {
      msg_vec[topic_it->second] = msg;  // save message
      slot.num_valid_messages++;
    }
    if (slot.num_valid_messages == tot_num_topics_) {
      // deliver callback
      std::apply([this](auto &&... args) { cb_(args...); }, slot.candidate);
      // clear this and all old tuples
      auto itpp = it;
      itpp++;
      while (time_to_slot_.begin() != itpp) {
        if (time_to_slot_.begin() != it) {
          num_dropped_ += (time_to_slot_.begin()->second)->num_valid_messages;
        }
        time_to_slot_.erase(time_to_slot_.begin());
      }
    }
    if (queue_size_ > 0) {
      while (time_to_slot_.size() > queue_size_) {
        num_dropped_ += (time_to_slot_.begin()->second)->num_valid_messages;
        time_to_slot_.erase(time_to_slot_.begin());
      }
    }
  }
  using message_types = MsgPack<MsgTypes...>;

private:
  struct TopicIndexInitializer
  {
    template <std::size_t I>
    int operate(ExactSync<MsgTypes...> * sync) const
    {
      const size_t num_topics = sync->topics_[I].size();
      auto & type_info = std::get<I>(sync->type_infos_);
      sync->tot_num_topics_ += num_topics;
      // make map between topic string and index for
      // lookup when data arrives
      for (int t_idx = 0; t_idx < static_cast<int>(sync->topics_[I].size());
           t_idx++) {
        type_info.topic_to_index[sync->topics_[I][t_idx]] = t_idx;
      }
      return (num_topics);
    }
  };

  class TimeSlotMaker
  {
  public:
    TimeSlotMaker() { timeSlot_.reset(new TimeSlot()); }
    template <std::size_t I>
    int operate(ExactSync<MsgTypes...> * sync) const
    {
      auto & type_info = std::get<I>(sync->type_infos_);
      auto & cand = std::get<I>(timeSlot_->candidate);
      cand.resize(type_info.topic_to_index.size());
      return (0);
    }
    std::shared_ptr<TimeSlot> getTimeSlot() const { return (timeSlot_); }

  private:
    std::shared_ptr<TimeSlot> timeSlot_;
  };

  // TODO(Bernd): this may be a slow operation, maybe we create it once
  // on startup and make copy afterwards
  std::shared_ptr<TimeSlot> makeTimeSlot()
  {
    TimeSlotMaker tm;
    (void)for_each(type_infos_, &tm);
    return (tm.getTimeSlot());
  }

  // some neat template tricks picked up here:
  // https://stackoverflow.com/questions/18063451/get-index-of-a-tuple-elements -type
  // This template terminates the recursion
  template <std::size_t I = 0, typename FuncT, typename... Tp>
  inline typename std::enable_if<I == sizeof...(Tp), int>::type for_each(
    std::tuple<Tp...> &, FuncT *)  // Unused arg needs no name
  {
    return 0;
  }  // do nothing

  // This template recursively calls itself, thereby iterating
  template <std::size_t I = 0, typename FuncT, typename... Tp>
    inline typename std::enable_if <
    I<sizeof...(Tp), int>::type for_each(std::tuple<Tp...> & t, FuncT * f)
  {
    const int rv = (*f).template operate<I>(this);
    return (rv + for_each<I + 1, FuncT, Tp...>(t, f));
  }
  // ----------- variables -----------------------
  std::vector<std::vector<std::string>> topics_;  // topics to be synced
  Callback cb_;                                   // pointer to the callee
  size_t queue_size_{0};        // keep at most this number of time stamps
  TimeToSlot time_to_slot_;     // maps header time to slot
  TupleOfTypeInfo type_infos_;  // tuple with per-msg-type topic maps
  int tot_num_topics_{0};       // for deciding when time slot is complete
  size_t num_dropped_{0};       // total number of dropped messages
};
}  // namespace flex_sync

#endif  // FLEX_SYNC__EXACT_SYNC_HPP_