message.hh
Go to the documentation of this file.
1 
37 #pragma once
38 
#include <atomic>
#include <condition_variable>
#include <deque>
#include <functional>
#include <map>
#include <mutex>
#include <optional>
#include <stdexcept>
46 
47 #include <utility/BufferStream.hh>
48 #include <wire/Protocol.hh>
49 #include <wire/AckMessage.hh>
50 
52 
53 namespace multisense{
54 namespace legacy{
55 
60 template <typename T>
61 T deserialize(const std::vector<uint8_t>& data)
62 {
63  using namespace crl::multisense::details;
64 
65  utility::BufferStreamReader stream{data.data(), data.size()};
66 
67  wire::IdType id = 0;
68  wire::VersionType version = 0;
69 
70  stream & id;
71  stream & version;
72  T m(stream, version);
73 
74  return m;
75 }
76 
80 int64_t unwrap_sequence_id(uint16_t current_wire_id, int32_t previous_wire_id, int64_t current_sequence_id);
81 
85 bool header_valid(const std::vector<uint8_t> &raw_data);
86 
91 crl::multisense::details::wire::IdType get_message_type(const std::vector<uint8_t>& raw_buffer);
92 
96 std::optional<uint32_t> get_full_message_size(const std::vector<uint8_t> &raw_data);
97 
102 template <typename T>
103 std::vector<uint8_t> serialize(const T& message, uint16_t sequence_id, size_t mtu)
104 {
105  using namespace crl::multisense::details;
106 
107  const wire::IdType id = T::ID;
108  const wire::VersionType version = T::VERSION;
109 
110  std::vector<uint8_t> output_buffer(mtu - wire::COMBINED_HEADER_LENGTH, static_cast<uint8_t>(0));
111 
112  utility::BufferStreamWriter stream(output_buffer.data(), output_buffer.size());
113 
114  wire::Header& header = *(reinterpret_cast<wire::Header*>(stream.data()));
115  header.magic = wire::HEADER_MAGIC;
116  header.version = wire::HEADER_VERSION;
117  header.group = wire::HEADER_GROUP;
118  header.flags = 0;
119  header.sequenceIdentifier = sequence_id;
120 
121  stream.seek(sizeof(wire::Header));
122  stream & id;
123  stream & version;
124  const_cast<T*>(&message)->serialize(stream, version);
125 
126  header.messageLength = static_cast<uint32_t>(stream.tell() - sizeof(wire::Header));
127  header.byteOffset = 0;
128 
129  output_buffer.resize(stream.tell());
130 
131  return output_buffer;
132 }
133 
138 {
139 public:
140 
141  MessageCondition() = default;
142 
144  {
145  m_cv.notify_all();
146  }
147 
148  void unset()
149  {
150  std::lock_guard<std::mutex> lock(m_mutex);
151  m_notified = false;
152  }
153 
154  void set_and_notify(std::shared_ptr<std::vector<uint8_t>> data)
155  {
156  std::lock_guard<std::mutex> lock(m_mutex);
157  m_data = *data;
158  m_notified = true;
159  m_cv.notify_all();
160  }
161 
166  template <typename T, class Rep, class Period>
167  std::optional<T> wait(const std::optional<std::chrono::duration<Rep, Period>> &timeout)
168  {
169  std::unique_lock<std::mutex> lock(m_mutex);
170 
171  if (timeout)
172  {
173  if (m_cv.wait_for(lock, timeout.value(), [this]{return this->m_notified;}))
174  {
175  return std::make_optional(deserialize<T>(m_data));
176  }
177  }
178  else
179  {
180  m_cv.wait(lock, [this]{return this->m_notified;});
181  return std::make_optional(deserialize<T>(m_data));
182  }
183 
184  return std::nullopt;
185  }
186 
187  template <typename T>
188  std::optional<T> wait()
189  {
190  const std::optional<std::chrono::milliseconds> timeout = std::nullopt;
191  return wait<T>(timeout);
192  }
193 
194 private:
195 
196  std::mutex m_mutex;
197  std::condition_variable m_cv;
198  std::vector<uint8_t> m_data;
199  bool m_notified = false;
200 
201 };
202 
///
/// @brief Counters describing the messages and packets handled so far
///
struct MessageStatistics
{
    ///
    /// @brief The number of received messages
    ///
    size_t received_messages = 0;

    ///
    /// @brief The number of dropped messages
    ///
    size_t dropped_messages = 0;

    ///
    /// @brief The number of invalid packets we received
    ///
    size_t invalid_packets = 0;
};
220 
// MessageAssembler: processes incoming network data and tries to assemble it into valid
// MultiSense Wire messages.
// NOTE(review): the class head (message.hh:224 per the symbol index) is missing from this
// listing, as are several declarations flagged below.
225 {
226 public:
// Construct from a shared BufferPool; the constructor is defined in message.cc.
227  MessageAssembler(std::shared_ptr<BufferPool> buffer_pool);
228 
229  ~MessageAssembler() = default;
230 
// Handle one raw packet; defined in message.cc.
// NOTE(review): the meaning of the bool result is not visible here -- confirm in message.cc.
231  bool process_packet(const std::vector<uint8_t> &raw_data);
232 
// Register to be notified when a message of the given id arrives; returns the condition to
// wait on.
237  std::shared_ptr<MessageCondition> register_message(const crl::multisense::details::wire::IdType &message_id);
238 
// NOTE(review): a declaration is elided here. The symbol index lists
// `void remove_registration(const crl::multisense::details::wire::IdType &message_id)`:
// remove a registration for a specific message type.
243 
// NOTE(review): the first line of this declaration is elided. The symbol index gives
// `void register_callback(const crl::multisense::details::wire::IdType &message_id, ...)`:
// register a callback to receive valid messages of a given id.
250  std::function<void(std::shared_ptr<const std::vector<uint8_t>>)> callback);
251 
// NOTE(review): a declaration is elided here. The symbol index lists
// `void remove_callback(const crl::multisense::details::wire::IdType &message_id)`:
// remove a callback for a specific message type.
256 
// NOTE(review): the signature `MessageStatistics get_message_statistics() const`
// (message.hh:257 per the symbol index) and the body lines between these braces are elided
// from this listing.
258  {
263  }
264 
265 private:
266 
// InternalMessage: an internal message object used to track how many bytes were written to a
// data buffer.
// NOTE(review): the struct head (message.hh:270) and the member
// `crl::multisense::details::wire::IdType type` (message.hh:272) are elided from this listing.
271  {
// Running count of bytes written into `data`
273  size_t bytes_written = 0;
// Buffer that the message bytes are accumulated into
274  std::shared_ptr<std::vector<uint8_t>> data;
275  };
276 
// Get a new buffer that can hold a message of the given size. Returns the buffer together
// with a deque of message ids; ordered_messages is taken by value.
// NOTE(review): presumably the returned deque is the updated ordering -- confirm in message.cc.
282  std::tuple<std::shared_ptr<std::vector<uint8_t>>, std::deque<int64_t>>
283  get_buffer(uint32_t message_size, std::deque<int64_t> ordered_messages);
284 
// Write raw data to a message.
288  bool write_data(InternalMessage &message, const std::vector<uint8_t> &raw_data);
289 
// Dispatch to both our registrations and callbacks.
293  void dispatch(const crl::multisense::details::wire::IdType& message_id,
294  std::shared_ptr<std::vector<uint8_t>> data);
295 
// Mutex to ensure registrations with the MessageAssembler are thread safe.
299  std::mutex m_condition_mutex;
300 
// Mutex to ensure callback calls into the MessageAssembler are thread safe.
304  std::mutex m_callback_mutex;
305 
// Buffer pool used to allocate messages into.
309  std::shared_ptr<BufferPool> m_buffer_pool = nullptr;
310 
// Internal id used to detect rollover of the 16 bit wire id; starts at -1.
314  int32_t m_previous_wire_id = -1;
315 
// NOTE(review): a member is elided here. The symbol index lists
// `int64_t m_current_sequence_id` (message.hh:319): internal cache of our last sequence id.
320 
// Tracking for the ordering of the small buffers we have allocated.
325  std::deque<int64_t> m_small_ordered_messages;
// Tracking for the ordering of the large buffers we have allocated.
330  std::deque<int64_t> m_large_ordered_messages;
331 
// Active messages we are accumulating, keyed by int64_t.
// NOTE(review): the key is presumably the unwrapped sequence id -- confirm in message.cc.
335  std::map<int64_t, InternalMessage> m_active_messages;
336 
// Conditions which we are tracking, keyed by message id. These are notified when a message of
// a given type is fully received.
341  std::map<crl::multisense::details::wire::IdType, std::shared_ptr<MessageCondition>> m_conditions;
342 
// Callbacks which we are tracking. These are called when a message of a given type is fully
// received.
// NOTE(review): the first line of this member (`std::map<crl::multisense::details::wire::IdType,`
// per the symbol index) is elided from this listing.
348  std::function<void(std::shared_ptr<const std::vector<uint8_t>>)>> m_callbacks;
349 
// A counter for the number of messages we have received.
353  std::atomic_size_t m_received_messages = 0;
354 
// A flag which indicates if a message is being processed.
358  std::atomic_bool m_processing_messages = false;
359 
// A counter for the number of messages we dispatched.
363  std::atomic_size_t m_dispatched_messages = 0;
364 
// A counter for the number of invalid packets we received.
368  std::atomic_size_t m_invalid_packets = 0;
369 };
370 
371 }
372 }
multisense::legacy::MessageAssembler::write_data
bool write_data(InternalMessage &message, const std::vector< uint8_t > &raw_data)
Write raw data to a message.
Definition: message.cc:368
multisense::legacy::MessageAssembler::m_previous_wire_id
int32_t m_previous_wire_id
Internal id used to detect internal rollover of the 16 bit wire id.
Definition: message.hh:314
multisense::legacy::MessageAssembler::get_buffer
std::tuple< std::shared_ptr< std::vector< uint8_t > >, std::deque< int64_t > > get_buffer(uint32_t message_size, std::deque< int64_t > ordered_messages)
Get a new buffer that can hold a message of a given size. If no buffers are available try destroying ...
Definition: message.cc:339
crl::multisense::details
Definition: Legacy/details/channel.cc:63
crl::multisense::details::wire::COMBINED_HEADER_LENGTH
static CRL_CONSTEXPR uint8_t COMBINED_HEADER_LENGTH
Definition: Protocol.hh:71
multisense::legacy::MessageAssembler::m_dispatched_messages
std::atomic_size_t m_dispatched_messages
A counter for the number of messages we dispatched.
Definition: message.hh:363
multisense::legacy::MessageAssembler
Process incoming network data, and try to assemble the data into valid MultiSense Wire messages.
Definition: message.hh:224
crl::multisense::details::wire::HEADER_MAGIC
static CRL_CONSTEXPR uint16_t HEADER_MAGIC
Definition: Protocol.hh:76
multisense::legacy::MessageCondition::set_and_notify
void set_and_notify(std::shared_ptr< std::vector< uint8_t >> data)
Definition: message.hh:154
multisense::legacy::MessageAssembler::dispatch
void dispatch(const crl::multisense::details::wire::IdType &message_id, std::shared_ptr< std::vector< uint8_t >> data)
Dispatch to both our registrations and callbacks.
Definition: message.cc:412
multisense::legacy::MessageCondition::m_mutex
std::mutex m_mutex
Definition: message.hh:196
multisense::legacy::deserialize
T deserialize(const std::vector< uint8_t > &data)
Deserialize the raw bytes of a message. Note this does not account for the wire::Header which is tran...
Definition: message.hh:61
BufferStream.hh
multisense::legacy::MessageAssembler::process_packet
bool process_packet(const std::vector< uint8_t > &raw_data)
Definition: message.cc:175
multisense::legacy::MessageAssembler::InternalMessage::type
crl::multisense::details::wire::IdType type
Definition: message.hh:272
multisense::legacy::MessageAssembler::m_callbacks
std::map< crl::multisense::details::wire::IdType, std::function< void(std::shared_ptr< const std::vector< uint8_t >>)> > m_callbacks
Callbacks which we are tracking. These are called when a message of a given type is fully received.
Definition: message.hh:348
multisense::legacy::MessageAssembler::InternalMessage::bytes_written
size_t bytes_written
Definition: message.hh:273
multisense::legacy::MessageCondition::MessageCondition
MessageCondition()=default
crl::multisense::details::wire::HEADER_VERSION
static CRL_CONSTEXPR uint16_t HEADER_VERSION
Definition: Protocol.hh:77
crl::multisense::details::utility::BufferStream::tell
std::size_t tell() const
Definition: BufferStream.hh:70
multisense::legacy::MessageAssembler::m_active_messages
std::map< int64_t, InternalMessage > m_active_messages
Active messages we are accumulating.
Definition: message.hh:335
AckMessage.hh
multisense::legacy::MessageCondition::wait
std::optional< T > wait()
Definition: message.hh:188
multisense::legacy::MessageCondition::m_notified
bool m_notified
Definition: message.hh:199
multisense::legacy::MessageAssembler::m_received_messages
std::atomic_size_t m_received_messages
A counter for the number of messages we have received.
Definition: message.hh:353
multisense::legacy::unwrap_sequence_id
int64_t unwrap_sequence_id(uint16_t current_wire_id, int32_t previous_wire_id, int64_t current_sequence_id)
Unwrap a 16-bit wire sequence ID into a unique 64-bit local ID.
Definition: message.cc:54
multisense::legacy::MessageStatistics::received_messages
size_t received_messages
The number of received messages.
Definition: message.hh:208
multisense::legacy::MessageAssembler::InternalMessage::data
std::shared_ptr< std::vector< uint8_t > > data
Definition: message.hh:274
multisense::legacy::MessageAssembler::remove_callback
void remove_callback(const crl::multisense::details::wire::IdType &message_id)
Remove a callback for a specific message type.
Definition: message.cc:329
multisense::legacy::MessageAssembler::m_small_ordered_messages
std::deque< int64_t > m_small_ordered_messages
Tracking for the ordering of the small buffers we have allocated. Used to determine which active mess...
Definition: message.hh:325
multisense::legacy::MessageStatistics::invalid_packets
size_t invalid_packets
The number of invalid packets we received.
Definition: message.hh:218
multisense::legacy::MessageAssembler::m_callback_mutex
std::mutex m_callback_mutex
Mutex to ensure callback calls into the MessageAssembler are thread safe.
Definition: message.hh:304
multisense::legacy::MessageCondition
A condition object which can be used to wait on messages from the stream.
Definition: message.hh:137
multisense::legacy::serialize
std::vector< uint8_t > serialize(const T &message, uint16_t sequence_id, size_t mtu)
Serialize a MultiSense Wire message for transmission. This adds the wire header to the message for tr...
Definition: message.hh:103
crl::multisense::details::utility::BufferStream::seek
void seek(std::size_t idx)
Definition: BufferStream.hh:93
multisense::legacy::MessageAssembler::register_callback
void register_callback(const crl::multisense::details::wire::IdType &message_id, std::function< void(std::shared_ptr< const std::vector< uint8_t >>)> callback)
Register a callback to receive valid messages of a given id. Note currently only one callback can be ...
Definition: message.cc:322
crl::multisense::details::utility::BufferStreamWriter
Definition: BufferStream.hh:259
multisense::legacy::MessageAssembler::m_current_sequence_id
int64_t m_current_sequence_id
Internal cache of our last sequence id.
Definition: message.hh:319
multisense::legacy::MessageAssembler::register_message
std::shared_ptr< MessageCondition > register_message(const crl::multisense::details::wire::IdType &message_id)
Register to be notified when a message of a given id arrives. Note this registration will only receiv...
Definition: message.cc:292
crl::multisense::details::wire::VersionType
uint16_t VersionType
Definition: Protocol.hh:137
multisense::legacy::MessageAssembler::m_conditions
std::map< crl::multisense::details::wire::IdType, std::shared_ptr< MessageCondition > > m_conditions
Conditions which we are tracking. These are notified when a message of a given type is fully received...
Definition: message.hh:341
multisense::legacy::MessageAssembler::remove_registration
void remove_registration(const crl::multisense::details::wire::IdType &message_id)
Remove a registration for a specific message type.
Definition: message.cc:313
multisense::legacy::MessageCondition::m_data
std::vector< uint8_t > m_data
Definition: message.hh:198
multisense::legacy::MessageAssembler::InternalMessage
An internal message object used to track how many bytes were written to a data buffer.
Definition: message.hh:270
multisense::legacy::get_full_message_size
std::optional< uint32_t > get_full_message_size(const std::vector< uint8_t > &raw_data)
Get the size of the full message in bytes from a raw data buffer.
Definition: message.cc:140
multisense::legacy::MessageAssembler::m_processing_messages
std::atomic_bool m_processing_messages
A flag which indicates if a message is being processed.
Definition: message.hh:358
crl::multisense::details::wire::HEADER_GROUP
static CRL_CONSTEXPR uint16_t HEADER_GROUP
Definition: Protocol.hh:82
Protocol.hh
multisense::legacy::header_valid
bool header_valid(const std::vector< uint8_t > &raw_data)
Validate the Multisense header.
Definition: message.cc:96
multisense::legacy::MessageAssembler::m_invalid_packets
std::atomic_size_t m_invalid_packets
A counter for the number of invalid packets we received.
Definition: message.hh:368
storage.hh
multisense::legacy::MessageAssembler::~MessageAssembler
~MessageAssembler()=default
header
std_msgs::Header const * header(const M &m)
multisense::legacy::MessageCondition::unset
void unset()
Definition: message.hh:148
multisense
Definition: factory.cc:39
multisense::legacy::MessageCondition::wait
std::optional< T > wait(const std::optional< std::chrono::duration< Rep, Period >> &timeout)
Convenience function equivalent to std::condition_variable::wait_for that performs a type conversion ...
Definition: message.hh:167
multisense::legacy::get_message_type
crl::multisense::details::wire::IdType get_message_type(const std::vector< uint8_t > &raw_buffer)
Get the message type of the message from the buffer over the wire. Note this does account for the wir...
Definition: message.cc:128
multisense::legacy::MessageCondition::~MessageCondition
~MessageCondition()
Definition: message.hh:143
multisense::legacy::MessageStatistics::dropped_messages
size_t dropped_messages
The number of dropped messages.
Definition: message.hh:213
crl::multisense::details::utility::BufferStream::data
void * data() const
Definition: BufferStream.hh:72
multisense::legacy::MessageAssembler::m_condition_mutex
std::mutex m_condition_mutex
Mutex to ensure registrations with the MessageAssembler are thread safe.
Definition: message.hh:299
crl::multisense::details::utility::BufferStreamReader
Definition: BufferStream.hh:192
multisense::legacy::MessageAssembler::MessageAssembler
MessageAssembler(std::shared_ptr< BufferPool > buffer_pool)
Definition: message.cc:170
multisense::legacy::MessageAssembler::m_large_ordered_messages
std::deque< int64_t > m_large_ordered_messages
Tracking for the ordering of the large buffers we have allocated. Used to determine which active mess...
Definition: message.hh:330
multisense::legacy::MessageStatistics
Definition: message.hh:203
crl::multisense::details::wire::IdType
uint16_t IdType
Definition: Protocol.hh:136
multisense::legacy::MessageCondition::m_cv
std::condition_variable m_cv
Definition: message.hh:197
multisense::legacy::MessageAssembler::get_message_statistics
MessageStatistics get_message_statistics() const
Definition: message.hh:257
multisense::legacy::MessageAssembler::m_buffer_pool
std::shared_ptr< BufferPool > m_buffer_pool
Buffer pool used to allocate messages into.
Definition: message.hh:309


multisense_lib
Author(s):
autogenerated on Thu Apr 17 2025 02:49:09