Classes | Public Member Functions | Protected Member Functions | Private Types | Private Member Functions | Private Attributes | List of all members
fkie_message_filters::Buffer< Inputs > Class Template Reference

Store and forward data. More...

#include <buffer.h>

Inheritance diagram for fkie_message_filters::Buffer< Inputs >:
Inheritance graph
[legend]

Classes

struct  Impl
 
class  RosCB
 

Public Member Functions

 Buffer (BufferPolicy policy=BufferPolicy::Discard, std::size_t max_queue_size=1, ros::CallbackQueueInterface *cbq=nullptr) noexcept
 Constructor. More...
 
 Buffer (const ros::NodeHandle &nh, std::size_t max_queue_size) noexcept
 Constructor. More...
 
bool has_some () const noexcept
 Check if the buffer has pending data. More...
 
bool process_one ()
 Wait for and process one data item. More...
 
template<class Rep , class Period >
bool process_one (const std::chrono::duration< Rep, Period > &timeout)
 Wait for and process one data item. More...
 
void reset () noexcept override
 Reset filter. More...
 
void set_callback_queue (ros::CallbackQueueInterface *cbq) noexcept
 Process data with a ROS callback queue. More...
 
void set_policy (BufferPolicy policy, std::size_t max_queue_size=0)
 Modify the buffer policy. More...
 
void spin ()
 Process all data indefinitely. More...
 
void spin_once ()
 Process pending data. More...
 
bool wait () noexcept
 Wait for pending data. More...
 
template<class Rep , class Period >
bool wait_for (const std::chrono::duration< Rep, Period > &timeout) noexcept
 Wait for pending data until timeout expires. More...
 
virtual ~Buffer ()
 
- Public Member Functions inherited from fkie_message_filters::Sink< Inputs... >
Connection connect_to_source (Source< Inputs... > &src) noexcept
 Connect this sink to a source. More...
 
void disconnect_from_all_sources () noexcept
 Disconnect from all connected sources. More...
 
virtual ~Sink ()
 
- Public Member Functions inherited from fkie_message_filters::FilterBase
virtual ~FilterBase ()
 
- Public Member Functions inherited from fkie_message_filters::Source< IO< Inputs... > >
Connection connect_to_sink (Sink< Outputs... > &dst) noexcept
 Connect this source to a sink. More...
 
void disconnect_from_all_sinks () noexcept
 Disconnect from all connected sinks. More...
 
virtual ~Source ()
 

Protected Member Functions

virtual void receive (const Inputs &... in) override
 Process incoming data. More...
 
- Protected Member Functions inherited from fkie_message_filters::Source< IO< Inputs... > >
void send (const Outputs &... out)
 Pass data to all connected sinks. More...
 

Private Types

using QueueElement = std::tuple< Inputs... >
 

Private Member Functions

void process_some (std::unique_lock< std::mutex > &)
 
void send_queue_element (const QueueElement &e)
 

Private Attributes

ros::CallbackQueueInterfacecbq_
 
std::shared_ptr< Implimpl_
 

Additional Inherited Members

- Public Types inherited from fkie_message_filters::Sink< Inputs... >
using Input = IO< Inputs... >
 Grouped input types. More...
 
- Public Types inherited from fkie_message_filters::Source< IO< Inputs... > >
using Output = IO< Outputs... >
 Grouped output types. More...
 
- Static Public Attributes inherited from fkie_message_filters::Sink< Inputs... >
static constexpr std::size_t NUM_INPUTS
 Number of input arguments. More...
 
- Static Public Attributes inherited from fkie_message_filters::Source< IO< Inputs... > >
static constexpr std::size_t NUM_OUTPUTS
 Number of output arguments. More...
 

Detailed Description

template<class... Inputs>
class fkie_message_filters::Buffer< Inputs >

Store and forward data.

The buffer acts as a decoupler in the filter pipeline. Data can be stored and processed at a later time. The pipeline is effectively split into independent upstream and downstream parts, and it becomes possible to run the downstream data processing asynchronously. For instance, you can run computationally expensive algorithms on ROS messages in a different thread without blocking the ROS subscriber callback queue.

The buffer can be used for at least three different use cases:

  1. Use the buffer as a valve
    You can toggle between the BufferPolicy::Discard and BufferPolicy::Passthru modes to selectively disable or enable data processing at specific times. This is the simplest use case without any asynchronous processing.
  2. Run multiple ROS callback queues
    If your ROS node runs multiple callback queues, you can use the buffer to bind processing to a particular queue:
    // Version 1
    mf::Buffer<M> buf1(nh, 10); // will use the callback queue of nh
    // Version 2
    buf2.set_callback_queue(...); // will explicitly set the ROS callback queue
  3. Run your own thread(s) for data processing
    This is the most versatile option for advanced users. You can set up your worker threads as you desire and then call spin(), spin_once() or process_one() as you see fit.

Definition at line 121 of file buffer.h.

Member Typedef Documentation

◆ QueueElement

template<class... Inputs>
using fkie_message_filters::Buffer< Inputs >::QueueElement = std::tuple<Inputs...>
private

Definition at line 267 of file buffer.h.

Constructor & Destructor Documentation

◆ Buffer() [1/2]

template<class... Inputs>
fkie_message_filters::Buffer< Inputs >::Buffer ( const ros::NodeHandle nh,
std::size_t  max_queue_size 
)
noexcept

Constructor.

Constructs a buffer with BufferPolicy::Queue policy and data processing via ROS callbacks.

  • nh ROS node handle whose callback queue is used for data processing
  • max_queue_size the maximum number of queued data items

\nothrow

Definition at line 131 of file buffer_impl.h.

◆ Buffer() [2/2]

template<class... Inputs>
fkie_message_filters::Buffer< Inputs >::Buffer ( BufferPolicy  policy = BufferPolicy::Discard,
std::size_t  max_queue_size = 1,
ros::CallbackQueueInterface cbq = nullptr 
)
noexcept

Constructor.

  • policy the buffer policy
  • max_queue_size for the BufferPolicy::Queue policy, the maximum number of queued data items.
  • cbq ROS callback queue that is used to process queued data

\nothrow

See also
set_policy(), set_callback_queue()

Definition at line 124 of file buffer_impl.h.

◆ ~Buffer()

template<class... Inputs>
fkie_message_filters::Buffer< Inputs >::~Buffer
virtual

Definition at line 138 of file buffer_impl.h.

Member Function Documentation

◆ has_some()

template<class... Inputs>
bool fkie_message_filters::Buffer< Inputs >::has_some
noexcept

Check if the buffer has pending data.

Return values
trueif the current policy is BufferPolicy::Queue and a subsequent call to process_one() or spin_once() will process data.
falseotherwise

\nothrow

Definition at line 209 of file buffer_impl.h.

◆ process_one() [1/2]

template<class... Inputs>
bool fkie_message_filters::Buffer< Inputs >::process_one

Wait for and process one data item.

Return values
truedata has been processed successfully
falseeither the current buffer policy is not BufferPolicy::Queue, or the policy has been changed to something other than BufferPolicy::Queue while the function was waiting, or the ROS node has been shut down.

\filterthrow

Definition at line 231 of file buffer_impl.h.

◆ process_one() [2/2]

template<class... Inputs>
template<class Rep , class Period >
bool fkie_message_filters::Buffer< Inputs >::process_one ( const std::chrono::duration< Rep, Period > &  timeout)

Wait for and process one data item.

  • timeout maximum duration to wait for new data if none is available
Return values
truedata has been processed successfully
falseeither the current buffer policy is not BufferPolicy::Queue, or the policy has been changed to something other than BufferPolicy::Queue while the function was waiting, or time timeout expired, or the ROS node has been shut down.

\filterthrow

Definition at line 247 of file buffer_impl.h.

◆ process_some()

template<class... Inputs>
void fkie_message_filters::Buffer< Inputs >::process_some ( std::unique_lock< std::mutex > &  lock)
private

Definition at line 194 of file buffer_impl.h.

◆ receive()

template<class... Inputs>
void fkie_message_filters::Buffer< Inputs >::receive ( const Inputs &...  in)
overrideprotectedvirtual

Process incoming data.

Derived classes need to override this method to handle all data that is to be consumed by the sink.

\abstractthrow

Implements fkie_message_filters::Sink< Inputs... >.

Definition at line 262 of file buffer_impl.h.

◆ reset()

template<class... Inputs>
void fkie_message_filters::Buffer< Inputs >::reset
overridevirtualnoexcept

Reset filter.

If the buffer policy is BufferPolicy::Queue, this will clear the internal queue and discard all pending data. Otherwise, this function has no effect.

\nothrow

Reimplemented from fkie_message_filters::FilterBase.

Definition at line 185 of file buffer_impl.h.

◆ send_queue_element()

template<class... Inputs>
void fkie_message_filters::Buffer< Inputs >::send_queue_element ( const QueueElement e)
private

Definition at line 297 of file buffer_impl.h.

◆ set_callback_queue()

template<class... Inputs>
void fkie_message_filters::Buffer< Inputs >::set_callback_queue ( ros::CallbackQueueInterface cbq)
noexcept

Process data with a ROS callback queue.

Instead of running your own processing threads, you can use the ROS callback system to schedule data processing whenever new data arrives.

  • cbq the ROS callback queue or nullptr to disable ROS callbacks.

\nothrow

Definition at line 171 of file buffer_impl.h.

◆ set_policy()

template<class... Inputs>
void fkie_message_filters::Buffer< Inputs >::set_policy ( BufferPolicy  policy,
std::size_t  max_queue_size = 0 
)

Modify the buffer policy.

If the new buffer policy is not BufferPolicy::Queue, any pending call to wait(), process_one(), or spin() will return. If the buffer policy is changed to BufferPolicy::Passthru, all pending data is processed immediately before the function returns. If the buffer policy is changed to BufferPolicy::Discard, all pending data is discarded immediately.

  • policy the buffer policy
  • max_queue_size for the BufferPolicy::Queue policy, the maximum number of queued data items. If zero, the previously set queue size remains unchanged.
Warning
If you change the policy from BufferPolicy::Queue to BufferPolicy::Passthru and there is still a pending call to process_one(), spin_once(), or spin() in a different thread, some data might be processed in parallel or out of order when the queue is flushed.

\filterthrow

Definition at line 144 of file buffer_impl.h.

◆ spin()

template<class... Inputs>
void fkie_message_filters::Buffer< Inputs >::spin

Process all data indefinitely.

Blocks and processes all incoming data until the buffer policy is changed to something other than BufferPolicy::Queue or the ROS node is shut down.

You can call the function from multiple threads at once, and the workload will be shared among all participating threads.

\filterthrow

See also
set_policy()

Definition at line 283 of file buffer_impl.h.

◆ spin_once()

template<class... Inputs>
void fkie_message_filters::Buffer< Inputs >::spin_once

Process pending data.

Does nothing if the buffer policy is not BufferPolicy::Queue. The method is guaranteed to return as it will only process data which is pending at invocation time. This also means that there may be new data pending already when this method returns.

\filterthrow

Definition at line 178 of file buffer_impl.h.

◆ wait()

template<class... Inputs>
bool fkie_message_filters::Buffer< Inputs >::wait
noexcept

Wait for pending data.

Return values
truethere is data available for consumption by spin_once()
falseeither the current buffer policy is not BufferPolicy::Queue, or the policy has been changed to something other than BufferPolicy::Queue while the function was waiting, or the ROS node has been shut down.

\nothrow

Definition at line 216 of file buffer_impl.h.

◆ wait_for()

template<class... Inputs>
template<class Rep , class Period >
bool fkie_message_filters::Buffer< Inputs >::wait_for ( const std::chrono::duration< Rep, Period > &  timeout)
noexcept

Wait for pending data until timeout expires.

  • timeout maximum duration to wait for new data if none is available
Return values
truethere is data available for consumption by spin_once()
falseeither the current buffer policy is not BufferPolicy::Queue, or the policy has been changed to something other than BufferPolicy::Queue while the function was waiting, or the timeout expired, or the ROS node has been shut down.

\nothrow

Definition at line 224 of file buffer_impl.h.

Member Data Documentation

◆ cbq_

template<class... Inputs>
ros::CallbackQueueInterface* fkie_message_filters::Buffer< Inputs >::cbq_
private

Definition at line 271 of file buffer.h.

◆ impl_

template<class... Inputs>
std::shared_ptr<Impl> fkie_message_filters::Buffer< Inputs >::impl_
private

Definition at line 270 of file buffer.h.


The documentation for this class was generated from the following files:
fkie_message_filters
Definition: buffer.h:33
fkie_message_filters::chain
void chain(Filter1 &flt1, Filter2 &flt2, MoreFilters &... filters) noexcept
Convenience function to chain multiple filters.
Definition: filter_impl.h:71
fkie_message_filters::SimpleUserFilter::set_processing_function
void set_processing_function(const ProcessingFunction &f) noexcept
Set the user-defined processing function.
Definition: simple_user_filter_impl.h:47
fkie_message_filters::SimpleUserFilter
Simplified filter with user-defined callback function.
Definition: simple_user_filter.h:56
fkie_message_filters::Subscriber
Subscribe to a ROS topic as data provider.
Definition: subscriber.h:72
fkie_message_filters::Buffer
Store and forward data.
Definition: buffer.h:121
ros::NodeHandle::setCallbackQueue
void setCallbackQueue(CallbackQueueInterface *queue)
fkie_message_filters::Buffer::set_callback_queue
void set_callback_queue(ros::CallbackQueueInterface *cbq) noexcept
Process data with a ROS callback queue.
Definition: buffer_impl.h:171
fkie_message_filters::BufferPolicy::Queue
@ Queue
Queue for later use.
ros::spin
ROSCPP_DECL void spin()
ros::NodeHandle
ros::getGlobalCallbackQueue
ROSCPP_DECL CallbackQueue * getGlobalCallbackQueue()


fkie_message_filters
Author(s): Timo Röhling
autogenerated on Wed Mar 2 2022 00:18:57