20 #ifndef INCLUDE_FKIE_MESSAGE_FILTERS_BUFFER_IMPL_H_ 21 #define INCLUDE_FKIE_MESSAGE_FILTERS_BUFFER_IMPL_H_ 30 template<
class... Inputs>
34 : parent_(parent), policy_(policy), epoch_(0)
39 #ifndef FKIE_MESSAGE_FILTERS_IGNORE_ROS_OK 42 cond_.wait_for(lock, std::chrono::milliseconds(100));
44 return ros::ok() && !queue_.empty();
47 return !queue_.empty();
50 template<
class Rep,
class Period>
51 bool wait_for_queue_element(std::unique_lock<std::mutex>& lock,
const std::chrono::duration<Rep, Period>& timeout) noexcept
53 std::chrono::system_clock::time_point deadline = std::chrono::system_clock::now() + timeout;
54 #ifndef FKIE_MESSAGE_FILTERS_IGNORE_ROS_OK 57 std::chrono::system_clock::duration remaining = deadline - std::chrono::system_clock::now();
58 if (remaining > std::chrono::milliseconds(100))
59 cond_.wait_for(lock, std::chrono::milliseconds(100));
61 cond_.wait_until(lock, deadline);
63 return ros::ok() && !queue_.empty();
66 return !queue_.empty();
71 boost::circular_buffer<QueueElement>
queue_;
77 template<
class... Inputs>
81 RosCB(
const std::weak_ptr<Impl>& parent, std::size_t epoch) noexcept
82 : parent_(parent), epoch_(epoch) {}
86 std::shared_ptr<Impl> impl = parent_.lock();
89 std::unique_lock<std::mutex> lock(impl->mutex_);
90 if (!impl->queue_.empty() && epoch_ == impl->epoch_)
93 impl->queue_.pop_front();
95 impl->parent_->send_queue_element(e);
105 template<
class... Inputs>
107 :
impl_(std::make_shared<Impl>(
this, policy)),
cbq_(cbq)
109 impl_->queue_.set_capacity(max_queue_size);
112 template<
class... Inputs>
116 impl_->queue_.set_capacity(max_queue_size);
119 template<
class... Inputs>
125 template<
class... Inputs>
128 std::unique_lock<std::mutex> lock{
impl_->mutex_};
129 impl_->policy_ = policy;
133 impl_->queue_.clear();
139 if (max_queue_size > 0)
impl_->queue_.set_capacity(max_queue_size);
149 impl_->cond_.notify_all();
152 template<
class... Inputs>
155 std::lock_guard<std::mutex> lock{
impl_->mutex_};
159 template<
class... Inputs>
162 std::unique_lock<std::mutex> lock{
impl_->mutex_};
166 template<
class... Inputs>
169 std::lock_guard<std::mutex> lock{
impl_->mutex_};
170 impl_->queue_.clear();
175 template<
class... Inputs>
178 std::vector<QueueElement> tmp;
179 tmp.reserve(
impl_->queue_.size());
180 std::copy(
impl_->queue_.begin(),
impl_->queue_.end(), std::back_inserter(tmp));
181 impl_->queue_.clear();
190 template<
class... Inputs>
193 std::lock_guard<std::mutex> lock{
impl_->mutex_};
194 return !
impl_->queue_.empty();
197 template<
class... Inputs>
200 std::unique_lock<std::mutex> lock{
impl_->mutex_};
201 return impl_->wait_for_queue_element(lock);
204 template<
class... Inputs>
205 template<
class Rep,
class Period>
208 std::unique_lock<std::mutex> lock{
impl_->mutex_};
209 return impl_->wait_for_queue_element(lock, timeout);
212 template<
class... Inputs>
215 std::unique_lock<std::mutex> lock{
impl_->mutex_};
216 if (
impl_->wait_for_queue_element(lock))
219 impl_->queue_.pop_front();
227 template<
class... Inputs>
228 template<
class Rep,
class Period>
231 std::unique_lock<std::mutex> lock{
impl_->mutex_};
232 if (
impl_->wait_for_queue_element(lock, timeout))
235 impl_->queue_.pop_front();
243 template<
class... Inputs>
246 std::unique_lock<std::mutex> lock{
impl_->mutex_};
247 switch (
impl_->policy_)
255 impl_->cond_.notify_one();
264 template<
class... Inputs>
267 std::unique_lock<std::mutex> lock{
impl_->mutex_};
268 while (
impl_->wait_for_queue_element(lock))
271 impl_->queue_.pop_front();
278 template<
class... Inputs>
284 this->
send(std::get<is>(e)...);
void spin()
Process all data indefinitely.
bool wait() noexcept
Wait for pending data.
CallResult call() noexcept override
void reset() noexcept override
Reset filter.
bool wait_for_queue_element(std::unique_lock< std::mutex > &lock, const std::chrono::duration< Rep, Period > &timeout) noexcept
bool wait_for_queue_element(std::unique_lock< std::mutex > &lock) noexcept
Forward data immediately.
Impl(Buffer *parent, BufferPolicy policy) noexcept
std::condition_variable cond_
virtual void removeByID(uint64_t owner_id)=0
void spin_once()
Process pending data.
bool process_one()
Wait for and process one data item.
ros::CallbackQueueInterface * cbq_
std::weak_ptr< Impl > parent_
Buffer(const ros::NodeHandle &nh, std::size_t max_queue_size) noexcept
Constructor.
BufferPolicy
Buffer policy.
void set_callback_queue(ros::CallbackQueueInterface *cbq) noexcept
Process data with a ROS callback queue.
void send_queue_element(const QueueElement &e)
bool has_some() const noexcept
Check if the buffer has pending data.
bool wait_for(const std::chrono::duration< Rep, Period > &timeout) noexcept
Wait for pending data until timeout expires.
virtual void addCallback(const CallbackInterfacePtr &callback, uint64_t owner_id=0)=0
void send(const Outputs &... out)
Pass data to all connected sinks.
RosCB(const std::weak_ptr< Impl > &parent, std::size_t epoch) noexcept
virtual void receive(const Inputs &... in) override
Process incoming data.
void set_policy(BufferPolicy policy, std::size_t max_queue_size=0)
Modify the buffer policy.
std::shared_ptr< Impl > impl_
boost::circular_buffer< QueueElement > queue_
std::tuple< Inputs... > QueueElement
auto index_apply(Function f)
void process_some(std::unique_lock< std::mutex > &)