21 #ifndef INCLUDE_FKIE_MESSAGE_FILTERS_COMBINER_POLICIES_APPROXIMATE_TIME_IMPL_H_ 22 #define INCLUDE_FKIE_MESSAGE_FILTERS_COMBINER_POLICIES_APPROXIMATE_TIME_IMPL_H_ 25 #include "../helpers/access_ros_header.h" 26 #include "../helpers/scoped_unlock.h" 27 #include "../helpers/tuple.h" 32 namespace combiner_policies
35 template<
typename... IOs>
37 : max_age_(
ros::Duration(1, 0)), max_queue_size_(0), max_delta_(
boost::none), pivot_(UNSET)
41 template<
typename... IOs>
49 template<
typename... IOs>
57 template<
typename... IOs>
64 template<
typename... IOs>
71 template<
typename... IOs>
72 template<std::
size_t N>
81 auto& head = std::get<N>(
heads_);
82 auto& queue = std::get<N>(
queues_);
89 ROS_ERROR_STREAM_NAMED(
"Combiner<ApproximateTime>",
"message with older time stamp " << stamp <<
"<" << latest <<
" received, resetting filter");
95 ROS_WARN_STREAM_NAMED(
"Combiner<ApproximateTime>",
"new message arrived sooner than anticipated: time stamp " << stamp <<
"<" << latest +
min_dist_[N]);
99 if (!head) head = in;
else queue.push_back(in);
119 if (can_still_improve_at<N>())
return;
137 template<
typename... IOs>
141 helpers::for_each_apply<NUM_SLOTS>(
144 auto& head = std::get<I>(this->
heads_);
145 auto& queue = std::get<I>(this->
queues_);
148 head = queue.front();
159 helpers::index_apply<NUM_SLOTS>(
162 this->
emit(std::tuple_cat(*std::get<Is>(out)...));
167 template<
typename... IOs>
168 template<std::
size_t N>
172 auto& head = std::get<N>(
heads_);
173 auto& queue = std::get<N>(
queues_);
175 while (!queue.empty())
181 head = queue.front();
196 template<
typename... IOs>
200 helpers::for_each_apply<NUM_SLOTS>(
205 if (this->can_still_improve_at<I>()) result =
true;
212 template<
typename... IOs>
213 template<std::
size_t N>
216 auto& head = std::get<N>(
heads_);
217 auto& queue = std::get<N>(
queues_);
227 while (!queue.empty())
230 if (stamp <= cutoff) queue.pop_front();
else break;
232 if (!head && !queue.empty())
234 head = queue.front();
239 template<
typename... IOs>
242 helpers::for_each_apply<NUM_SLOTS>([
this, cutoff](
auto I) { this->discard_expired_at<I>(cutoff); });
245 template<
typename... IOs>
246 template<std::
size_t N>
249 auto& queue = std::get<N>(
queues_);
250 if (queue.size() <= queue_size)
return;
251 while (queue.size() > queue_size + 1) queue.pop_front();
252 auto& head = std::get<N>(
heads_);
253 head = queue.front();
258 template<
typename... IOs>
261 helpers::for_each_apply<NUM_SLOTS>(
271 template<
typename... IOs>
275 helpers::for_each_apply<NUM_SLOTS>(
278 auto& head = std::get<I>(this->
heads_);
282 if (first_ts.
isZero() || stamp < first_ts)
286 if (last_ts.
isZero() || stamp > last_ts)
293 return last_ts - first_ts;
296 template<
typename... IOs>
303 template<
typename... IOs>
306 helpers::select_apply<NUM_SLOTS>(
pivot_,
309 auto& head = std::get<I>(this->
heads_);
310 auto& queue = std::get<I>(this->
queues_);
313 head = queue.front();
323 template<
typename... IOs>
329 helpers::for_each_apply<NUM_SLOTS>(
332 auto& head = std::get<I>(this->
heads_);
void prune_queue_at(std::size_t queue_size) noexcept
std::size_t max_queue_size_
void discard_expired_at(const ros::Time &cutoff) noexcept
#define ROS_ERROR_STREAM_NAMED(name, args)
bool can_still_improve_at() noexcept
boost::optional< ros::Duration > max_age_
void drop_pivot() noexcept
ApproximateTime & set_min_distance(std::size_t i, const ros::Duration &min_dist) noexcept
Set the minimum distance between consecutive messages on a source.
ApproximateTime & set_max_queue_size(std::size_t queue_size, const boost::optional< ros::Duration > &max_age=boost::none) noexcept
Set maximum queue size.
boost::optional< ros::Duration > max_delta_
void add(std::unique_lock< std::mutex > &, const std::tuple_element_t< N, IncomingTuples > &)
Input function.
ApproximateTime & set_max_age(const ros::Duration &max_age) noexcept
Set maximum age of any data in the queue.
static constexpr std::size_t UNSET
std::array< ros::Duration, NUM_SLOTS > min_dist_
ApproximateTime & set_max_timespan(const ros::Duration &max_delta) noexcept
Set maximum permissible timestamp difference of matched messages.
ros::Duration heads_timespan() noexcept
bool determine_pivot() noexcept
ApproximateTime()
Constructor.
std::tuple< boost::optional< helpers::io_tuple_t< IOs > >... > MaybeOutgoingTuples
Tuple of outgoing tuple candidates.
ros::Duration pivot_timedelta(const ros::Time &ts) noexcept
void reset() noexcept override
Reset internal state.
ScopedUnlock< BasicLockable > with_scoped_unlock(BasicLockable &lockable)
void discard_expired(const ros::Time &cutoff) noexcept
MaybeOutgoingTuples heads_
void emit(const OutgoingTuple &out)
Emit data.
bool can_still_improve() noexcept
void emit_heads(std::unique_lock< std::mutex > &)
ros::Time access_ros_header_stamp(const M &m) noexcept
#define ROS_WARN_STREAM_NAMED(name, args)