Program Listing for File cache.h

Return to documentation for file (include/message_filters/cache.h)

/*********************************************************************
* Software License Agreement (BSD License)
*
*  Copyright (c) 2008, Willow Garage, Inc.
*  All rights reserved.
*
*  Redistribution and use in source and binary forms, with or without
*  modification, are permitted provided that the following conditions
*  are met:
*
*   * Redistributions of source code must retain the above copyright
*     notice, this list of conditions and the following disclaimer.
*   * Redistributions in binary form must reproduce the above
*     copyright notice, this list of conditions and the following
*     disclaimer in the documentation and/or other materials provided
*     with the distribution.
*   * Neither the name of the Willow Garage nor the names of its
*     contributors may be used to endorse or promote products derived
*     from this software without specific prior written permission.
*
*  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
*  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
*  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
*  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
*  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
*  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
*  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
*  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
*  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
*  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
*  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
*  POSSIBILITY OF SUCH DAMAGE.
*********************************************************************/

#ifndef MESSAGE_FILTERS__CACHE_H_
#define MESSAGE_FILTERS__CACHE_H_

#include <deque>
#include <memory>
#include <functional>

#include <rclcpp/rclcpp.hpp>

#include "message_filters/connection.h"
#include "message_filters/simple_filter.h"
#include "message_filters/message_traits.h"

namespace message_filters
{
template<class M>
class Cache : public SimpleFilter<M>
{
public:
  typedef std::shared_ptr<M const> MConstPtr;
  typedef MessageEvent<M const> EventType;

  template<class F>
  Cache(F& f, unsigned int cache_size = 1)
  {
    setCacheSize(cache_size) ;
    connectInput(f) ;
  }

  Cache(unsigned int cache_size = 1)
  {
    setCacheSize(cache_size);
  }

  template<class F>
  void connectInput(F& f)
  {
    incoming_connection_ = f.registerCallback(typename SimpleFilter<M>::EventCallback(
      std::bind(&Cache::callback, this, std::placeholders::_1)));
  }

  ~Cache()
  {
    incoming_connection_.disconnect();
  }

  void setCacheSize(unsigned int cache_size)
  {
    if (cache_size == 0)
    {
      //ROS_ERROR("Cannot set max_size to 0") ;
      return ;
    }

    cache_size_ = cache_size ;
  }

  void add(const MConstPtr& msg)
  {
    add(EventType(msg));
  }

  void add(const EventType& evt)
  {
    namespace mt = message_filters::message_traits;

    //printf("  Cache Size: %u\n", cache_.size()) ;
    {
      std::lock_guard<std::mutex> lock(cache_lock_);

      while (cache_.size() >= cache_size_)                       // Keep popping off old data until we have space for a new msg
        cache_.pop_front() ;                                     // The front of the deque has the oldest elem, so we can get rid of it

      // No longer naively pushing msgs to back. Want to make sure they're sorted correctly
      //cache_.push_back(msg) ;                                    // Add the newest message to the back of the deque

      typename std::deque<EventType >::reverse_iterator rev_it = cache_.rbegin();

      // Keep walking backwards along deque until we hit the beginning,
      //   or until we find a timestamp that's smaller than (or equal to) msg's timestamp
      rclcpp::Time evt_stamp = mt::TimeStamp<M>::value(*evt.getMessage());
      while(rev_it != cache_.rend() && mt::TimeStamp<M>::value(*(*rev_it).getMessage()) > evt_stamp)
        rev_it++;

      // Add msg to the cache
      cache_.insert(rev_it.base(), evt);

    }

    this->signalMessage(evt);
  }

  std::vector<MConstPtr> getInterval(const rclcpp::Time& start, const rclcpp::Time& end) const
  {
    namespace mt = message_filters::message_traits;
    std::lock_guard<std::mutex> lock(cache_lock_);

    // Find the starting index. (Find the first index after [or at] the start of the interval)
    size_t start_index = 0 ;
    while(start_index < cache_.size() &&
          mt::TimeStamp<M>::value(*cache_[start_index].getMessage()) < start)
    {
      start_index++ ;
    }

    // Find the ending index. (Find the first index after the end of interval)
    size_t end_index = start_index ;
    while(end_index < cache_.size() &&
          mt::TimeStamp<M>::value(*cache_[end_index].getMessage()) <= end)
    {
      end_index++ ;
    }

    std::vector<MConstPtr> interval_elems ;
    interval_elems.reserve(end_index - start_index) ;
    for (size_t i=start_index; i<end_index; i++)
    {
      interval_elems.push_back(cache_[i].getMessage()) ;
    }

    return interval_elems ;
  }


  std::vector<MConstPtr> getSurroundingInterval(const rclcpp::Time& start, const rclcpp::Time& end) const
  {
    namespace mt = message_filters::message_traits;

    std::lock_guard<std::mutex> lock(cache_lock_);
    // Find the starting index. (Find the first index after [or at] the start of the interval)
    int start_index = static_cast<int>(cache_.size()) - 1;
    while(start_index > 0 &&
          mt::TimeStamp<M>::value(*cache_[start_index].getMessage()) > start)
    {
      start_index--;
    }

    int end_index = start_index;
    while(end_index < static_cast<int>(cache_.size()) - 1 &&
          mt::TimeStamp<M>::value(*cache_[end_index].getMessage()) < end)
    {
      end_index++;
    }

    std::vector<MConstPtr> interval_elems;
    interval_elems.reserve(end_index - start_index + 1) ;
    for (int i=start_index; i<=end_index; i++)
    {
      interval_elems.push_back(cache_[i].getMessage()) ;
    }

    return interval_elems;
  }

  MConstPtr getElemBeforeTime(const rclcpp::Time& time) const
  {
    namespace mt = message_filters::message_traits;

    std::lock_guard<std::mutex> lock(cache_lock_);

    MConstPtr out ;

    unsigned int i=0 ;
    int elem_index = -1 ;
    while (i<cache_.size() &&
           mt::TimeStamp<M>::value(*cache_[i].getMessage()) < time)
    {
      elem_index = i ;
      i++ ;
    }

    if (elem_index >= 0)
      out = cache_[elem_index].getMessage() ;

    return out ;
  }

  MConstPtr getElemAfterTime(const rclcpp::Time& time) const
  {
    namespace mt = message_filters::message_traits;

    std::lock_guard<std::mutex> lock(cache_lock_);

    MConstPtr out ;

    int i = static_cast<int>(cache_.size()) - 1 ;
    int elem_index = -1 ;
    while (i>=0 &&
        mt::TimeStamp<M>::value(*cache_[i].getMessage()) > time)
    {
      elem_index = i ;
      i-- ;
    }

    if (elem_index >= 0)
      out = cache_[elem_index].getMessage() ;
    else
      out.reset() ;

    return out ;
  }

  rclcpp::Time getLatestTime() const
  {
    namespace mt = message_filters::message_traits;

    std::lock_guard<std::mutex> lock(cache_lock_);

    rclcpp::Time latest_time;

    if (cache_.size() > 0)
      latest_time = mt::TimeStamp<M>::value(*cache_.back().getMessage());

    return latest_time ;
  }

  rclcpp::Time getOldestTime() const
  {
    namespace mt = message_filters::message_traits;

    std::lock_guard<std::mutex> lock(cache_lock_);

    rclcpp::Time oldest_time;

    if (cache_.size() > 0)
      oldest_time = mt::TimeStamp<M>::value(*cache_.front().getMessage());

    return oldest_time ;
  }


private:
  void callback(const EventType& evt)
  {
    add(evt);
  }

  mutable std::mutex cache_lock_ ;
  std::deque<EventType> cache_ ;
  unsigned int cache_size_ ;

  Connection incoming_connection_;
};
}  // namespace message_filters;

#endif  // MESSAGE_FILTERS__CACHE_H_