Go to the documentation of this file.
40 #include <condition_variable>
80 int64_t
unwrap_sequence_id(uint16_t current_wire_id, int32_t previous_wire_id, int64_t current_sequence_id);
102 template <
typename T>
103 std::vector<uint8_t>
serialize(
const T& message, uint16_t sequence_id,
size_t mtu)
114 wire::Header&
header = *(
reinterpret_cast<wire::Header*
>(stream.
data()));
119 header.sequenceIdentifier = sequence_id;
121 stream.
seek(
sizeof(wire::Header));
124 const_cast<T*
>(&message)->
serialize(stream, version);
126 header.messageLength =
static_cast<uint32_t
>(stream.
tell() -
sizeof(wire::Header));
129 output_buffer.resize(stream.
tell());
131 return output_buffer;
150 std::lock_guard<std::mutex> lock(
m_mutex);
156 std::lock_guard<std::mutex> lock(
m_mutex);
166 template <
typename T,
class Rep,
class Period>
167 std::optional<T>
wait(
const std::optional<std::chrono::duration<Rep, Period>> &timeout)
169 std::unique_lock<std::mutex> lock(
m_mutex);
173 if (
m_cv.wait_for(lock, timeout.value(), [
this]{return this->m_notified;}))
175 return std::make_optional(deserialize<T>(
m_data));
181 return std::make_optional(deserialize<T>(
m_data));
187 template <
typename T>
190 const std::optional<std::chrono::milliseconds> timeout = std::nullopt;
191 return wait<T>(timeout);
250 std::function<
void(std::shared_ptr<
const std::vector<uint8_t>>)> callback);
274 std::shared_ptr<std::vector<uint8_t>>
data;
282 std::tuple<std::shared_ptr<std::vector<uint8_t>>, std::deque<int64_t>>
283 get_buffer(uint32_t message_size, std::deque<int64_t> ordered_messages);
294 std::shared_ptr<std::vector<uint8_t>> data);
341 std::map<crl::multisense::details::wire::IdType, std::shared_ptr<MessageCondition>>
m_conditions;
348 std::function<void(std::shared_ptr<
const std::vector<uint8_t>>)>>
m_callbacks;
bool write_data(InternalMessage &message, const std::vector< uint8_t > &raw_data)
Write raw data to an message.
int32_t m_previous_wire_id
Internal id used to detect internal rollover of the 16 bit wire id.
std::tuple< std::shared_ptr< std::vector< uint8_t > >, std::deque< int64_t > > get_buffer(uint32_t message_size, std::deque< int64_t > ordered_messages)
Get a new buffer that can hold a message of a given size. If no buffers are available try destroying ...
static CRL_CONSTEXPR uint8_t COMBINED_HEADER_LENGTH
std::atomic_size_t m_dispatched_messages
A counter for the number of messages we dispatched.
Process incoming network data, and try the data into valid MultiSense Wire messages.
static CRL_CONSTEXPR uint16_t HEADER_MAGIC
void set_and_notify(std::shared_ptr< std::vector< uint8_t >> data)
void dispatch(const crl::multisense::details::wire::IdType &message_id, std::shared_ptr< std::vector< uint8_t >> data)
Dispatch to both our registrations and callbacks.
T deserialize(const std::vector< uint8_t > &data)
deserialize the raw bytes of a message. Note this does not account for the wire::Header which is tran...
bool process_packet(const std::vector< uint8_t > &raw_data)
crl::multisense::details::wire::IdType type
std::map< crl::multisense::details::wire::IdType, std::function< void(std::shared_ptr< const std::vector< uint8_t >>)> > m_callbacks
Callbacks which we are tracking. These are called when a message of a given type is fully received.
MessageCondition()=default
static CRL_CONSTEXPR uint16_t HEADER_VERSION
std::map< int64_t, InternalMessage > m_active_messages
Active messages we are accumulating.
std::optional< T > wait()
std::atomic_size_t m_received_messages
A counter for the number of messages we have received.
int64_t unwrap_sequence_id(uint16_t current_wire_id, int32_t previous_wire_id, int64_t current_sequence_id)
Unwrap a 16-bit wire sequence ID into a unique 64-bit local ID.
size_t received_messages
The number of received messages.
std::shared_ptr< std::vector< uint8_t > > data
void remove_callback(const crl::multisense::details::wire::IdType &message_id)
Remove a callback for a specific message type.
std::deque< int64_t > m_small_ordered_messages
Tracking for the ordering of the small buffers we have allocated. Used to determine which active mess...
size_t invalid_packets
The number of invalid packets we received.
std::mutex m_callback_mutex
Mutex to ensure callback calls into the MessageAssembler are thread safe.
A condition object which can be used to wait on messages from the stream.
std::vector< uint8_t > serialize(const T &message, uint16_t sequence_id, size_t mtu)
Serialize a MultiSense Wire message for transmission. This adds the wire header to the message for tr...
void seek(std::size_t idx)
void register_callback(const crl::multisense::details::wire::IdType &message_id, std::function< void(std::shared_ptr< const std::vector< uint8_t >>)> callback)
Register a callback to receive valid messages of a given id. Note currently only one callback can be ...
int64_t m_current_sequence_id
Internal cache of our last sequence id.
std::shared_ptr< MessageCondition > register_message(const crl::multisense::details::wire::IdType &message_id)
Register to be notified when a message of a given id arrives. Note this registration will only receiv...
std::map< crl::multisense::details::wire::IdType, std::shared_ptr< MessageCondition > > m_conditions
Conditions which we are tracking. These are notified when a message of a given type is fully received...
void remove_registration(const crl::multisense::details::wire::IdType &message_id)
Remove a registration for a specific message type.
std::vector< uint8_t > m_data
An internal message object used to track how many bytes were written to a data buffer.
std::optional< uint32_t > get_full_message_size(const std::vector< uint8_t > &raw_data)
Get the size of the full message in bytes from a raw data buffer.
std::atomic_bool m_processing_messages
The a flag which indicates if a message is being processed.
static CRL_CONSTEXPR uint16_t HEADER_GROUP
bool header_valid(const std::vector< uint8_t > &raw_data)
Validate the Multisense header.
std::atomic_size_t m_invalid_packets
A counter for the number of invalid packets we received.
~MessageAssembler()=default
std_msgs::Header const * header(const M &m)
std::optional< T > wait(const std::optional< std::chrono::duration< Rep, Period >> &timeout)
convenience function equivalent to std::condition_variable::wait_for that performs a type conversion ...
crl::multisense::details::wire::IdType get_message_type(const std::vector< uint8_t > &raw_buffer)
Get the message type of the message from the buffer over the wire. Note this does account for the wir...
size_t dropped_messages
The number of dropped messages.
std::mutex m_condition_mutex
Mutex to ensure registrations MessageAssembler are thread safe.
MessageAssembler(std::shared_ptr< BufferPool > buffer_pool)
std::deque< int64_t > m_large_ordered_messages
Tracking for the ordering of the large buffers we have allocated. Used to determine which active mess...
std::condition_variable m_cv
MessageStatistics get_message_statistics() const
std::shared_ptr< BufferPool > m_buffer_pool
Buffer pool used to allocate messages into.