sequencer_impl.h
Go to the documentation of this file.
1 /****************************************************************************
2  *
3  * fkie_message_filters
4  * Copyright © 2018-2020 Fraunhofer FKIE
5  * Author: Timo Röhling
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ****************************************************************************/
20 
21 #ifndef INCLUDE_FKIE_MESSAGE_FILTERS_SEQUENCER_IMPL_H_
22 #define INCLUDE_FKIE_MESSAGE_FILTERS_SEQUENCER_IMPL_H_
23 
24 #include "sequencer.h"
26 #include "helpers/tuple.h"
27 #include <list>
28 #include <algorithm>
29 
30 namespace fkie_message_filters
31 {
32 
33 template<class... Inputs>
34 Sequencer<Inputs...>::Sequencer(const ros::Duration& max_delay) noexcept
35 : max_delay_(max_delay)
36 {
37 }
38 
39 template<class... Inputs>
40 void Sequencer<Inputs...>::set_max_delay(const ros::Duration& max_delay) noexcept
41 {
42  std::lock_guard<std::mutex> lock{mutex_};
43  max_delay_ = max_delay;
44 }
45 
46 template<class... Inputs>
47 void Sequencer<Inputs...>::receive (const Inputs&... in)
48 {
49  std::lock_guard<std::mutex> lock{mutex_};
50  ros::Time stamp = helpers::access_ros_header_stamp(std::get<0>(std::forward_as_tuple(in...)));
51  cutoff_ = std::max(cutoff_, stamp - max_delay_);
52  if (stamp < cutoff_) return;
53  queue_.emplace(stamp, QueueElement(in...));
54  auto ub = queue_.upper_bound(cutoff_);
55  std::list<QueueElement> out;
56  std::for_each(queue_.begin(), ub, [&out](auto q) { out.push_back(q.second); });
57  queue_.erase(queue_.begin(), ub);
58  /* Unlike most other filters, we do not release the mutex for send(), because we want to ensure strict temporal order. */
59  for (const QueueElement& e : out)
60  {
61  helpers::index_apply<sizeof...(Inputs)>(
62  [this, &e](auto... Is)
63  {
64  this->send(std::get<Is>(e)...);
65  }
66  );
67  }
68 }
69 
70 template<class... Inputs>
72 {
73  std::list<QueueElement> out;
74  std::for_each(queue_.begin(), queue_.end(), [&out](auto q) { out.push_back(q.second); });
75  queue_.clear();
76  /* Unlike most other filters, we do not release the mutex for send(), because we want to ensure strict temporal order. */
77  for (const QueueElement& e : out)
78  {
79  cutoff_ = helpers::access_ros_header_stamp(std::get<0>(e));
80  helpers::index_apply<sizeof...(Inputs)>(
81  [this, &e](auto... Is)
82  {
83  this->send(std::get<Is>(e)...);
84  }
85  );
86  }
87 }
88 
89 template<class... Inputs>
90 void Sequencer<Inputs...>::reset() noexcept
91 {
92  std::lock_guard<std::mutex> lock{mutex_};
93  queue_.clear();
94  cutoff_ = ros::Time();
95 }
96 
97 } // namespace fkie_message_filters
98 
99 #endif /* INCLUDE_FKIE_MESSAGE_FILTERS_SEQUENCER_IMPL_H_ */
fkie_message_filters
Definition: buffer.h:33
fkie_message_filters::Sequencer::set_max_delay
void set_max_delay(const ros::Duration &max_delay) noexcept
Modify maximum delay.
Definition: sequencer_impl.h:58
fkie_message_filters::Sequencer::Sequencer
Sequencer(const ros::Duration &max_delay=ros::Duration(1, 0)) noexcept
Constructor.
Definition: sequencer_impl.h:52
fkie_message_filters::Sequencer< Inputs... >::flush
void flush()
Flush the message queue.
Definition: sequencer_impl.h:89
fkie_message_filters::Sequencer
Enforce correct temporal order.
Definition: sequencer.h:61
sequencer.h
access_ros_header.h
fkie_message_filters::Sequencer::receive
void receive(const Inputs &... in) override
Process incoming data.
Definition: sequencer_impl.h:65
fkie_message_filters::helpers::access_ros_header_stamp
ros::Time access_ros_header_stamp(const M &m) noexcept
Definition: access_ros_header.h:108
ros::Time
fkie_message_filters::Sequencer< Inputs... >::QueueElement
std::tuple< Inputs... > QueueElement
Definition: sequencer.h:112
fkie_message_filters::helpers::index_apply
auto index_apply(Function f)
Definition: tuple.h:76
ros::Duration
tuple.h


fkie_message_filters
Author(s): Timo Röhling
autogenerated on Wed Mar 2 2022 00:18:57