approximate_time_impl.h
Go to the documentation of this file.
1 /****************************************************************************
2  *
3  * fkie_message_filters
4  * Copyright © 2018-2020 Fraunhofer FKIE
5  * Author: Timo Röhling
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ****************************************************************************/
20 
21 #ifndef INCLUDE_FKIE_MESSAGE_FILTERS_COMBINER_POLICIES_APPROXIMATE_TIME_IMPL_H_
22 #define INCLUDE_FKIE_MESSAGE_FILTERS_COMBINER_POLICIES_APPROXIMATE_TIME_IMPL_H_
23 
24 #include "approximate_time.h"
25 #include "../helpers/access_ros_header.h"
26 #include "../helpers/scoped_unlock.h"
27 #include "../helpers/tuple.h"
28 #include <ros/console.h>
29 
30 namespace fkie_message_filters
31 {
32 namespace combiner_policies
33 {
34 
35 template<typename... IOs>
37 : max_age_(ros::Duration(1, 0)), max_queue_size_(0), max_delta_(boost::none), pivot_(UNSET)
38 {
39 }
40 
41 template<typename... IOs>
43 {
44  max_queue_size_ = 0;
45  max_age_ = max_age;
46  return *this;
47 }
48 
49 template<typename... IOs>
50 ApproximateTime<IOs...>& ApproximateTime<IOs...>::set_max_queue_size(std::size_t queue_size, const boost::optional<ros::Duration>& max_age) noexcept
51 {
52  max_queue_size_ = queue_size;
53  max_age_ = max_age;
54  return *this;
55 }
56 
57 template<typename... IOs>
59 {
60  max_delta_ = max_delta;
61  return *this;
62 }
63 
64 template<typename... IOs>
65 ApproximateTime<IOs...>& ApproximateTime<IOs...>::set_min_distance(std::size_t i, const ros::Duration& min_dist) noexcept
66 {
67  min_dist_[i] = min_dist;
68  return *this;
69 }
70 
71 template<typename... IOs>
72 template<std::size_t N>
73 void ApproximateTime<IOs...>::add(std::unique_lock<std::mutex>& lock, const std::tuple_element_t<N, IncomingTuples>& in)
74 {
75  ros::Time stamp = helpers::access_ros_header_stamp(std::get<0>(in));
76  if (max_age_)
77  {
78  ros::Time cutoff = stamp - *max_age_;
79  discard_expired(cutoff);
80  }
81  auto& head = std::get<N>(heads_);
82  auto& queue = std::get<N>(queues_);
83  /* First, make sure that all slots have in-order arrival of messages */
84  if (head)
85  {
86  ros::Time latest = helpers::access_ros_header_stamp(std::get<0>(queue.empty() ? *head : queue.back()));
87  if (stamp < latest)
88  {
89  ROS_ERROR_STREAM_NAMED("Combiner<ApproximateTime>", "message with older time stamp " << stamp << "<" << latest << " received, resetting filter");
90  reset();
91  }
92  else
93  if (latest + min_dist_[N] > stamp)
94  {
95  ROS_WARN_STREAM_NAMED("Combiner<ApproximateTime>", "new message arrived sooner than anticipated: time stamp " << stamp << "<" << latest + min_dist_[N]);
96  }
97  }
98  /* Add data to slot */
99  if (!head) head = in; else queue.push_back(in);
100  /* Enforce queue size limit */
101  if (max_queue_size_ > 0)
102  {
103  prune_queue_at<N>(max_queue_size_);
104  }
105  while (true)
106  {
107  /* The pivot is the last head slot that fills after the previous set was emitted.
108  * The pivot will always be a member of the next emitted set unless the timespan constraint is
109  * violated. */
110  if (pivot_ == UNSET)
111  {
112  if (!determine_pivot()) return; /* Head is still incomplete, thus pivot undetermined */
113  }
114  if (pivot_ != N)
115  {
116  /* The pivot slot will never advance, because it must be part of the next set.
117  * For all other slots, we try to improve and see if we reached an optimum.
118  * If we could still improve with a later message, we stop here for now. */
119  if (can_still_improve_at<N>()) return;
120  }
121  /* The current slot cannot improve, but maybe some other slot can */
122  if (can_still_improve()) return;
123  /* We have reached the best possible set with the current pivot */
124  if (max_delta_)
125  {
126  /* Optionally, check if the total timespan of the current set is acceptable */
127  if (heads_timespan() > *max_delta_)
128  {
129  drop_pivot(); /* This pivot is not good enough, maybe the next one? */
130  continue;
131  }
132  }
133  emit_heads(lock);
134  }
135 }
136 
137 template<typename... IOs>
138 void ApproximateTime<IOs...>::emit_heads(std::unique_lock<std::mutex>& lock)
139 {
141  helpers::for_each_apply<NUM_SLOTS>(
142  [this](auto I)
143  {
144  auto& head = std::get<I>(this->heads_);
145  auto& queue = std::get<I>(this->queues_);
146  if (!queue.empty())
147  {
148  head = queue.front();
149  queue.pop_front();
150  }
151  else
152  {
153  head.reset();
154  }
155  }
156  );
157  pivot_ = UNSET;
158  auto unlock = helpers::with_scoped_unlock(lock);
159  helpers::index_apply<NUM_SLOTS>(
160  [&](auto... Is)
161  {
162  this->emit(std::tuple_cat(*std::get<Is>(out)...));
163  }
164  );
165 }
166 
167 template<typename... IOs>
168 template<std::size_t N>
170 {
171  // Check we can improve the current set by advancing in slot N
172  auto& head = std::get<N>(heads_);
173  auto& queue = std::get<N>(queues_);
174  ros::Time stamp = helpers::access_ros_header_stamp(std::get<0>(*head));
175  while (!queue.empty())
176  {
177  ros::Time next_stamp = helpers::access_ros_header_stamp(std::get<0>(queue.front()));
178  if (pivot_timedelta(next_stamp) < pivot_timedelta(stamp))
179  {
180  /* The next message is closer to the pivot element */
181  head = queue.front();
182  stamp = next_stamp;
183  queue.pop_front();
184  }
185  else
186  return false; /* cannot improve further with this pivot */
187  }
188  /* We ran out of messages in this queue, so we do not know for sure whether or not the next
189  * message would be a better fit. However, we can only improve if the current message is still
190  * earlier than the pivot and if the next message could arrive early enough to be closer to the
191  * pivot than the current one.
192  */
193  return stamp < pivot_ts_ && pivot_timedelta(stamp + min_dist_[N]) <= pivot_timedelta(stamp);
194 }
195 
196 template<typename... IOs>
198 {
199  bool result = false;
200  helpers::for_each_apply<NUM_SLOTS>(
201  [&](auto I)
202  {
203  if (I != pivot_)
204  {
205  if (this->can_still_improve_at<I>()) result = true;
206  }
207  }
208  );
209  return result;
210 }
211 
212 template<typename... IOs>
213 template<std::size_t N>
215 {
216  auto& head = std::get<N>(heads_);
217  auto& queue = std::get<N>(queues_);
218  if (head)
219  {
220  ros::Time stamp = helpers::access_ros_header_stamp(std::get<0>(*head));
221  if (stamp < cutoff)
222  {
223  head.reset();
224  pivot_ = UNSET;
225  }
226  }
227  while (!queue.empty())
228  {
229  ros::Time stamp = helpers::access_ros_header_stamp(std::get<0>(queue.front()));
230  if (stamp <= cutoff) queue.pop_front(); else break;
231  }
232  if (!head && !queue.empty())
233  {
234  head = queue.front();
235  queue.pop_front();
236  }
237 }
238 
239 template<typename... IOs>
241 {
242  helpers::for_each_apply<NUM_SLOTS>([this, cutoff](auto I) { this->discard_expired_at<I>(cutoff); });
243 }
244 
245 template<typename... IOs>
246 template<std::size_t N>
247 void ApproximateTime<IOs...>::prune_queue_at(std::size_t queue_size) noexcept
248 {
249  auto& queue = std::get<N>(queues_);
250  if (queue.size() <= queue_size) return;
251  while (queue.size() > queue_size + 1) queue.pop_front();
252  auto& head = std::get<N>(heads_);
253  head = queue.front();
254  queue.pop_front();
255  pivot_ = UNSET;
256 }
257 
258 template<typename... IOs>
260 {
261  helpers::for_each_apply<NUM_SLOTS>(
262  [this](auto I)
263  {
264  std::get<I>(heads_).reset();
265  std::get<I>(queues_).clear();
266  }
267  );
268  pivot_ = UNSET;
269 }
270 
271 template<typename... IOs>
273 {
274  ros::Time first_ts, last_ts;
275  helpers::for_each_apply<NUM_SLOTS>(
276  [&](auto I)
277  {
278  auto& head = std::get<I>(this->heads_);
279  if (head)
280  {
281  ros::Time stamp = helpers::access_ros_header_stamp(std::get<0>(*head));
282  if (first_ts.isZero() || stamp < first_ts)
283  {
284  first_ts = stamp;
285  }
286  if (last_ts.isZero() || stamp > last_ts)
287  {
288  last_ts = stamp;
289  }
290  }
291  }
292  );
293  return last_ts - first_ts;
294 }
295 
296 template<typename... IOs>
298 {
299  /* The pivot timedelta is the absolute temporal distance from the pivot. */
300  return ts < pivot_ts_ ? pivot_ts_ - ts : ts - pivot_ts_;
301 }
302 
303 template<typename... IOs>
305 {
306  helpers::select_apply<NUM_SLOTS>(pivot_,
307  [this](auto I)
308  {
309  auto& head = std::get<I>(this->heads_);
310  auto& queue = std::get<I>(this->queues_);
311  if (!queue.empty())
312  {
313  head = queue.front();
314  queue.pop_front();
315  }
316  else
317  head.reset();
318  }
319  );
320  pivot_ = UNSET;
321 }
322 
323 template<typename... IOs>
325 {
326  assert(pivot_ == UNSET);
327  pivot_ts_ = ros::Time();
328  bool ok = true;
329  helpers::for_each_apply<NUM_SLOTS>(
330  [&](auto I)
331  {
332  auto& head = std::get<I>(this->heads_);
333  if (head)
334  {
335  ros::Time stamp = helpers::access_ros_header_stamp(std::get<0>(*head));
336  if (stamp > pivot_ts_)
337  {
338  pivot_ts_ = stamp;
339  if (ok) pivot_ = I;
340  }
341  }
342  else
343  {
344  pivot_ = UNSET;
345  ok = false;
346  }
347  }
348  );
349  return pivot_ != UNSET;
350 }
351 
352 } // namespace combiner_policies
353 } // namespace fkie_message_filters
354 
355 #endif /* INCLUDE_FKIE_MESSAGE_FILTERS_COMBINER_POLICIES_APPROXIMATE_TIME_IMPL_H_ */
void discard_expired_at(const ros::Time &cutoff) noexcept
#define ROS_ERROR_STREAM_NAMED(name, args)
ApproximateTime & set_min_distance(std::size_t i, const ros::Duration &min_dist) noexcept
Set the minimum distance between consecutive messages on a source.
ApproximateTime & set_max_queue_size(std::size_t queue_size, const boost::optional< ros::Duration > &max_age=boost::none) noexcept
Set maximum queue size.
void add(std::unique_lock< std::mutex > &, const std::tuple_element_t< N, IncomingTuples > &)
Input function.
ApproximateTime & set_max_age(const ros::Duration &max_age) noexcept
Set maximum age of any data in the queue.
std::array< ros::Duration, NUM_SLOTS > min_dist_
ApproximateTime & set_max_timespan(const ros::Duration &max_delta) noexcept
Set maximum permissible timestamp difference of matched messages.
std::tuple< boost::optional< helpers::io_tuple_t< IOs > >... > MaybeOutgoingTuples
Tuple of outgoing tuple candidates.
Definition: policy_base.h:64
ros::Duration pivot_timedelta(const ros::Time &ts) noexcept
ROSCPP_DECL bool ok()
void reset() noexcept override
Reset internal state.
Primary namespace.
Definition: buffer.h:33
ScopedUnlock< BasicLockable > with_scoped_unlock(BasicLockable &lockable)
Definition: scoped_unlock.h:59
ros::Time access_ros_header_stamp(const M &m) noexcept
#define ROS_WARN_STREAM_NAMED(name, args)


fkie_message_filters
Author(s): Timo Röhling
autogenerated on Mon Feb 28 2022 22:21:43