Program Listing for File message_buffer_impl.hpp
↰ Return to documentation for file (include/fuse_core/message_buffer_impl.hpp
)
/*
* Software License Agreement (BSD License)
*
* Copyright (c) 2018, Locus Robotics
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided
* with the distribution.
* * Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef FUSE_CORE__MESSAGE_BUFFER_IMPL_HPP_
#define FUSE_CORE__MESSAGE_BUFFER_IMPL_HPP_
#include <algorithm>
#include <sstream>
#include <stdexcept>
#include <utility>
#include <boost/iterator/transform_iterator.hpp>
#include <rclcpp/duration.hpp>
#include <rclcpp/time.hpp>
namespace fuse_core
{
template<class Message>
MessageBuffer<Message>::MessageBuffer(const rclcpp::Duration & buffer_length)
: buffer_length_(buffer_length)
{
}
template<class Message>
void MessageBuffer<Message>::insert(const rclcpp::Time & stamp, const Message & msg)
{
buffer_.emplace_back(stamp, msg);
purgeHistory();
}
template<class Message>
typename MessageBuffer<Message>::message_range MessageBuffer<Message>::query(
const rclcpp::Time & beginning_stamp,
const rclcpp::Time & ending_stamp,
bool extended_range)
{
// Verify the query is valid
if (ending_stamp < beginning_stamp) {
std::stringstream beginning_time_ss;
beginning_time_ss << beginning_stamp.seconds();
std::stringstream ending_time_ss;
ending_time_ss << ending_stamp.seconds();
throw std::invalid_argument(
"The beginning_stamp (" + beginning_time_ss.str() + ") must be less than or equal to "
"the ending_stamp (" + ending_time_ss.str() + ").");
}
// Verify the query is within the bounds of the buffer
if (buffer_.empty() || (beginning_stamp < buffer_.front().first) ||
(ending_stamp > buffer_.back().first))
{
std::stringstream requested_time_range_ss;
requested_time_range_ss << "(" << beginning_stamp.seconds() << ", " << ending_stamp.seconds() <<
")";
std::stringstream available_time_range_ss;
if (buffer_.empty()) {
available_time_range_ss << "(EMPTY)";
} else {
available_time_range_ss << "(" << buffer_.front().first.seconds() << ", "
<< buffer_.back().first.seconds() << ")";
}
throw std::out_of_range(
"The requested time range " + requested_time_range_ss.str() +
" is outside the available time range " + available_time_range_ss.str() + ".");
}
// Find the entry that is strictly greater than the requested beginning stamp. If the extended
// range flag is true, we will then back up one entry.
auto upper_bound_comparison = [](const auto & stamp, const auto & element) -> bool
{
return element.first > stamp;
};
auto beginning_iter = std::upper_bound(
buffer_.begin(),
buffer_.end(), beginning_stamp, upper_bound_comparison);
if (extended_range) {
--beginning_iter;
}
// Find the entry that is greater than or equal to the ending stamp. If the extended range flag is
// false, we will back up one entry.
auto lower_bound_comparison = [](const auto & element, const auto & stamp) -> bool
{
return element.first < stamp;
};
auto ending_iter = std::lower_bound(
buffer_.begin(),
buffer_.end(), ending_stamp, lower_bound_comparison);
if (extended_range && (ending_iter != buffer_.end())) {
++ending_iter;
}
// Return the beginning and ending iterators as an iterator range with the correct deference type
return message_range(beginning_iter, ending_iter);
}
template<class Message>
typename MessageBuffer<Message>::stamp_range MessageBuffer<Message>::stamps() const
{
return stamp_range(
boost::make_transform_iterator(buffer_.begin(), extractStamp),
boost::make_transform_iterator(buffer_.end(), extractStamp));
}
template<class Message>
void MessageBuffer<Message>::purgeHistory()
{
// Purge any messages that are more than buffer_length_ seconds older than the most recent entry
// A setting of rclcpp::Duration::max() means "keep everything"
// And we want to keep at least two entries in buffer at all times, regardless of the stamps.
if ((buffer_length_ == rclcpp::Duration::max()) || (buffer_.size() <= 2)) {
return;
}
// Compute the expiration time carefully, as ROS can't handle negative times
const auto & ending_stamp = buffer_.back().first;
rclcpp::Time expiration_time;
if (ending_stamp.seconds() > buffer_length_.seconds()) {
expiration_time = ending_stamp - buffer_length_;
} else {
// NOTE(CH3): Uninitialized. But okay because it's just used for comparison.
expiration_time = rclcpp::Time(0, 0, ending_stamp.get_clock_type());
}
// Remove buffer elements before the expiration time.
// Be careful to ensure that:
// - at least two entries remains at all times
// - the buffer covers *at least* until the expiration time. Longer is acceptable.
auto is_greater = [](const auto & stamp, const auto & element) -> bool
{
return element.first > stamp;
};
auto expiration_iter = std::upper_bound(
buffer_.begin(),
buffer_.end(), expiration_time, is_greater);
if (expiration_iter != buffer_.begin()) {
// expiration_iter points to the first element > expiration_time.
// Back up one entry, to a point that is <= expiration_time
buffer_.erase(buffer_.begin(), std::prev(expiration_iter));
}
}
} // namespace fuse_core
#endif // FUSE_CORE__MESSAGE_BUFFER_IMPL_HPP_