20 #ifndef INCLUDE_FKIE_MESSAGE_FILTERS_BUFFER_IMPL_H_
21 #define INCLUDE_FKIE_MESSAGE_FILTERS_BUFFER_IMPL_H_
30 template<
class... Inputs>
31 struct Buffer<Inputs...>::Impl
34 : parent_(parent), policy_(policy), epoch_(0)
37 bool wait_for_queue_element(std::unique_lock<std::mutex>& lock) noexcept
39 #ifndef FKIE_MESSAGE_FILTERS_IGNORE_ROS_OK
42 cond_.wait_for(lock, std::chrono::milliseconds(100));
44 return ros::ok() && !queue_.empty();
47 return !queue_.empty();
50 template<
class Rep,
class Period>
51 bool wait_for_queue_element(std::unique_lock<std::mutex>& lock,
const std::chrono::duration<Rep, Period>& timeout) noexcept
53 std::chrono::system_clock::time_point deadline = std::chrono::system_clock::now() + timeout;
54 #ifndef FKIE_MESSAGE_FILTERS_IGNORE_ROS_OK
57 std::chrono::system_clock::duration remaining = deadline - std::chrono::system_clock::now();
58 if (remaining > std::chrono::milliseconds(100))
59 cond_.wait_for(lock, std::chrono::milliseconds(100));
61 cond_.wait_until(lock, deadline);
63 return ros::ok() && !queue_.empty();
66 return !queue_.empty();
71 boost::circular_buffer<QueueElement> queue_;
73 std::condition_variable cond_;
77 template<
class... Inputs>
81 RosCB(
const std::weak_ptr<Impl>& parent, std::size_t epoch) noexcept
82 : parent_(parent), epoch_(epoch) {}
84 CallResult
call() noexcept
override
86 std::shared_ptr<Impl> impl = parent_.lock();
89 std::unique_lock<std::mutex> lock(impl->mutex_);
90 if (!impl->queue_.empty() && epoch_ == impl->epoch_)
93 impl->queue_.pop_front();
95 impl->parent_->send_queue_element(e);
101 std::weak_ptr<Impl> parent_;
105 template<
class... Inputs>
107 : impl_(std::make_shared<Impl>(
this, policy)), cbq_(cbq)
109 impl_->queue_.set_capacity(max_queue_size);
112 template<
class... Inputs>
116 impl_->queue_.set_capacity(max_queue_size);
119 template<
class... Inputs>
122 set_policy(BufferPolicy::Discard);
125 template<
class... Inputs>
128 std::unique_lock<std::mutex> lock{impl_->
mutex_};
129 impl_->policy_ = policy;
132 case BufferPolicy::Discard:
133 impl_->queue_.clear();
135 if (cbq_) cbq_->removeByID(
reinterpret_cast<uint64_t
>(
this));
138 case BufferPolicy::Queue:
139 if (max_queue_size > 0) impl_->queue_.set_capacity(max_queue_size);
142 case BufferPolicy::Passthru:
144 if (cbq_) cbq_->removeByID(
reinterpret_cast<uint64_t
>(
this));
149 impl_->cond_.notify_all();
152 template<
class... Inputs>
155 std::lock_guard<std::mutex> lock{impl_->mutex_};
159 template<
class... Inputs>
162 std::unique_lock<std::mutex> lock{impl_->mutex_};
166 template<
class... Inputs>
169 std::lock_guard<std::mutex> lock{impl_->mutex_};
170 impl_->queue_.clear();
172 if (cbq_) cbq_->removeByID(
reinterpret_cast<uint64_t
>(
this));
175 template<
class... Inputs>
178 std::vector<QueueElement> tmp;
179 tmp.reserve(impl_->queue_.size());
180 std::copy(impl_->queue_.begin(), impl_->queue_.end(), std::back_inserter(tmp));
181 impl_->queue_.clear();
185 send_queue_element(e);
190 template<
class... Inputs>
193 std::lock_guard<std::mutex> lock{impl_->mutex_};
194 return !impl_->queue_.empty();
197 template<
class... Inputs>
200 std::unique_lock<std::mutex> lock{impl_->
mutex_};
201 return impl_->wait_for_queue_element(lock);
204 template<
class... Inputs>
205 template<
class Rep,
class Period>
208 std::unique_lock<std::mutex> lock{impl_->mutex_};
209 return impl_->wait_for_queue_element(lock, timeout);
212 template<
class... Inputs>
215 std::unique_lock<std::mutex> lock{impl_->
mutex_};
216 if (impl_->wait_for_queue_element(lock))
219 impl_->queue_.pop_front();
221 send_queue_element(e);
227 template<
class... Inputs>
228 template<
class Rep,
class Period>
231 std::unique_lock<std::mutex> lock{impl_->
mutex_};
232 if (impl_->wait_for_queue_element(lock, timeout))
234 QueueElement e{impl_->queue_.front()};
235 impl_->queue_.pop_front();
237 send_queue_element(e);
243 template<
class... Inputs>
246 std::unique_lock<std::mutex> lock{impl_->mutex_};
247 switch (impl_->policy_)
249 case BufferPolicy::Discard:
251 case BufferPolicy::Queue:
253 if (cbq_) cbq_->addCallback(
ros::CallbackInterfacePtr(
new RosCB(impl_, impl_->epoch_)),
reinterpret_cast<uint64_t
>(
this));
255 impl_->cond_.notify_one();
257 case BufferPolicy::Passthru:
264 template<
class... Inputs>
267 std::unique_lock<std::mutex> lock{impl_->
mutex_};
268 while (impl_->wait_for_queue_element(lock))
270 QueueElement e{impl_->queue_.front()};
271 impl_->queue_.pop_front();
273 send_queue_element(e);
278 template<
class... Inputs>
284 this->send(std::get<is>(e)...);