21 #ifndef INCLUDE_FKIE_MESSAGE_FILTERS_COMBINER_POLICIES_APPROXIMATE_TIME_IMPL_H_
22 #define INCLUDE_FKIE_MESSAGE_FILTERS_COMBINER_POLICIES_APPROXIMATE_TIME_IMPL_H_
25 #include "../helpers/access_ros_header.h"
26 #include "../helpers/scoped_unlock.h"
27 #include "../helpers/tuple.h"
32 namespace combiner_policies
35 template<
typename... IOs>
37 : max_age_(
ros::Duration(1, 0)), max_queue_size_(0), max_delta_(
boost::none), pivot_(UNSET)
41 template<
typename... IOs>
49 template<
typename... IOs>
52 max_queue_size_ = queue_size;
57 template<
typename... IOs>
60 max_delta_ = max_delta;
64 template<
typename... IOs>
67 min_dist_[i] = min_dist;
71 template<
typename... IOs>
72 template<std::
size_t N>
79 discard_expired(cutoff);
81 auto& head = std::get<N>(heads_);
82 auto& queue = std::get<N>(queues_);
89 ROS_ERROR_STREAM_NAMED(
"Combiner<ApproximateTime>",
"message with older time stamp " << stamp <<
"<" << latest <<
" received, resetting filter");
93 if (latest + min_dist_[N] > stamp)
95 ROS_WARN_STREAM_NAMED(
"Combiner<ApproximateTime>",
"new message arrived sooner than anticipated: time stamp " << stamp <<
"<" << latest + min_dist_[N]);
99 if (!head) head = in;
else queue.push_back(in);
101 if (max_queue_size_ > 0)
103 prune_queue_at<N>(max_queue_size_);
112 if (!determine_pivot())
return;
119 if (can_still_improve_at<N>())
return;
122 if (can_still_improve())
return;
127 if (heads_timespan() > *max_delta_)
137 template<
typename... IOs>
140 MaybeOutgoingTuples out = heads_;
141 helpers::for_each_apply<NUM_SLOTS>(
144 auto& head = std::get<I>(this->heads_);
145 auto& queue = std::get<I>(this->queues_);
148 head = queue.front();
159 helpers::index_apply<NUM_SLOTS>(
162 this->emit(std::tuple_cat(*std::get<Is>(out)...));
167 template<
typename... IOs>
168 template<std::
size_t N>
172 auto& head = std::get<N>(heads_);
173 auto& queue = std::get<N>(queues_);
175 while (!queue.empty())
178 if (pivot_timedelta(next_stamp) < pivot_timedelta(stamp))
181 head = queue.front();
193 return stamp < pivot_ts_ && pivot_timedelta(stamp + min_dist_[N]) <= pivot_timedelta(stamp);
196 template<
typename... IOs>
200 helpers::for_each_apply<NUM_SLOTS>(
205 if (this->can_still_improve_at<I>()) result =
true;
212 template<
typename... IOs>
213 template<std::
size_t N>
216 auto& head = std::get<N>(heads_);
217 auto& queue = std::get<N>(queues_);
227 while (!queue.empty())
230 if (stamp <= cutoff) queue.pop_front();
else break;
232 if (!head && !queue.empty())
234 head = queue.front();
239 template<
typename... IOs>
242 helpers::for_each_apply<NUM_SLOTS>([
this, cutoff](
auto I) { this->discard_expired_at<I>(cutoff); });
245 template<
typename... IOs>
246 template<std::
size_t N>
249 auto& queue = std::get<N>(queues_);
250 if (queue.size() <= queue_size)
return;
251 while (queue.size() > queue_size + 1) queue.pop_front();
252 auto& head = std::get<N>(heads_);
253 head = queue.front();
258 template<
typename... IOs>
261 helpers::for_each_apply<NUM_SLOTS>(
264 std::get<I>(heads_).reset();
265 std::get<I>(queues_).clear();
271 template<
typename... IOs>
275 helpers::for_each_apply<NUM_SLOTS>(
278 auto& head = std::get<I>(this->heads_);
282 if (first_ts.
isZero() || stamp < first_ts)
286 if (last_ts.
isZero() || stamp > last_ts)
293 return last_ts - first_ts;
296 template<
typename... IOs>
300 return ts < pivot_ts_ ? pivot_ts_ - ts : ts - pivot_ts_;
303 template<
typename... IOs>
306 helpers::select_apply<NUM_SLOTS>(pivot_,
309 auto& head = std::get<I>(this->heads_);
310 auto& queue = std::get<I>(this->queues_);
313 head = queue.front();
323 template<
typename... IOs>
326 assert(pivot_ == UNSET);
329 helpers::for_each_apply<NUM_SLOTS>(
332 auto& head = std::get<I>(this->heads_);
336 if (stamp > pivot_ts_)
349 return pivot_ != UNSET;