Process incoming network data, and try to assemble the data into valid MultiSense Wire messages. More...
#include <message.hh>
Classes | |
struct | InternalMessage |
An internal message object used to track how many bytes were written to a data buffer. More... | |
Public Member Functions | |
MessageStatistics | get_message_statistics () const |
MessageAssembler (std::shared_ptr< BufferPool > buffer_pool) | |
bool | process_packet (const std::vector< uint8_t > &raw_data) |
void | register_callback (const crl::multisense::details::wire::IdType &message_id, std::function< void(std::shared_ptr< const std::vector< uint8_t >>)> callback) |
Register a callback to receive valid messages of a given id. Note that currently only one callback can be registered per message_id, and that the raw image buffer is passed to the callback. It is up to the user to deserialize the message into its appropriate type. More... | |
std::shared_ptr< MessageCondition > | register_message (const crl::multisense::details::wire::IdType &message_id) |
Register to be notified when a message of a given id arrives. Note this registration will only receive a single message. To receive multiple messages please use the callback interface. More... | |
void | remove_callback (const crl::multisense::details::wire::IdType &message_id) |
Remove a callback for a specific message type. More... | |
void | remove_registration (const crl::multisense::details::wire::IdType &message_id) |
Remove a registration for a specific message type. More... | |
~MessageAssembler ()=default | |
Private Member Functions | |
void | dispatch (const crl::multisense::details::wire::IdType &message_id, std::shared_ptr< std::vector< uint8_t >> data) |
Dispatch to both our registrations and callbacks. More... | |
std::tuple< std::shared_ptr< std::vector< uint8_t > >, std::deque< int64_t > > | get_buffer (uint32_t message_size, std::deque< int64_t > ordered_messages) |
Get a new buffer that can hold a message of a given size. If no buffers are available, try destroying older buffers we may no longer be using. The ids of these buffers are stored in the input ordered_messages. More... | |
bool | write_data (InternalMessage &message, const std::vector< uint8_t > &raw_data) |
Write raw data to a message. More... | |
Private Attributes | |
std::map< int64_t, InternalMessage > | m_active_messages |
Active messages we are accumulating. More... | |
std::shared_ptr< BufferPool > | m_buffer_pool = nullptr |
Buffer pool used to allocate messages into. More... | |
std::mutex | m_callback_mutex |
Mutex to ensure callback calls into the MessageAssembler are thread safe. More... | |
std::map< crl::multisense::details::wire::IdType, std::function< void(std::shared_ptr< const std::vector< uint8_t >>)> > | m_callbacks |
Callbacks which we are tracking. These are called when a message of a given type is fully received. More... | |
std::mutex | m_condition_mutex |
Mutex to ensure registrations with the MessageAssembler are thread safe. More... | |
std::map< crl::multisense::details::wire::IdType, std::shared_ptr< MessageCondition > > | m_conditions |
Conditions which we are tracking. These are notified when a message of a given type is fully received. More... | |
int64_t | m_current_sequence_id = 0 |
Internal cache of our last sequence id. More... | |
std::atomic_size_t | m_dispatched_messages = 0 |
A counter for the number of messages we dispatched. More... | |
std::atomic_size_t | m_invalid_packets = 0 |
A counter for the number of invalid packets we received. More... | |
std::deque< int64_t > | m_large_ordered_messages |
Tracking for the ordering of the large buffers we have allocated. Used to determine which active message to potentially remove. More... | |
int32_t | m_previous_wire_id = -1 |
Internal id used to detect rollover of the 16-bit wire id. More... | |
std::atomic_bool | m_processing_messages = false |
A flag which indicates whether a message is being processed. More... | |
std::atomic_size_t | m_received_messages = 0 |
A counter for the number of messages we have received. More... | |
std::deque< int64_t > | m_small_ordered_messages |
Tracking for the ordering of the small buffers we have allocated. Used to determine which active message to potentially remove. More... | |
Process incoming network data, and try to assemble the data into valid MultiSense Wire messages.
Definition at line 224 of file message.hh.
multisense::legacy::MessageAssembler::MessageAssembler | ( | std::shared_ptr< BufferPool > | buffer_pool | ) |
Definition at line 170 of file message.cc.
|
default |
|
private |
Dispatch to both our registrations and callbacks.
Definition at line 412 of file message.cc.
|
private |
Get a new buffer that can hold a message of a given size. If no buffers are available, try destroying older buffers we may no longer be using. The ids of these buffers are stored in the input ordered_messages.
Definition at line 339 of file message.cc.
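The eviction idea described for get_buffer can be sketched roughly as follows. This is not the code in message.cc: `TinyPool`, the `Buffer` alias, and the free-function form of `get_buffer` are hypothetical stand-ins, and the sketch ignores the message_size argument and the small/large buffer distinction. It only illustrates dropping the oldest active messages until a buffer becomes free.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <map>
#include <memory>
#include <tuple>
#include <utility>
#include <vector>

using Buffer = std::shared_ptr<std::vector<std::uint8_t>>;

// Hypothetical fixed-size pool; the real BufferPool interface is not shown here.
class TinyPool {
public:
    TinyPool(std::size_t count, std::size_t bytes) {
        assert(count > 0);
        for (std::size_t i = 0; i < count; ++i) {
            m_buffers.push_back(std::make_shared<std::vector<std::uint8_t>>(bytes));
        }
    }

    // A buffer counts as free when the pool holds the only reference to it.
    // Single-threaded sketch: use_count() is not a safe test across threads.
    Buffer try_get() const {
        for (const auto& buffer : m_buffers) {
            if (buffer.use_count() == 1) return buffer;
        }
        return nullptr;
    }

private:
    std::vector<Buffer> m_buffers;
};

// Try to get a free buffer. If none is available, erase active messages from
// oldest to newest (front of ordered_messages first) until one is released.
// Returns the buffer (nullptr if none could be freed) and the remaining ids.
std::tuple<Buffer, std::deque<std::int64_t>> get_buffer(
    const TinyPool& pool,
    std::map<std::int64_t, Buffer>& active_messages,
    std::deque<std::int64_t> ordered_messages) {
    Buffer buffer = pool.try_get();
    while (!buffer && !ordered_messages.empty()) {
        active_messages.erase(ordered_messages.front());
        ordered_messages.pop_front();
        buffer = pool.try_get();
    }
    return std::make_tuple(std::move(buffer), std::move(ordered_messages));
}
```

Returning the updated deque alongside the buffer mirrors the documented return type, so the caller can replace its own ordering with the pruned one.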
|
inline |
Definition at line 257 of file message.hh.
bool multisense::legacy::MessageAssembler::process_packet | ( | const std::vector< uint8_t > & | raw_data | ) |
Definition at line 175 of file message.cc.
void multisense::legacy::MessageAssembler::register_callback | ( | const crl::multisense::details::wire::IdType & | message_id, |
std::function< void(std::shared_ptr< const std::vector< uint8_t >>)> | callback | ||
) |
Register a callback to receive valid messages of a given id. Note that currently only one callback can be registered per message_id, and that the raw image buffer is passed to the callback. It is up to the user to deserialize the message into its appropriate type.
Definition at line 322 of file message.cc.
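As a rough illustration of the one-callback-per-id behaviour, the following self-contained sketch keeps a mutex-protected map from message id to callback. `CallbackRegistry`, the `IdType` alias (assumed here to be a 16-bit integer), and the `RawMessage` alias are hypothetical. Whether the real register_callback replaces or rejects a second registration for the same id is not stated above; the sketch assumes replacement.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical stand-in for crl::multisense::details::wire::IdType.
using IdType = std::uint16_t;
using RawMessage = std::shared_ptr<const std::vector<std::uint8_t>>;
using Callback = std::function<void(RawMessage)>;

// Illustrative registry holding at most one callback per message id.
class CallbackRegistry {
public:
    // Assumption: a second registration for the same id replaces the first.
    void register_callback(IdType message_id, Callback callback) {
        assert(static_cast<bool>(callback) && "callback must be callable");
        std::lock_guard<std::mutex> lock(m_mutex);
        m_callbacks[message_id] = std::move(callback);
    }

    void remove_callback(IdType message_id) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_callbacks.erase(message_id);
    }

    // Returns true if a callback was registered for this id and was invoked.
    bool dispatch(IdType message_id, RawMessage data) {
        Callback callback;
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            const auto it = m_callbacks.find(message_id);
            if (it == m_callbacks.end()) return false;
            callback = it->second;  // copy so user code runs outside the lock
        }
        callback(std::move(data));
        return true;
    }

private:
    std::mutex m_mutex;
    std::map<IdType, Callback> m_callbacks;
};
```

The callback receives the undecoded bytes, so any deserialization into a concrete message type would happen inside the user-supplied function.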
std::shared_ptr< MessageCondition > multisense::legacy::MessageAssembler::register_message | ( | const crl::multisense::details::wire::IdType & | message_id | ) |
Register to be notified when a message of a given id arrives. Note this registration will only receive a single message. To receive multiple messages please use the callback interface.
Definition at line 292 of file message.cc.
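The single-message behaviour of a registration can be pictured with a one-shot condition like the one below. This is only a sketch: `OneShotCondition` and its `set` and `wait_for` members are hypothetical, and the real MessageCondition interface may differ.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

using RawMessage = std::shared_ptr<const std::vector<std::uint8_t>>;

// Hypothetical one-shot condition; not the library's MessageCondition.
class OneShotCondition {
public:
    // Called by the producer when a complete message arrives. Only the first
    // message is stored; later calls return false and are ignored.
    bool set(RawMessage data) {
        assert(data != nullptr);
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            if (m_data) return false;
            m_data = std::move(data);
        }
        m_cv.notify_all();
        return true;
    }

    // Block until a message arrives or the timeout expires.
    // Returns nullptr on timeout.
    template <class Rep, class Period>
    RawMessage wait_for(const std::chrono::duration<Rep, Period>& timeout) {
        std::unique_lock<std::mutex> lock(m_mutex);
        const bool ready =
            m_cv.wait_for(lock, timeout, [this] { return m_data != nullptr; });
        return ready ? m_data : nullptr;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    RawMessage m_data;
};
```

A caller that needs every message of a given id, rather than just the next one, is better served by the callback interface, as the note above says.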
void multisense::legacy::MessageAssembler::remove_callback | ( | const crl::multisense::details::wire::IdType & | message_id | ) |
Remove a callback for a specific message type.
Definition at line 329 of file message.cc.
void multisense::legacy::MessageAssembler::remove_registration | ( | const crl::multisense::details::wire::IdType & | message_id | ) |
Remove a registration for a specific message type.
Definition at line 313 of file message.cc.
|
private |
Write raw data to a message.
Definition at line 368 of file message.cc.
|
private |
Active messages we are accumulating.
Definition at line 335 of file message.hh.
|
private |
Buffer pool used to allocate messages into.
Definition at line 309 of file message.hh.
|
private |
Mutex to ensure callback calls into the MessageAssembler are thread safe.
Definition at line 304 of file message.hh.
|
private |
Callbacks which we are tracking. These are called when a message of a given type is fully received.
Definition at line 348 of file message.hh.
|
private |
Mutex to ensure registrations with the MessageAssembler are thread safe.
Definition at line 299 of file message.hh.
|
private |
Conditions which we are tracking. These are notified when a message of a given type is fully received.
Definition at line 341 of file message.hh.
|
private |
Internal cache of our last sequence id.
Definition at line 319 of file message.hh.
|
private |
A counter for the number of messages we dispatched.
Definition at line 363 of file message.hh.
|
private |
A counter for the number of invalid packets we received.
Definition at line 368 of file message.hh.
|
private |
Tracking for the ordering of the large buffers we have allocated. Used to determine which active message to potentially remove.
Definition at line 330 of file message.hh.
|
private |
Internal id used to detect rollover of the 16-bit wire id.
Definition at line 314 of file message.hh.
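One way a 16-bit wire id can be extended into a monotonically growing 64-bit sequence id is sketched below. The member names and initial values are borrowed from this page, but `SequenceUnwrapper` and its signed-difference logic are assumptions made for illustration; the actual rollover handling in message.cc may differ.

```cpp
#include <cstdint>

// Hypothetical helper that extends a wrapping 16-bit id to 64 bits.
class SequenceUnwrapper {
public:
    std::int64_t unwrap(std::uint16_t wire_id) {
        if (m_previous_wire_id < 0) {
            // First id seen: start the sequence at the wire value.
            m_current_sequence_id = wire_id;
        } else {
            const auto previous = static_cast<std::uint16_t>(m_previous_wire_id);
            // Difference modulo 2^16, then reinterpreted as signed so that a
            // step from 65535 to 0 counts as +1 and a slightly late id counts
            // as a small negative step. The narrowing to int16_t is modular on
            // mainstream compilers and guaranteed from C++20 onwards.
            const auto wrapped = static_cast<std::uint16_t>(wire_id - previous);
            m_current_sequence_id += static_cast<std::int16_t>(wrapped);
        }
        m_previous_wire_id = wire_id;
        return m_current_sequence_id;
    }

private:
    std::int32_t m_previous_wire_id = -1;     // -1 means "no id seen yet"
    std::int64_t m_current_sequence_id = 0;
};
```

Storing the previous id in a 32-bit signed integer leaves room for the -1 sentinel, which a plain 16-bit unsigned value could not represent.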
|
private |
A flag which indicates whether a message is being processed.
Definition at line 358 of file message.hh.
|
private |
A counter for the number of messages we have received.
Definition at line 353 of file message.hh.
|
private |
Tracking for the ordering of the small buffers we have allocated. Used to determine which active message to potentially remove.
Definition at line 325 of file message.hh.