buffer_impl.h
Go to the documentation of this file.
1 /****************************************************************************
2  *
3  * fkie_message_filters
4  * Copyright © 2018-2020 Fraunhofer FKIE
5  * Author: Timo Röhling
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ****************************************************************************/
20 #ifndef INCLUDE_FKIE_MESSAGE_FILTERS_BUFFER_IMPL_H_
21 #define INCLUDE_FKIE_MESSAGE_FILTERS_BUFFER_IMPL_H_
22 
23 #include "buffer.h"
24 #include "helpers/tuple.h"
25 #include <ros/init.h>
26 
27 namespace fkie_message_filters
28 {
29 
30 template<class... Inputs>
31 struct Buffer<Inputs...>::Impl
32 {
33  Impl(Buffer* parent, BufferPolicy policy) noexcept
34  : parent_(parent), policy_(policy), epoch_(0)
35  {}
36 
37  bool wait_for_queue_element(std::unique_lock<std::mutex>& lock) noexcept
38  {
39 #ifndef FKIE_MESSAGE_FILTERS_IGNORE_ROS_OK
40  while (ros::ok() && policy_ == BufferPolicy::Queue && queue_.empty())
41  {
42  cond_.wait_for(lock, std::chrono::milliseconds(100));
43  }
44  return ros::ok() && !queue_.empty();
45 #else
46  while (policy_ == BufferPolicy::Queue && queue_.empty()) cond_.wait(lock);
47  return !queue_.empty();
48 #endif
49  }
50  template<class Rep, class Period>
51  bool wait_for_queue_element(std::unique_lock<std::mutex>& lock, const std::chrono::duration<Rep, Period>& timeout) noexcept
52  {
53  std::chrono::system_clock::time_point deadline = std::chrono::system_clock::now() + timeout;
54  #ifndef FKIE_MESSAGE_FILTERS_IGNORE_ROS_OK
55  while (ros::ok() && policy_ == BufferPolicy::Queue && queue_.empty())
56  {
57  std::chrono::system_clock::duration remaining = deadline - std::chrono::system_clock::now();
58  if (remaining > std::chrono::milliseconds(100))
59  cond_.wait_for(lock, std::chrono::milliseconds(100));
60  else
61  cond_.wait_until(lock, deadline);
62  }
63  return ros::ok() && !queue_.empty();
64  #else
65  while (policy_ == BufferPolicy::Queue && queue_.empty()) cond_.wait_until(lock, deadline);
66  return !queue_.empty();
67  #endif
68  }
71  boost::circular_buffer<QueueElement> queue_;
72  std::mutex mutex_;
73  std::condition_variable cond_;
74  std::size_t epoch_;
75 };
76 
77 template<class... Inputs>
78 class Buffer<Inputs...>::RosCB : public ros::CallbackInterface
79 {
80 public:
81  RosCB(const std::weak_ptr<Impl>& parent, std::size_t epoch) noexcept
82  : parent_(parent), epoch_(epoch) {}
83 
84  CallResult call() noexcept override
85  {
86  std::shared_ptr<Impl> impl = parent_.lock();
87  if (impl)
88  {
89  std::unique_lock<std::mutex> lock(impl->mutex_);
90  if (!impl->queue_.empty() && epoch_ == impl->epoch_)
91  {
92  QueueElement e{impl->queue_.front()};
93  impl->queue_.pop_front();
94  lock.unlock();
95  impl->parent_->send_queue_element(e);
96  }
97  }
98  return Success;
99  }
100 private:
101  std::weak_ptr<Impl> parent_;
102  std::size_t epoch_;
103 };
104 
105 template<class... Inputs>
106 Buffer<Inputs...>::Buffer(BufferPolicy policy, std::size_t max_queue_size, ros::CallbackQueueInterface* cbq) noexcept
107 : impl_(std::make_shared<Impl>(this, policy)), cbq_(cbq)
108 {
109  impl_->queue_.set_capacity(max_queue_size);
110 }
111 
112 template<class... Inputs>
113 Buffer<Inputs...>::Buffer(const ros::NodeHandle& nh, std::size_t max_queue_size) noexcept
114 : impl_(std::make_shared<Impl>(this, BufferPolicy::Queue)), cbq_(nh.getCallbackQueue())
115 {
116  impl_->queue_.set_capacity(max_queue_size);
117 }
118 
119 template<class... Inputs>
121 {
123 }
124 
125 template<class... Inputs>
126 void Buffer<Inputs...>::set_policy(BufferPolicy policy, std::size_t max_queue_size)
127 {
128  std::unique_lock<std::mutex> lock{impl_->mutex_};
129  impl_->policy_ = policy;
130  switch (policy)
131  {
133  impl_->queue_.clear();
134  ++impl_->epoch_;
135  if (cbq_) cbq_->removeByID(reinterpret_cast<uint64_t>(this));
136  lock.unlock();
137  break;
138  case BufferPolicy::Queue:
139  if (max_queue_size > 0) impl_->queue_.set_capacity(max_queue_size);
140  lock.unlock();
141  break;
143  ++impl_->epoch_;
144  if (cbq_) cbq_->removeByID(reinterpret_cast<uint64_t>(this));
145  process_some(lock);
146  // lock is unlocked now
147  break;
148  }
149  impl_->cond_.notify_all();
150 }
151 
152 template<class... Inputs>
154 {
155  std::lock_guard<std::mutex> lock{impl_->mutex_};
156  cbq_ = cbq;
157 }
158 
159 template<class... Inputs>
161 {
162  std::unique_lock<std::mutex> lock{impl_->mutex_};
163  process_some(lock);
164 }
165 
166 template<class... Inputs>
168 {
169  std::lock_guard<std::mutex> lock{impl_->mutex_};
170  impl_->queue_.clear();
171  ++impl_->epoch_;
172  if (cbq_) cbq_->removeByID(reinterpret_cast<uint64_t>(this));
173 }
174 
175 template<class... Inputs>
176 void Buffer<Inputs...>::process_some(std::unique_lock<std::mutex>& lock)
177 {
178  std::vector<QueueElement> tmp;
179  tmp.reserve(impl_->queue_.size());
180  std::copy(impl_->queue_.begin(), impl_->queue_.end(), std::back_inserter(tmp));
181  impl_->queue_.clear();
182  lock.unlock();
183  for (const QueueElement& e : tmp)
184  {
186  }
187  // lock stays unlocked
188 }
189 
190 template<class... Inputs>
191 bool Buffer<Inputs...>::has_some() const noexcept
192 {
193  std::lock_guard<std::mutex> lock{impl_->mutex_};
194  return !impl_->queue_.empty();
195 }
196 
197 template<class... Inputs>
198 bool Buffer<Inputs...>::wait() noexcept
199 {
200  std::unique_lock<std::mutex> lock{impl_->mutex_};
201  return impl_->wait_for_queue_element(lock);
202 }
203 
204 template<class... Inputs>
205 template<class Rep, class Period>
206 bool Buffer<Inputs...>::wait_for(const std::chrono::duration<Rep, Period>& timeout) noexcept
207 {
208  std::unique_lock<std::mutex> lock{impl_->mutex_};
209  return impl_->wait_for_queue_element(lock, timeout);
210 }
211 
212 template<class... Inputs>
214 {
215  std::unique_lock<std::mutex> lock{impl_->mutex_};
216  if (impl_->wait_for_queue_element(lock))
217  {
218  QueueElement e{impl_->queue_.front()};
219  impl_->queue_.pop_front();
220  lock.unlock();
222  return true;
223  }
224  return false;
225 }
226 
227 template<class... Inputs>
228 template<class Rep, class Period>
229 bool Buffer<Inputs...>::process_one(const std::chrono::duration<Rep, Period>& timeout)
230 {
231  std::unique_lock<std::mutex> lock{impl_->mutex_};
232  if (impl_->wait_for_queue_element(lock, timeout))
233  {
234  QueueElement e{impl_->queue_.front()};
235  impl_->queue_.pop_front();
236  lock.unlock();
238  return true;
239  }
240  return false;
241 }
242 
243 template<class... Inputs>
244 void Buffer<Inputs...>::receive(const Inputs&... in)
245 {
246  std::unique_lock<std::mutex> lock{impl_->mutex_};
247  switch (impl_->policy_)
248  {
250  break;
251  case BufferPolicy::Queue:
252  impl_->queue_.push_back(QueueElement(in...));
253  if (cbq_) cbq_->addCallback(ros::CallbackInterfacePtr(new RosCB(impl_, impl_->epoch_)), reinterpret_cast<uint64_t>(this));
254  lock.unlock();
255  impl_->cond_.notify_one();
256  break;
258  lock.unlock();
259  this->send(in...);
260  break;
261  }
262 }
263 
264 template<class... Inputs>
266 {
267  std::unique_lock<std::mutex> lock{impl_->mutex_};
268  while (impl_->wait_for_queue_element(lock))
269  {
270  QueueElement e{impl_->queue_.front()};
271  impl_->queue_.pop_front();
272  lock.unlock();
274  lock.lock();
275  }
276 }
277 
278 template<class... Inputs>
280 {
281  helpers::index_apply<sizeof...(Inputs)>(
282  [&](auto... is)
283  {
284  this->send(std::get<is>(e)...);
285  }
286  );
287 }
288 
289 } // namespace fkie_message_filters
290 
291 #endif /* INCLUDE_FKIE_MESSAGE_FILTERS_BUFFER_IMPL_H_ */
void spin()
Process all data indefinitely.
Definition: buffer_impl.h:265
bool wait() noexcept
Wait for pending data.
Definition: buffer_impl.h:198
CallResult call() noexcept override
Definition: buffer_impl.h:84
void reset() noexcept override
Reset filter.
Definition: buffer_impl.h:167
Store and forward data.
Definition: buffer.h:103
bool wait_for_queue_element(std::unique_lock< std::mutex > &lock, const std::chrono::duration< Rep, Period > &timeout) noexcept
Definition: buffer_impl.h:51
bool wait_for_queue_element(std::unique_lock< std::mutex > &lock) noexcept
Definition: buffer_impl.h:37
Impl(Buffer *parent, BufferPolicy policy) noexcept
Definition: buffer_impl.h:33
std::condition_variable cond_
Definition: buffer_impl.h:73
virtual void removeByID(uint64_t owner_id)=0
void spin_once()
Process pending data.
Definition: buffer_impl.h:160
bool process_one()
Wait for and process one data item.
Definition: buffer_impl.h:213
ros::CallbackQueueInterface * cbq_
Definition: buffer.h:253
std::weak_ptr< Impl > parent_
Definition: buffer_impl.h:101
Buffer(const ros::NodeHandle &nh, std::size_t max_queue_size) noexcept
Constructor.
Definition: buffer_impl.h:113
BufferPolicy
Buffer policy.
Definition: buffer.h:40
void set_callback_queue(ros::CallbackQueueInterface *cbq) noexcept
Process data with a ROS callback queue.
Definition: buffer_impl.h:153
void send_queue_element(const QueueElement &e)
Definition: buffer_impl.h:279
bool has_some() const noexcept
Check if the buffer has pending data.
Definition: buffer_impl.h:191
ROSCPP_DECL bool ok()
Primary namespace.
Definition: buffer.h:33
bool wait_for(const std::chrono::duration< Rep, Period > &timeout) noexcept
Wait for pending data until timeout expires.
Definition: buffer_impl.h:206
virtual void addCallback(const CallbackInterfacePtr &callback, uint64_t owner_id=0)=0
void send(const Outputs &... out)
Pass data to all connected sinks.
Definition: source_impl.h:51
RosCB(const std::weak_ptr< Impl > &parent, std::size_t epoch) noexcept
Definition: buffer_impl.h:81
virtual void receive(const Inputs &... in) override
Process incoming data.
Definition: buffer_impl.h:244
void set_policy(BufferPolicy policy, std::size_t max_queue_size=0)
Modify the buffer policy.
Definition: buffer_impl.h:126
std::shared_ptr< Impl > impl_
Definition: buffer.h:252
boost::circular_buffer< QueueElement > queue_
Definition: buffer_impl.h:71
std::tuple< Inputs... > QueueElement
Definition: buffer.h:249
auto index_apply(Function f)
Definition: tuple.h:40
void process_some(std::unique_lock< std::mutex > &)
Definition: buffer_impl.h:176


fkie_message_filters
Author(s): Timo Röhling
autogenerated on Mon Feb 28 2022 22:21:43