Class MessageCache

Inheritance Relationships

Base Type

Class Documentation

class MessageCache : public rosbag2_cpp::cache::MessageCacheInterface

This class is responsible for implementing message cache, using two cache buffers and providing synchronization API for producer-consumer pattern.

Double buffering is a part of producer-consumer pattern and optimizes for the consumer performance (which can be a bottleneck, e.g. disk writes).

This is a “greedy consumer” implementation - every time the consumer asks for a buffer to consume, the buffers are swapped so that the latest data goes to the consumer right away.

Two instances of MessageCacheBuffer are used, one for producer and one for the consumer. Buffers are switched through swap_buffers function, which involves synchronization and a simple pointer switch.

The cache can enter a flushing state, intended as a finalization state, where all the remaining data is going to be processed: no new messages are accepted and buffer switching can be done unconditionally on consumer demand.

The cache holds infomation about dropped messages (per topic). These are messages that were pushed to the cache when it was full. Such situation signals performance issues, most likely with the CacheConsumer consumer callback.

Public Functions

explicit MessageCache(size_t max_buffer_size)
~MessageCache() override
virtual void push(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> msg) override

Puts msg into primary buffer. With full cache, msg is ignored and counted as lost.

virtual std::shared_ptr< CacheBufferInterface > get_consumer_buffer () override RCPPUTILS_TSA_ACQUIRE(consumer_buffer_mutex_)

Gets a consumer buffer. In this greedy implementation, swap buffers before providing the buffer.

virtual void release_consumer_buffer () override RCPPUTILS_TSA_RELEASE(consumer_buffer_mutex_)

Signals that the consumer is done consuming, unlocking the buffer so it may be swapped.

virtual void wait_for_data() override

Blocks current thread and going to wait on condition variable until notify_data_ready will be called.

virtual void swap_buffers() override

Consumer API: wait until primary buffer is ready and swap it with consumer buffer. The caller thread (consumer thread) will sleep on a conditional variable until it can be awaken, which is to happen when: a) data was inserted into the producer buffer, consuming can continue after a swap b) we are flushing the data (in case we missed the last notification when consuming)

virtual void begin_flushing() override

Set the cache to consume-only mode for final buffer flush before closing.

virtual void done_flushing() override

Notify that flushing is complete.

virtual void log_dropped() override

Summarize dropped/remaining messages.

virtual void notify_data_ready() override

Producer API: notify consumer to wake-up (primary buffer has data)

Protected Attributes

std::unordered_map<std::string, uint32_t> messages_dropped_per_topic_

Dropped messages per topic. Used for printing in alphabetic order.