Program Listing for File time_sequencer.hpp

Return to documentation for file (include/message_filters/time_sequencer.hpp)

// Copyright 2008, Willow Garage, Inc. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
//    * Redistributions of source code must retain the above copyright
//      notice, this list of conditions and the following disclaimer.
//
//    * Redistributions in binary form must reproduce the above copyright
//      notice, this list of conditions and the following disclaimer in the
//      documentation and/or other materials provided with the distribution.
//
//    * Neither the name of the Willow Garage nor the names of its
//      contributors may be used to endorse or promote products derived from
//      this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.

#ifndef MESSAGE_FILTERS__TIME_SEQUENCER_HPP_
#define MESSAGE_FILTERS__TIME_SEQUENCER_HPP_

#include <chrono>
#include <cstdint>
#include <functional>
#include <memory>
#include <set>
#include <vector>

#include <rclcpp/rclcpp.hpp>

#include "message_filters/connection.hpp"
#include "message_filters/message_traits.hpp"
#include "message_filters/simple_filter.hpp"

namespace message_filters
{

template<class M>
class TimeSequencer : public SimpleFilter<M>
{
public:
  typedef std::shared_ptr<M const> MConstPtr;
  typedef MessageEvent<M const> EventType;

  template<class F>
  TimeSequencer(
    F & f, rclcpp::Duration delay, rclcpp::Duration update_rate, uint32_t queue_size,
    rclcpp::Node::SharedPtr node)
  : delay_(delay)
    , update_rate_(update_rate)
    , queue_size_(queue_size)
    , node_(node)
  {
    init();
    connectInput(f);
  }

  TimeSequencer(
    rclcpp::Duration delay, rclcpp::Duration update_rate, uint32_t queue_size,
    rclcpp::Node::SharedPtr node)
  : delay_(delay)
    , update_rate_(update_rate)
    , queue_size_(queue_size)
    , node_(node)
  {
    init();
  }

  template<class F>
  void connectInput(F & f)
  {
    incoming_connection_.disconnect();
    incoming_connection_ =
      f.registerCallback(
      typename SimpleFilter<M>::EventCallback(
        std::bind(
          &TimeSequencer::cb, this,
          std::placeholders::_1)));
  }

  ~TimeSequencer()
  {
    update_timer_->cancel();
    incoming_connection_.disconnect();
  }

  void add(const EventType & evt)
  {
    namespace mt = message_filters::message_traits;

    std::lock_guard<std::mutex> lock(messages_mutex_);
    if (mt::TimeStamp<M>::value(*evt.getMessage()) < last_time_) {
      return;
    }

    messages_.insert(evt);

    if (queue_size_ != 0 && messages_.size() > queue_size_) {
      messages_.erase(*messages_.begin());
    }
  }

  void add(const MConstPtr & msg)
  {
    EventType evt(msg);
    add(evt);
  }

private:
  class MessageSort
  {
public:
    bool operator()(const EventType & lhs, const EventType & rhs) const
    {
      namespace mt = message_filters::message_traits;
      return mt::TimeStamp<M>::value(*lhs.getMessage()) <
             mt::TimeStamp<M>::value(*rhs.getMessage());
    }
  };
  typedef std::multiset<EventType, MessageSort> S_Message;
  typedef std::vector<EventType> V_Message;

  void cb(const EventType & evt)
  {
    add(evt);
  }

  void dispatch()
  {
    namespace mt = message_filters::message_traits;

    V_Message to_call;

    {
      std::lock_guard<std::mutex> lock(messages_mutex_);

      while (!messages_.empty()) {
        const EventType & e = *messages_.begin();
        rclcpp::Time stamp = mt::TimeStamp<M>::value(*e.getMessage());
        if ((stamp + delay_) <= rclcpp::Clock().now()) {
          last_time_ = stamp;
          to_call.push_back(e);
          messages_.erase(messages_.begin());
        } else {
          break;
        }
      }
    }

    {
      typename V_Message::iterator it = to_call.begin();
      typename V_Message::iterator end = to_call.end();
      for (; it != end; ++it) {
        this->signalMessage(*it);
      }
    }
  }

  void init()
  {
    update_timer_ = node_->create_wall_timer(
      std::chrono::nanoseconds(update_rate_.nanoseconds()), [this]() {
        dispatch();
      });
  }

  rclcpp::Duration delay_;
  rclcpp::Duration update_rate_;
  uint32_t queue_size_;
  rclcpp::Node::SharedPtr node_;
  rclcpp::TimerBase::SharedPtr update_timer_;
  Connection incoming_connection_;


  S_Message messages_;
  std::mutex messages_mutex_;
  rclcpp::Time last_time_;
};

}  // namespace message_filters

#endif  // MESSAGE_FILTERS__TIME_SEQUENCER_HPP_