Program Listing for File approximate_time.h

Return to documentation for file (include/message_filters/sync_policies/approximate_time.h)

/*********************************************************************
* Software License Agreement (BSD License)
*
*  Copyright (c) 2009, Willow Garage, Inc.
*  All rights reserved.
*
*  Redistribution and use in source and binary forms, with or without
*  modification, are permitted provided that the following conditions
*  are met:
*
*   * Redistributions of source code must retain the above copyright
*     notice, this list of conditions and the following disclaimer.
*   * Redistributions in binary form must reproduce the above
*     copyright notice, this list of conditions and the following
*     disclaimer in the documentation and/or other materials provided
*     with the distribution.
*   * Neither the name of the Willow Garage nor the names of its
*     contributors may be used to endorse or promote products derived
*     from this software without specific prior written permission.
*
*  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
*  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
*  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
*  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
*  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
*  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
*  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
*  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
*  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
*  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
*  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
*  POSSIBILITY OF SUCH DAMAGE.
*********************************************************************/

#ifndef MESSAGE_FILTERS__SYNC_POLICIES__APPROXIMATE_TIME_H_
#define MESSAGE_FILTERS__SYNC_POLICIES__APPROXIMATE_TIME_H_

#include <cassert>
#include <deque>
#include <string>
#include <tuple>
#include <vector>

#include <inttypes.h>

#include <rclcpp/rclcpp.hpp>
#include <rcutils/logging_macros.h>

#include "message_filters/connection.h"
#include "message_filters/message_traits.h"
#include "message_filters/null_types.h"
#include "message_filters/signal9.h"
#include "message_filters/synchronizer.h"


#ifndef RCUTILS_ASSERT
// TODO(tfoote) remove this after it's implemented upstream
// https://github.com/ros2/rcutils/pull/112
#define RCUTILS_ASSERT assert
#endif
#ifndef RCUTILS_BREAK
#include <cassert>
// TODO(tfoote) remove this after it's implemented upstream
// https://github.com/ros2/rcutils/pull/112
#define RCUTILS_BREAK std::abort
#endif
// Uncomment below intead
//#include <rcutils/assert.h>

namespace message_filters
{
namespace sync_policies
{

template<typename M0, typename M1, typename M2 = NullType, typename M3 = NullType, typename M4 = NullType,
         typename M5 = NullType, typename M6 = NullType, typename M7 = NullType, typename M8 = NullType>
struct ApproximateTime : public PolicyBase<M0, M1, M2, M3, M4, M5, M6, M7, M8>
{
  typedef Synchronizer<ApproximateTime> Sync;
  typedef PolicyBase<M0, M1, M2, M3, M4, M5, M6, M7, M8> Super;
  typedef typename Super::Messages Messages;
  typedef typename Super::Signal Signal;
  typedef typename Super::Events Events;
  typedef typename Super::RealTypeCount RealTypeCount;
  typedef typename Super::M0Event M0Event;
  typedef typename Super::M1Event M1Event;
  typedef typename Super::M2Event M2Event;
  typedef typename Super::M3Event M3Event;
  typedef typename Super::M4Event M4Event;
  typedef typename Super::M5Event M5Event;
  typedef typename Super::M6Event M6Event;
  typedef typename Super::M7Event M7Event;
  typedef typename Super::M8Event M8Event;
  typedef std::deque<M0Event> M0Deque;
  typedef std::deque<M1Event> M1Deque;
  typedef std::deque<M2Event> M2Deque;
  typedef std::deque<M3Event> M3Deque;
  typedef std::deque<M4Event> M4Deque;
  typedef std::deque<M5Event> M5Deque;
  typedef std::deque<M6Event> M6Deque;
  typedef std::deque<M7Event> M7Deque;
  typedef std::deque<M8Event> M8Deque;
  typedef std::vector<M0Event> M0Vector;
  typedef std::vector<M1Event> M1Vector;
  typedef std::vector<M2Event> M2Vector;
  typedef std::vector<M3Event> M3Vector;
  typedef std::vector<M4Event> M4Vector;
  typedef std::vector<M5Event> M5Vector;
  typedef std::vector<M6Event> M6Vector;
  typedef std::vector<M7Event> M7Vector;
  typedef std::vector<M8Event> M8Vector;
  typedef Events Tuple;
  typedef std::tuple<M0Deque, M1Deque, M2Deque, M3Deque, M4Deque, M5Deque, M6Deque, M7Deque, M8Deque> DequeTuple;
  typedef std::tuple<M0Vector, M1Vector, M2Vector, M3Vector, M4Vector, M5Vector, M6Vector, M7Vector, M8Vector> VectorTuple;

  ApproximateTime(uint32_t queue_size)
  : parent_(0)
  , queue_size_(queue_size)
  , num_non_empty_deques_(0)
  , pivot_(NO_PIVOT)
  , max_interval_duration_(rclcpp::Duration(std::numeric_limits<int32_t>::max(),999999999))
  , age_penalty_(0.1)
  , has_dropped_messages_(9, false)
  , inter_message_lower_bounds_(9, rclcpp::Duration(0, 0))
  , warned_about_incorrect_bound_(9, false)
  {
    RCUTILS_ASSERT(queue_size_ > 0);  // The synchronizer will tend to drop many messages with a queue size of 1. At least 2 is recommended.
  }

  ApproximateTime(const ApproximateTime& e)
  : max_interval_duration_(rclcpp::Duration(std::numeric_limits<int32_t>::max(),999999999))
  {
    *this = e;
  }

  ApproximateTime& operator=(const ApproximateTime& rhs)
  {
    parent_ = rhs.parent_;
    queue_size_ = rhs.queue_size_;
    num_non_empty_deques_ = rhs.num_non_empty_deques_;
    pivot_time_ = rhs.pivot_time_;
    pivot_ = rhs.pivot_;
    max_interval_duration_ = rhs.max_interval_duration_;
    age_penalty_ = rhs.age_penalty_;
    candidate_start_ = rhs.candidate_start_;
    candidate_end_ = rhs.candidate_end_;
    deques_ = rhs.deques_;
    past_ = rhs.past_;
    has_dropped_messages_ = rhs.has_dropped_messages_;
    inter_message_lower_bounds_ = rhs.inter_message_lower_bounds_;
    warned_about_incorrect_bound_ = rhs.warned_about_incorrect_bound_;

    return *this;
  }

  void initParent(Sync* parent)
  {
    parent_ = parent;
  }

  template<int i>
  void checkInterMessageBound()
  {
    namespace mt = message_filters::message_traits;
    if (warned_about_incorrect_bound_[i])
    {
      return;
    }
    std::deque<typename std::tuple_element<i, Events>::type>& deque = std::get<i>(deques_);
    std::vector<typename std::tuple_element<i, Events>::type>& v = std::get<i>(past_);
    RCUTILS_ASSERT(!deque.empty());
    const typename std::tuple_element<i, Messages>::type &msg = *(deque.back()).getMessage();
    rclcpp::Time msg_time = mt::TimeStamp<typename std::tuple_element<i, Messages>::type>::value(msg);
    rclcpp::Time previous_msg_time;
    if (deque.size() == (size_t) 1)
    {
      if (v.empty())
      {
    // We have already published (or have never received) the previous message, we cannot check the bound
    return;
      }
      const typename std::tuple_element<i, Messages>::type &previous_msg = *(v.back()).getMessage();
      previous_msg_time = mt::TimeStamp<typename std::tuple_element<i, Messages>::type>::value(previous_msg);
    }
    else
    {
      // There are at least 2 elements in the deque. Check that the gap respects the bound if it was provided.
      const typename std::tuple_element<i, Messages>::type &previous_msg = *(deque[deque.size()-2]).getMessage();
      previous_msg_time = mt::TimeStamp<typename std::tuple_element<i, Messages>::type>::value(previous_msg);
    }
    if (msg_time < previous_msg_time)
    {
      RCUTILS_LOG_WARN_ONCE("Messages of type %d arrived out of order (will print only once)", i);
      warned_about_incorrect_bound_[i] = true;
    }
    else if ((msg_time - previous_msg_time) < inter_message_lower_bounds_[i])
    {
      RCUTILS_LOG_WARN_ONCE("Messages of type %d arrived closer ("
        "%" PRId64 ") than the lower bound you provided ("
        "%" PRId64 ") (will print only once)",
        i,
        (msg_time - previous_msg_time).nanoseconds(),
        inter_message_lower_bounds_[i].nanoseconds());
      warned_about_incorrect_bound_[i] = true;
    }
  }


  template<int i>
  void add(const typename std::tuple_element<i, Events>::type& evt)
  {
    std::lock_guard<std::mutex> lock(data_mutex_);

    std::deque<typename std::tuple_element<i, Events>::type>& deque = std::get<i>(deques_);
    deque.push_back(evt);
    if (deque.size() == (size_t)1) {
      // We have just added the first message, so it was empty before
      ++num_non_empty_deques_;
      if (num_non_empty_deques_ == (uint32_t)RealTypeCount::value)
      {
        // All deques have messages
        process();
      }
    }
    else
    {
      checkInterMessageBound<i>();
    }
    // Check whether we have more messages than allowed in the queue.
    // Note that during the above call to process(), queue i may contain queue_size_+1 messages.
    std::vector<typename std::tuple_element<i, Events>::type>& past = std::get<i>(past_);
    if (deque.size() + past.size() > queue_size_)
    {
      // Cancel ongoing candidate search, if any:
      num_non_empty_deques_ = 0; // We will recompute it from scratch
      recover<0>();
      recover<1>();
      recover<2>();
      recover<3>();
      recover<4>();
      recover<5>();
      recover<6>();
      recover<7>();
      recover<8>();
      // Drop the oldest message in the offending topic
      RCUTILS_ASSERT(!deque.empty());
      deque.pop_front();
      has_dropped_messages_[i] = true;
      if (pivot_ != NO_PIVOT)
      {
    // The candidate is no longer valid. Destroy it.
    candidate_ = Tuple();
    pivot_ = NO_PIVOT;
    // There might still be enough messages to create a new candidate:
    process();
      }
    }
  }

  void setAgePenalty(double age_penalty)
  {
    // For correctness we only need age_penalty > -1.0, but most likely a negative age_penalty is a mistake.
    RCUTILS_ASSERT(age_penalty >= 0);
    age_penalty_ = age_penalty;
  }

  void setInterMessageLowerBound(int i, rclcpp::Duration lower_bound) {
    // For correctness we only need age_penalty > -1.0, but most likely a negative age_penalty is a mistake.
    RCUTILS_ASSERT(lower_bound >= rclcpp::Duration(0,0));
    inter_message_lower_bounds_[i] = lower_bound;
  }

  void setMaxIntervalDuration(rclcpp::Duration max_interval_duration) {
    // For correctness we only need age_penalty > -1.0, but most likely a negative age_penalty is a mistake.
    RCUTILS_ASSERT(max_interval_duration >= rclcpp::Duration(0,0));
    max_interval_duration_ = max_interval_duration;
  }

private:
  // Assumes that deque number <index> is non empty
  template<int i>
  void dequeDeleteFront()
  {
    std::deque<typename std::tuple_element<i, Events>::type>& deque = std::get<i>(deques_);
    RCUTILS_ASSERT(!deque.empty());
    deque.pop_front();
    if (deque.empty())
    {
      --num_non_empty_deques_;
    }
  }

  // Assumes that deque number <index> is non empty
  void dequeDeleteFront(uint32_t index)
  {
    switch (index)
    {
    case 0:
      dequeDeleteFront<0>();
      break;
    case 1:
      dequeDeleteFront<1>();
      break;
    case 2:
      dequeDeleteFront<2>();
      break;
    case 3:
      dequeDeleteFront<3>();
      break;
    case 4:
      dequeDeleteFront<4>();
      break;
    case 5:
      dequeDeleteFront<5>();
      break;
    case 6:
      dequeDeleteFront<6>();
      break;
    case 7:
      dequeDeleteFront<7>();
      break;
    case 8:
      dequeDeleteFront<8>();
      break;
    default:
      RCUTILS_BREAK();
    }
  }

  // Assumes that deque number <index> is non empty
  template<int i>
  void dequeMoveFrontToPast()
  {
    std::deque<typename std::tuple_element<i, Events>::type>& deque = std::get<i>(deques_);
    std::vector<typename std::tuple_element<i, Events>::type>& vector = std::get<i>(past_);
    RCUTILS_ASSERT(!deque.empty());
    vector.push_back(deque.front());
    deque.pop_front();
    if (deque.empty())
    {
      --num_non_empty_deques_;
    }
  }
  // Assumes that deque number <index> is non empty
  void dequeMoveFrontToPast(uint32_t index)
  {
    switch (index)
    {
    case 0:
      dequeMoveFrontToPast<0>();
      break;
    case 1:
      dequeMoveFrontToPast<1>();
      break;
    case 2:
      dequeMoveFrontToPast<2>();
      break;
    case 3:
      dequeMoveFrontToPast<3>();
      break;
    case 4:
      dequeMoveFrontToPast<4>();
      break;
    case 5:
      dequeMoveFrontToPast<5>();
      break;
    case 6:
      dequeMoveFrontToPast<6>();
      break;
    case 7:
      dequeMoveFrontToPast<7>();
      break;
    case 8:
      dequeMoveFrontToPast<8>();
      break;
    default:
      RCUTILS_BREAK();
    }
  }

  void makeCandidate()
  {
    //printf("Creating candidate\n");
    // Create candidate tuple
    candidate_ = Tuple(); // Discards old one if any
    std::get<0>(candidate_) = std::get<0>(deques_).front();
    std::get<1>(candidate_) = std::get<1>(deques_).front();
    if (RealTypeCount::value > 2)
    {
      std::get<2>(candidate_) = std::get<2>(deques_).front();
      if (RealTypeCount::value > 3)
      {
    std::get<3>(candidate_) = std::get<3>(deques_).front();
    if (RealTypeCount::value > 4)
    {
      std::get<4>(candidate_) = std::get<4>(deques_).front();
      if (RealTypeCount::value > 5)
      {
        std::get<5>(candidate_) = std::get<5>(deques_).front();
        if (RealTypeCount::value > 6)
        {
          std::get<6>(candidate_) = std::get<6>(deques_).front();
          if (RealTypeCount::value > 7)
          {
        std::get<7>(candidate_) = std::get<7>(deques_).front();
        if (RealTypeCount::value > 8)
        {
          std::get<8>(candidate_) = std::get<8>(deques_).front();
        }
          }
        }
      }
    }
      }
    }
    // Delete all past messages, since we have found a better candidate
    std::get<0>(past_).clear();
    std::get<1>(past_).clear();
    std::get<2>(past_).clear();
    std::get<3>(past_).clear();
    std::get<4>(past_).clear();
    std::get<5>(past_).clear();
    std::get<6>(past_).clear();
    std::get<7>(past_).clear();
    std::get<8>(past_).clear();
    //printf("Candidate created\n");
  }


  // ASSUMES: num_messages <= past_[i].size()
  template<int i>
  void recover(size_t num_messages)
  {
    if (i >= RealTypeCount::value)
    {
      return;
    }

    std::vector<typename std::tuple_element<i, Events>::type>& v = std::get<i>(past_);
    std::deque<typename std::tuple_element<i, Events>::type>& q = std::get<i>(deques_);
    RCUTILS_ASSERT(num_messages <= v.size());
    while (num_messages > 0)
    {
      q.push_front(v.back());
      v.pop_back();
      num_messages--;
    }

    if (!q.empty())
    {
      ++num_non_empty_deques_;
    }
  }


  template<int i>
  void recover()
  {
    if (i >= RealTypeCount::value)
    {
      return;
    }

    std::vector<typename std::tuple_element<i, Events>::type>& v = std::get<i>(past_);
    std::deque<typename std::tuple_element<i, Events>::type>& q = std::get<i>(deques_);
    while (!v.empty())
    {
      q.push_front(v.back());
      v.pop_back();
    }

    if (!q.empty())
    {
      ++num_non_empty_deques_;
    }
  }


  template<int i>
  void recoverAndDelete()
  {
    if (i >= RealTypeCount::value)
    {
      return;
    }

    std::vector<typename std::tuple_element<i, Events>::type>& v = std::get<i>(past_);
    std::deque<typename std::tuple_element<i, Events>::type>& q = std::get<i>(deques_);
    while (!v.empty())
    {
      q.push_front(v.back());
      v.pop_back();
    }

    RCUTILS_ASSERT(!q.empty());

    q.pop_front();
    if (!q.empty())
    {
      ++num_non_empty_deques_;
    }
  }

  // Assumes: all deques are non empty, i.e. num_non_empty_deques_ == RealTypeCount::value
  void publishCandidate()
  {
    //printf("Publishing candidate\n");
    // Publish
    parent_->signal(std::get<0>(candidate_), std::get<1>(candidate_), std::get<2>(candidate_), std::get<3>(candidate_),
                    std::get<4>(candidate_), std::get<5>(candidate_), std::get<6>(candidate_), std::get<7>(candidate_),
                    std::get<8>(candidate_));
    // Delete this candidate
    candidate_ = Tuple();
    pivot_ = NO_PIVOT;

    // Recover hidden messages, and delete the ones corresponding to the candidate
    num_non_empty_deques_ = 0; // We will recompute it from scratch
    recoverAndDelete<0>();
    recoverAndDelete<1>();
    recoverAndDelete<2>();
    recoverAndDelete<3>();
    recoverAndDelete<4>();
    recoverAndDelete<5>();
    recoverAndDelete<6>();
    recoverAndDelete<7>();
    recoverAndDelete<8>();
  }

  // Assumes: all deques are non empty, i.e. num_non_empty_deques_ == RealTypeCount::value
  // Returns: the oldest message on the deques
  void getCandidateStart(uint32_t &start_index, rclcpp::Time &start_time)
  {
    return getCandidateBoundary(start_index, start_time, false);
  }

  // Assumes: all deques are non empty, i.e. num_non_empty_deques_ == RealTypeCount::value
  // Returns: the latest message among the heads of the deques, i.e. the minimum
  //          time to end an interval started at getCandidateStart_index()
  void getCandidateEnd(uint32_t &end_index, rclcpp::Time &end_time)
  {
    return getCandidateBoundary(end_index, end_time, true);
  }

  // ASSUMES: all deques are non-empty
  // end = true: look for the latest head of deque
  //       false: look for the earliest head of deque
  void getCandidateBoundary(uint32_t &index, rclcpp::Time &time, bool end)
  {
    namespace mt = message_filters::message_traits;

    M0Event& m0 = std::get<0>(deques_).front();
    time = mt::TimeStamp<M0>::value(*m0.getMessage());
    index = 0;
    if (RealTypeCount::value > 1)
    {
      M1Event& m1 = std::get<1>(deques_).front();
      if ((mt::TimeStamp<M1>::value(*m1.getMessage()) < time) ^ end)
      {
        time = mt::TimeStamp<M1>::value(*m1.getMessage());
        index = 1;
      }
    }
    if (RealTypeCount::value > 2)
    {
      M2Event& m2 = std::get<2>(deques_).front();
      if ((mt::TimeStamp<M2>::value(*m2.getMessage()) < time) ^ end)
      {
        time = mt::TimeStamp<M2>::value(*m2.getMessage());
        index = 2;
      }
    }
    if (RealTypeCount::value > 3)
    {
      M3Event& m3 = std::get<3>(deques_).front();
      if ((mt::TimeStamp<M3>::value(*m3.getMessage()) < time) ^ end)
      {
        time = mt::TimeStamp<M3>::value(*m3.getMessage());
        index = 3;
      }
    }
    if (RealTypeCount::value > 4)
    {
      M4Event& m4 = std::get<4>(deques_).front();
      if ((mt::TimeStamp<M4>::value(*m4.getMessage()) < time) ^ end)
      {
        time = mt::TimeStamp<M4>::value(*m4.getMessage());
        index = 4;
      }
    }
    if (RealTypeCount::value > 5)
    {
      M5Event& m5 = std::get<5>(deques_).front();
      if ((mt::TimeStamp<M5>::value(*m5.getMessage()) < time) ^ end)
      {
        time = mt::TimeStamp<M5>::value(*m5.getMessage());
        index = 5;
      }
    }
    if (RealTypeCount::value > 6)
    {
      M6Event& m6 = std::get<6>(deques_).front();
      if ((mt::TimeStamp<M6>::value(*m6.getMessage()) < time) ^ end)
      {
        time = mt::TimeStamp<M6>::value(*m6.getMessage());
        index = 6;
      }
    }
    if (RealTypeCount::value > 7)
    {
      M7Event& m7 = std::get<7>(deques_).front();
      if ((mt::TimeStamp<M7>::value(*m7.getMessage()) < time) ^ end)
      {
        time = mt::TimeStamp<M7>::value(*m7.getMessage());
        index = 7;
      }
    }
    if (RealTypeCount::value > 8)
    {
      M8Event& m8 = std::get<8>(deques_).front();
      if ((mt::TimeStamp<M8>::value(*m8.getMessage()) < time) ^ end)
      {
        time = mt::TimeStamp<M8>::value(*m8.getMessage());
        index = 8;
      }
    }
  }


  // ASSUMES: we have a pivot and candidate
  template<int i>
  rclcpp::Time getVirtualTime()
  {
    namespace mt = message_filters::message_traits;

    if (i >= RealTypeCount::value)
    {
      return rclcpp::Time(0,0);  // Dummy return value
    }
    RCUTILS_ASSERT(pivot_ != NO_PIVOT);

    std::vector<typename std::tuple_element<i, Events>::type>& v = std::get<i>(past_);
    std::deque<typename std::tuple_element<i, Events>::type>& q = std::get<i>(deques_);
    if (q.empty())
    {
      RCUTILS_ASSERT(!v.empty());  // Because we have a candidate
      rclcpp::Time last_msg_time = mt::TimeStamp<typename std::tuple_element<i, Messages>::type>::value(*(v.back()).getMessage());
      rclcpp::Time msg_time_lower_bound = last_msg_time + inter_message_lower_bounds_[i];
      if (msg_time_lower_bound > pivot_time_)  // Take the max
      {
        return msg_time_lower_bound;
      }
      return pivot_time_;
    }
    rclcpp::Time current_msg_time = mt::TimeStamp<typename std::tuple_element<i, Messages>::type>::value(*(q.front()).getMessage());
    return current_msg_time;
  }


  // ASSUMES: we have a pivot and candidate
  void getVirtualCandidateStart(uint32_t &start_index, rclcpp::Time &start_time)
  {
    return getVirtualCandidateBoundary(start_index, start_time, false);
  }

  // ASSUMES: we have a pivot and candidate
  void getVirtualCandidateEnd(uint32_t &end_index, rclcpp::Time &end_time)
  {
    return getVirtualCandidateBoundary(end_index, end_time, true);
  }

  // ASSUMES: we have a pivot and candidate
  // end = true: look for the latest head of deque
  //       false: look for the earliest head of deque
  void getVirtualCandidateBoundary(uint32_t &index, rclcpp::Time &time, bool end)
  {
    std::vector<rclcpp::Time> virtual_times(9);
    virtual_times[0] = getVirtualTime<0>();
    virtual_times[1] = getVirtualTime<1>();
    virtual_times[2] = getVirtualTime<2>();
    virtual_times[3] = getVirtualTime<3>();
    virtual_times[4] = getVirtualTime<4>();
    virtual_times[5] = getVirtualTime<5>();
    virtual_times[6] = getVirtualTime<6>();
    virtual_times[7] = getVirtualTime<7>();
    virtual_times[8] = getVirtualTime<8>();

    time = virtual_times[0];
    index = 0;
    for (int i = 0; i < RealTypeCount::value; i++)
    {
      if ((virtual_times[i] < time) ^ end)
      {
    time = virtual_times[i];
    index = i;
      }
    }
  }


  // assumes data_mutex_ is already locked
  void process()
  {
    // While no deque is empty
    while (num_non_empty_deques_ == (uint32_t)RealTypeCount::value)
    {
      // Find the start and end of the current interval
      //printf("Entering while loop in this state [\n");
      //show_internal_state();
      //printf("]\n");
      rclcpp::Time end_time, start_time;
      uint32_t end_index, start_index;
      getCandidateEnd(end_index, end_time);
      getCandidateStart(start_index, start_time);
      for (uint32_t i = 0; i < (uint32_t)RealTypeCount::value; i++)
      {
    if (i != end_index)
    {
      // No dropped message could have been better to use than the ones we have,
      // so it becomes ok to use this topic as pivot in the future
      has_dropped_messages_[i] = false;
    }
      }
      if (pivot_ == NO_PIVOT)
      {
        // We do not have a candidate
        // INVARIANT: the past_ vectors are empty
        // INVARIANT: (candidate_ has no filled members)
        if (end_time - start_time > max_interval_duration_)
        {
          // This interval is too big to be a valid candidate, move to the next
          dequeDeleteFront(start_index);
          continue;
        }
    if (has_dropped_messages_[end_index])
    {
      // The topic that would become pivot has dropped messages, so it is not a good pivot
      dequeDeleteFront(start_index);
      continue;
    }
    // This is a valid candidate, and we don't have any, so take it
    makeCandidate();
    candidate_start_ = start_time;
    candidate_end_ = end_time;
    pivot_ = end_index;
    pivot_time_ = end_time;
    dequeMoveFrontToPast(start_index);
      }
      else
      {
        // We already have a candidate
        // Is this one better than the current candidate?
        // INVARIANT: has_dropped_messages_ is all false
        if ((end_time - candidate_end_) * (1 + age_penalty_) >= (start_time - candidate_start_))
        {
          // This is not a better candidate, move to the next
          dequeMoveFrontToPast(start_index);
        }
        else
        {
          // This is a better candidate
          makeCandidate();
          candidate_start_ = start_time;
          candidate_end_ = end_time;
          dequeMoveFrontToPast(start_index);
          // Keep the same pivot (and pivot time)
        }
      }
      // INVARIANT: we have a candidate and pivot
      RCUTILS_ASSERT(pivot_ != NO_PIVOT);
      //printf("start_index == %d, pivot_ == %d\n", start_index, pivot_);
      if (start_index == pivot_)  // TODO: replace with start_time == pivot_time_
      {
        // We have exhausted all possible candidates for this pivot, we now can output the best one
        publishCandidate();
      }
      else if ((end_time - candidate_end_) * (1 + age_penalty_) >= (pivot_time_ - candidate_start_))
      {
        // We have not exhausted all candidates, but this candidate is already provably optimal
        // Indeed, any future candidate must contain the interval [pivot_time_ end_time], which
        // is already too big.
        // Note: this case is subsumed by the next, but it may save some unnecessary work and
        //       it makes things (a little) easier to understand
        publishCandidate();
      }
      else if (num_non_empty_deques_ < (uint32_t)RealTypeCount::value)
      {
        uint32_t num_non_empty_deques_before_virtual_search = num_non_empty_deques_;

        // Before giving up, use the rate bounds, if provided, to further try to prove optimality
        std::vector<int> num_virtual_moves(9,0);
        while (1)
        {
          rclcpp::Time end_time, start_time;
          uint32_t end_index, start_index;
          getVirtualCandidateEnd(end_index, end_time);
          getVirtualCandidateStart(start_index, start_time);
          if ((end_time - candidate_end_) * (1 + age_penalty_) >= (pivot_time_ - candidate_start_))
          {
            // We have proved optimality
            // As above, any future candidate must contain the interval [pivot_time_ end_time], which
            // is already too big.
            publishCandidate();  // This cleans up the virtual moves as a byproduct
            break;  // From the while(1) loop only
          }
          if ((end_time - candidate_end_) * (1 + age_penalty_) < (start_time - candidate_start_))
          {
            // We cannot prove optimality
            // Indeed, we have a virtual (i.e. optimistic) candidate that is better than the current
            // candidate
            // Cleanup the virtual search:
            num_non_empty_deques_ = 0; // We will recompute it from scratch
        recover<0>(num_virtual_moves[0]);
        recover<1>(num_virtual_moves[1]);
        recover<2>(num_virtual_moves[2]);
        recover<3>(num_virtual_moves[3]);
        recover<4>(num_virtual_moves[4]);
        recover<5>(num_virtual_moves[5]);
        recover<6>(num_virtual_moves[6]);
        recover<7>(num_virtual_moves[7]);
        recover<8>(num_virtual_moves[8]);
            (void)num_non_empty_deques_before_virtual_search; // unused variable warning stopper
            RCUTILS_ASSERT(num_non_empty_deques_before_virtual_search == num_non_empty_deques_);
            break;
          }
          // Note: we cannot reach this point with start_index == pivot_ since in that case we would
          //       have start_time == pivot_time, in which case the two tests above are the negation
          //       of each other, so that one must be true. Therefore the while loop always terminates.
      RCUTILS_ASSERT(start_index != pivot_);
      RCUTILS_ASSERT(start_time < pivot_time_);
          dequeMoveFrontToPast(start_index);
          num_virtual_moves[start_index]++;
        } // while(1)
      }
    } // while(num_non_empty_deques_ == (uint32_t)RealTypeCount::value)
  }

  Sync* parent_;
  uint32_t queue_size_;

  static const uint32_t NO_PIVOT = 9;  // Special value for the pivot indicating that no pivot has been selected

  DequeTuple deques_;
  uint32_t num_non_empty_deques_;
  VectorTuple past_;
  Tuple candidate_;  // NULL if there is no candidate, in which case there is no pivot.
  rclcpp::Time candidate_start_;
  rclcpp::Time candidate_end_;
  rclcpp::Time pivot_time_;
  uint32_t pivot_;  // Equal to NO_PIVOT if there is no candidate
  std::mutex data_mutex_;  // Protects all of the above

  rclcpp::Duration max_interval_duration_; // TODO: initialize with a parameter
  double age_penalty_;

  std::vector<bool> has_dropped_messages_;
  std::vector<rclcpp::Duration> inter_message_lower_bounds_;
  std::vector<bool> warned_about_incorrect_bound_;
};

}  // namespace sync
}  // namespace message_filters

#endif // MESSAGE_FILTERS__SYNC_POLICIES__APPROXIMATE_TIME_H_