exact_time_impl.h
Go to the documentation of this file.
1 /****************************************************************************
2  *
3  * fkie_message_filters
4  * Copyright © 2018-2020 Fraunhofer FKIE
5  * Author: Timo Röhling
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ****************************************************************************/
20 
21 #ifndef INCLUDE_FKIE_MESSAGE_FILTERS_COMBINER_POLICIES_EXACT_TIME_IMPL_H_
22 #define INCLUDE_FKIE_MESSAGE_FILTERS_COMBINER_POLICIES_EXACT_TIME_IMPL_H_
23 
24 #include "exact_time.h"
25 #include "../helpers/access_ros_header.h"
26 #include "../helpers/tuple.h"
27 #include "../helpers/scoped_unlock.h"
28 #include <ros/console.h>
29 
30 namespace fkie_message_filters
31 {
32 namespace combiner_policies
33 {
34 
35 template<typename... IOs>
37 : max_age_(ros::Duration(1, 0)), max_queue_size_(0)
38 {
39 }
40 
41 template<typename... IOs>
42 ExactTime<IOs...>& ExactTime<IOs...>::set_max_age (const ros::Duration& max_age) noexcept
43 {
44  max_queue_size_ = 0;
45  max_age_ = max_age;
46  return *this;
47 }
48 
49 template<typename... IOs>
50 ExactTime<IOs...>& ExactTime<IOs...>::set_max_queue_size (std::size_t queue_size, const boost::optional<ros::Duration>& max_age) noexcept
51 {
52  max_queue_size_ = queue_size;
53  max_age_ = max_age;
54  return *this;
55 }
56 
57 template<typename... IOs>
58 template<std::size_t N>
59 void ExactTime<IOs...>::add(std::unique_lock<std::mutex>& lock, const std::tuple_element_t<N, IncomingTuples>& in)
60 {
61  ros::Time stamp = helpers::access_ros_header_stamp(std::get<0>(in));
62  if (!std::get<N>(queues_).emplace(stamp, in).second)
63  {
64  ROS_WARN_STREAM_NAMED("Combiner<ExactTime>", "message with repeating time stamp " << stamp << " is dropped");
65  return;
66  }
67  bool complete;
68  MaybeOutgoingTuples out = try_assemble_output(stamp, complete);
69  if (max_age_ || complete)
70  {
71  ros::Time cutoff = complete ? stamp : stamp - *max_age_;
72  helpers::for_each_apply<sizeof...(IOs)>(
73  [this, &cutoff](auto I)
74  {
75  auto& queue = std::get<I>(this->queues_);
76  auto ub = queue.upper_bound(cutoff);
77  queue.erase(queue.begin(), ub);
78  }
79  );
80  }
81  if (max_queue_size_ > 0)
82  {
83  auto& queue = std::get<N>(queues_);
84  if (queue.size() > max_queue_size_) queue.erase(queue.begin()); /* can be at most one element */
85  }
86  if (complete)
87  {
88  helpers::index_apply<sizeof...(IOs)>(
89  [this, &out, &lock](auto... Is)
90  {
91  auto unlock = helpers::with_scoped_unlock(lock);
92  this->emit(std::tuple_cat(*std::get<Is>(out)...));
93  }
94  );
95  }
96 }
97 
98 template<typename... IOs>
99 typename ExactTime<IOs...>::MaybeOutgoingTuples ExactTime<IOs...>::try_assemble_output(const ros::Time& time, bool& complete) noexcept
100 {
101  complete = true;
102  MaybeOutgoingTuples result;
103  helpers::for_each_apply<sizeof...(IOs)>(
104  [&](auto I)
105  {
106  if (complete)
107  {
108  auto& queue = std::get<I>(this->queues_);
109  auto it = queue.find(time);
110  if (it != queue.end())
111  {
112  std::get<I>(result) = it->second;
113  }
114  else
115  {
116  complete = false;
117  }
118  }
119  }
120  );
121  return result;
122 }
123 
124 template<typename... IOs>
126 {
127  helpers::for_each_apply<sizeof...(IOs)>(
128  [this](auto I)
129  {
130  auto& queue = std::get<I>(this->queues_);
131  queue.clear();
132  }
133  );
134 }
135 
136 } // namespace combiner_policies
137 } // namespace fkie_message_filters
138 
139 #endif /* INCLUDE_FKIE_MESSAGE_FILTERS_COMBINER_POLICIES_EXACT_TIME_IMPL_H_ */
ExactTime & set_max_queue_size(std::size_t queue_size, const boost::optional< ros::Duration > &max_age=boost::none) noexcept
Set maximum queue size.
std::tuple< boost::optional< helpers::io_tuple_t< IOs > >... > MaybeOutgoingTuples
Tuple of outgoing tuple candidates.
Definition: policy_base.h:64
void for_each_apply(Function f)
Definition: tuple.h:52
Primary namespace.
Definition: buffer.h:33
ScopedUnlock< BasicLockable > with_scoped_unlock(BasicLockable &lockable)
Definition: scoped_unlock.h:59
void reset() noexcept override
Reset internal state.
MaybeOutgoingTuples try_assemble_output(const ros::Time &time, bool &complete) noexcept
void add(std::unique_lock< std::mutex > &, const std::tuple_element_t< N, IncomingTuples > &)
Input function.
boost::optional< ros::Duration > max_age_
Definition: exact_time.h:95
auto index_apply(Function f)
Definition: tuple.h:40
ExactTime & set_max_age(const ros::Duration &max_age) noexcept
Set maximum age of any data in the queue.
ros::Time access_ros_header_stamp(const M &m) noexcept
#define ROS_WARN_STREAM_NAMED(name, args)


fkie_message_filters
Author(s): Timo Röhling
autogenerated on Mon Feb 28 2022 22:21:43