message.cc
Go to the documentation of this file.
1 
37 #include <inttypes.h>
38 #include <limits>
39 
40 #include <utility/BufferStream.hh>
41 #include <wire/Protocol.hh>
42 
43 #include <wire/DisparityMessage.hh>
44 
46 
47 namespace multisense{
48 namespace legacy{
49 
54 int64_t unwrap_sequence_id(uint16_t current_wire_id, int32_t previous_wire_id, int64_t current_sequence_id)
55 {
56  auto unwrapped_id = current_sequence_id;
57 
58  //
59  // Look for a sequence change
60 
61  if (current_wire_id != previous_wire_id)
62  {
63 
64  CRL_CONSTEXPR uint16_t ID_MAX = std::numeric_limits<uint16_t>::max();
65  CRL_CONSTEXPR uint16_t ID_MASK = 0xF000;
66  CRL_CONSTEXPR uint16_t ID_HIGH = 0xF000;
67  CRL_CONSTEXPR uint16_t ID_LOW = 0x0000;
68 
69  if (-1 == previous_wire_id)
70  {
71  //
72  // Seed
73  //
74  unwrapped_id = current_wire_id;
75  }
76  else if (((current_wire_id & ID_MASK) == ID_LOW) &&
77  ((previous_wire_id & ID_MASK) == ID_HIGH))
78  {
79  //
80  // Detect forward 16-bit wrap
81  //
82  unwrapped_id += 1 + (static_cast<int64_t>(ID_MAX) - previous_wire_id) + current_wire_id;
83  }
84  else
85  {
86  //
87  // Normal case
88  //
89  unwrapped_id += static_cast<int64_t>(current_wire_id) - previous_wire_id;
90  }
91  }
92 
93  return unwrapped_id;
94 }
95 
96 bool header_valid(const std::vector<uint8_t> &raw_data)
97 {
98  using namespace crl::multisense::details;
99 
100  if (raw_data.size() < static_cast<int>(sizeof(wire::Header)))
101  {
102  CRL_DEBUG("undersized packet: %" PRIu64 "/ %" PRIu64 " bytes\n",
103  static_cast<uint64_t>(raw_data.size()), static_cast<uint64_t>(sizeof(wire::Header)));
104  return false;
105  }
106 
107  const wire::Header& header = *(reinterpret_cast<const wire::Header*>(raw_data.data()));
108 
109  if (wire::HEADER_MAGIC != header.magic)
110  {
111  CRL_DEBUG("bad protocol magic: 0x%x, expecting 0x%x\n", header.magic, wire::HEADER_MAGIC);
112  return false;
113  }
114  else if (wire::HEADER_VERSION != header.version)
115  {
116  CRL_DEBUG("bad protocol version: 0x%x, expecting 0x%x\n", header.version, wire::HEADER_VERSION);
117  return false;
118  }
119  else if (wire::HEADER_GROUP != header.group)
120  {
121  CRL_DEBUG("bad protocol group: 0x%x, expecting 0x%x\n", header.group, wire::HEADER_GROUP);
122  return false;
123  }
124 
125  return true;
126 }
127 
128 crl::multisense::details::wire::IdType get_message_type(const std::vector<uint8_t>& raw_buffer)
129 {
130  using namespace crl::multisense::details;
131 
132  utility::BufferStreamReader stream(raw_buffer.data(), raw_buffer.size());
133  stream.seek(sizeof(wire::Header));
134  wire::IdType message_type;
135  stream & message_type;
136 
137  return message_type;
138 }
139 
140 std::optional<uint32_t> get_full_message_size(const std::vector<uint8_t> &raw_data)
141 {
142  using namespace crl::multisense::details;
143 
144  if (!header_valid(raw_data))
145  {
146  CRL_DEBUG("Cannot get message size\n");
147  return std::nullopt;
148  }
149 
150  const auto message_type = get_message_type(raw_data);
151 
152  const wire::Header& header = *(reinterpret_cast<const wire::Header*>(raw_data.data()));
153 
154  //
155  // Handle the special case for disparity messages which are serialized on the wire using
156  // a number of bits per pixel which is less than how they are represented in the API. Computed
157  // the expanded size of the disparity message, and make sure our output buffer has enough space
158  // to store that data
159  //
160  const uint32_t final_message_size = (message_type == MSG_ID(wire::Disparity::ID)) ?
161  ((header.messageLength - wire::Disparity::META_LENGTH) /
162  wire::Disparity::WIRE_BITS_PER_PIXEL *
163  wire::Disparity::API_BITS_PER_PIXEL +
164  wire::Disparity::META_LENGTH) :
165  header.messageLength;
166 
167  return final_message_size;
168 }
169 
170 MessageAssembler::MessageAssembler(std::shared_ptr<BufferPool> buffer_pool):
171  m_buffer_pool(buffer_pool)
172 {
173 }
174 
175 bool MessageAssembler::process_packet(const std::vector<uint8_t> &raw_data)
176 {
177  using namespace crl::multisense::details;
178 
179  if (!m_buffer_pool)
180  {
181  CRL_DEBUG("Buffer pool uninitialized. Cannot receive images\n");
182  return false;
183  }
184 
185  if (!header_valid(raw_data))
186  {
188  return false;
189  }
190 
191  //
192  // TODO (malvarado): Header constructor with raw pointer
193  //
194  const wire::Header& header = *(reinterpret_cast<const wire::Header*>(raw_data.data()));
195 
196  //
197  // Create a unique sequence ID based on the wire ID
198  //
199  const int64_t full_sequence_id = unwrap_sequence_id(header.sequenceIdentifier, m_previous_wire_id, m_current_sequence_id);
200  m_current_sequence_id = full_sequence_id;
201  m_previous_wire_id = header.sequenceIdentifier;
202 
203  auto active_message = m_active_messages.find(full_sequence_id);
204 
205  const auto buffer_config = m_buffer_pool->get_config();
206  const bool is_large_buffer = header.messageLength > buffer_config.small_buffer_size;
207  auto& ordered_messages = (is_large_buffer ? m_large_ordered_messages : m_small_ordered_messages);
208 
209  //
210  // We are not currently tracking this message, attempt to create a new message for it. This
211  // will only happen once for each new message
212  //
213  if (active_message == std::end(m_active_messages))
214  {
215  if (header.byteOffset != 0)
216  {
218  return true;
219  }
220 
221  const auto message_size = get_full_message_size(raw_data);
222  if (!message_size)
223  {
225  return false;
226  }
227 
228  std::shared_ptr<std::vector<uint8_t>> buffer = nullptr;
229  std::tie(buffer, ordered_messages) = get_buffer(message_size.value(), ordered_messages);
230  if (!buffer)
231  {
232  return false;
233  }
234 
235  auto [inserted_iterator,_] = m_active_messages.emplace(full_sequence_id,
237  0,
238  std::move(buffer)});
239 
240  active_message = inserted_iterator;
241  ordered_messages.emplace_back(full_sequence_id);
242 
243  m_processing_messages = true;
245  }
246 
247  if (active_message != std::end(m_active_messages))
248  {
249  auto& message = active_message->second;
250 
251  if (!write_data(message, raw_data))
252  {
254  return false;
255  }
256 
257  if (message.bytes_written == header.messageLength)
258  {
259  //
260  // Handle the special case for Ack messages. To avoid collisions for all the Ack
261  // registrations, we extract the command associated with the Ack, and dispatch
262  // to all the listeners for that command
263  //
264  if (message.type == MSG_ID(wire::Ack::ID))
265  {
266  const auto ack = deserialize<wire::Ack>(*message.data);
267  dispatch(ack.command, message.data);
268  }
269  else
270  {
271  dispatch(message.type, message.data);
272  }
273 
274  //
275  // Remove processed messages
276  //
277  m_active_messages.erase(active_message);
278  if (auto it = std::find(std::begin(ordered_messages), std::end(ordered_messages), full_sequence_id);
279  it != std::end(ordered_messages))
280  {
281  ordered_messages.erase(it);
282  }
283 
284  m_processing_messages = false;
286  }
287  }
288 
289  return true;
290 }
291 
292 std::shared_ptr<MessageCondition> MessageAssembler::register_message(const crl::multisense::details::wire::IdType &message_id)
293 {
294  std::lock_guard<std::mutex> lock(m_condition_mutex);
295  if (auto it = m_conditions.find(message_id); it != std::end(m_conditions))
296  {
297  it->second->unset();
298  return it->second;
299  }
300  else
301  {
302  auto [new_it, valid] = m_conditions.emplace(message_id, std::make_shared<MessageCondition>());
303 
304  if (valid)
305  {
306  return new_it->second;
307  }
308  }
309 
310  return nullptr;
311 }
312 
314 {
315  std::lock_guard<std::mutex> lock(m_condition_mutex);
316  if (auto it = m_conditions.find(message_id); it != std::end(m_conditions))
317  {
318  m_conditions.erase(it);
319  }
320 }
321 
323  std::function<void(std::shared_ptr<const std::vector<uint8_t>>)> callback)
324 {
325  std::lock_guard<std::mutex> lock(m_callback_mutex);
326  m_callbacks.emplace(message_id, callback);
327 }
328 
330 {
331  std::lock_guard<std::mutex> lock(m_callback_mutex);
332  if (auto it = m_callbacks.find(message_id); it != std::end(m_callbacks))
333  {
334  m_callbacks.erase(it);
335  }
336 }
337 
338 std::tuple<std::shared_ptr<std::vector<uint8_t>>, std::deque<int64_t>>
339 MessageAssembler::get_buffer(uint32_t message_size, std::deque<int64_t> ordered_messages)
340 {
341  const auto buffer_config = m_buffer_pool->get_config();
342 
343  if (message_size > buffer_config.large_buffer_size)
344  {
345  CRL_DEBUG("No buffers large enough to fit a message of %" PRIu32 " bytes\n", message_size);
346  return std::make_tuple(nullptr, std::move(ordered_messages));
347  }
348 
349  //
350  // Remove old messages until we can get a free buffer
351  //
352  auto buffer = m_buffer_pool->get_buffer(message_size);
353  while(buffer == nullptr && !ordered_messages.empty())
354  {
355  const auto old_sequence = ordered_messages.front();
356  ordered_messages.pop_front();
357  m_active_messages.erase(old_sequence);
358 
359  if (buffer = m_buffer_pool->get_buffer(message_size); buffer != nullptr)
360  {
361  break;
362  }
363  }
364 
365  return std::make_tuple(buffer, std::move(ordered_messages));
366 }
367 
368 bool MessageAssembler::write_data(InternalMessage &message, const std::vector<uint8_t> &raw_data)
369 {
370  using namespace crl::multisense::details;
371 
372  if (raw_data.size() < sizeof(wire::Header))
373  {
374  return false;
375  }
376 
377  const wire::Header& header = *(reinterpret_cast<const wire::Header*>(raw_data.data()));
378 
379  const size_t bytes_to_write = raw_data.size() - sizeof(wire::Header);
380 
381  if (bytes_to_write + message.bytes_written > message.data->size())
382  {
383  CRL_DEBUG("Error. Buffer write will overrun internal buffer\n");
384  return false;
385  }
386 
387  //
388  // Handle the special case unpacking of disparity data from 12bit to 16bit images
389  //
390  if(message.type == MSG_ID(wire::Disparity::ID))
391  {
392  utility::BufferStreamWriter stream(message.data->data(),
393  message.data->size());
394 
395  wire::Disparity::assembler(stream,
396  raw_data.data() + sizeof(wire::Header),
397  header.byteOffset,
398  static_cast<uint32_t>(bytes_to_write));
399  }
400  else
401  {
402  memcpy(message.data->data() + header.byteOffset,
403  raw_data.data() + sizeof(wire::Header),
404  bytes_to_write);
405  }
406 
407  message.bytes_written += bytes_to_write;
408 
409  return true;
410 }
411 
413  std::shared_ptr<std::vector<uint8_t>> data)
414 {
415  {
416  std::lock_guard<std::mutex> lock(m_condition_mutex);
417 
418  if (auto condition = m_conditions.find(message_id); condition != std::end(m_conditions))
419  {
420  condition->second->set_and_notify(data);
421 
422  //
423  // Remove our registration after we dispatch the message
424  //
425  m_conditions.erase(condition);
426  }
427  }
428 
429  std::lock_guard<std::mutex> lock(m_callback_mutex);
430  if (auto callback = m_callbacks.find(message_id); callback != std::end(m_callbacks))
431  {
432  callback->second(data);
433  }
434 }
435 
436 }
437 }
multisense::legacy::MessageAssembler::write_data
bool write_data(InternalMessage &message, const std::vector< uint8_t > &raw_data)
Write raw data to an message.
Definition: message.cc:368
multisense::legacy::MessageAssembler::m_previous_wire_id
int32_t m_previous_wire_id
Internal id used to detect internal rollover of the 16 bit wire id.
Definition: message.hh:314
multisense::legacy::MessageAssembler::get_buffer
std::tuple< std::shared_ptr< std::vector< uint8_t > >, std::deque< int64_t > > get_buffer(uint32_t message_size, std::deque< int64_t > ordered_messages)
Get a new buffer that can hold a message of a given size. If no buffers are available try destroying ...
Definition: message.cc:339
crl::multisense::details
Definition: Legacy/details/channel.cc:63
multisense::legacy::MessageAssembler::m_dispatched_messages
std::atomic_size_t m_dispatched_messages
A counter for the number of messages we dispatched.
Definition: message.hh:363
CRL_DEBUG
#define CRL_DEBUG(fmt,...)
Definition: Exception.hh:71
crl::multisense::details::wire::HEADER_MAGIC
static CRL_CONSTEXPR uint16_t HEADER_MAGIC
Definition: Protocol.hh:76
multisense::legacy::MessageAssembler::dispatch
void dispatch(const crl::multisense::details::wire::IdType &message_id, std::shared_ptr< std::vector< uint8_t >> data)
Dispatch to both our registrations and callbacks.
Definition: message.cc:412
BufferStream.hh
multisense::legacy::MessageAssembler::process_packet
bool process_packet(const std::vector< uint8_t > &raw_data)
Definition: message.cc:175
message.hh
multisense::legacy::MessageAssembler::InternalMessage::type
crl::multisense::details::wire::IdType type
Definition: message.hh:272
DisparityMessage.hh
multisense::legacy::MessageAssembler::m_callbacks
std::map< crl::multisense::details::wire::IdType, std::function< void(std::shared_ptr< const std::vector< uint8_t >>)> > m_callbacks
Callbacks which we are tracking. These are called when a message of a given type is fully received.
Definition: message.hh:348
multisense::legacy::MessageAssembler::InternalMessage::bytes_written
size_t bytes_written
Definition: message.hh:273
crl::multisense::details::wire::HEADER_VERSION
static CRL_CONSTEXPR uint16_t HEADER_VERSION
Definition: Protocol.hh:77
CRL_CONSTEXPR
#define CRL_CONSTEXPR
Definition: Legacy/include/MultiSense/details/utility/Portability.hh:49
multisense::legacy::MessageAssembler::m_active_messages
std::map< int64_t, InternalMessage > m_active_messages
Active messages we are accumulating.
Definition: message.hh:335
multisense::legacy::MessageAssembler::m_received_messages
std::atomic_size_t m_received_messages
A counter for the number of messages we have received.
Definition: message.hh:353
multisense::legacy::unwrap_sequence_id
int64_t unwrap_sequence_id(uint16_t current_wire_id, int32_t previous_wire_id, int64_t current_sequence_id)
Unwrap a 16-bit wire sequence ID into a unique 64-bit local ID.
Definition: message.cc:54
multisense::legacy::MessageAssembler::InternalMessage::data
std::shared_ptr< std::vector< uint8_t > > data
Definition: message.hh:274
multisense::legacy::MessageAssembler::remove_callback
void remove_callback(const crl::multisense::details::wire::IdType &message_id)
Remove a callback for a specific message type.
Definition: message.cc:329
multisense::legacy::MessageAssembler::m_small_ordered_messages
std::deque< int64_t > m_small_ordered_messages
Tracking for the ordering of the small buffers we have allocated. Used to determine which active mess...
Definition: message.hh:325
multisense::legacy::MessageAssembler::m_callback_mutex
std::mutex m_callback_mutex
Mutex to ensure callback calls into the MessageAssembler are thread safe.
Definition: message.hh:304
crl::multisense::details::utility::BufferStream::seek
void seek(std::size_t idx)
Definition: BufferStream.hh:93
multisense::legacy::MessageAssembler::register_callback
void register_callback(const crl::multisense::details::wire::IdType &message_id, std::function< void(std::shared_ptr< const std::vector< uint8_t >>)> callback)
Register a callback to receive valid messages of a given id. Note currently only one callback can be ...
Definition: message.cc:322
crl::multisense::details::utility::BufferStreamWriter
Definition: BufferStream.hh:259
multisense::legacy::MessageAssembler::m_current_sequence_id
int64_t m_current_sequence_id
Internal cache of our last sequence id.
Definition: message.hh:319
multisense::legacy::MessageAssembler::register_message
std::shared_ptr< MessageCondition > register_message(const crl::multisense::details::wire::IdType &message_id)
Register to be notified when a message of a given id arrives. Note this registration will only receiv...
Definition: message.cc:292
multisense::legacy::MessageAssembler::m_conditions
std::map< crl::multisense::details::wire::IdType, std::shared_ptr< MessageCondition > > m_conditions
Conditions which we are tracking. These are notified when a message of a given type is fully received...
Definition: message.hh:341
multisense::legacy::MessageAssembler::remove_registration
void remove_registration(const crl::multisense::details::wire::IdType &message_id)
Remove a registration for a specific message type.
Definition: message.cc:313
MSG_ID
#define MSG_ID(x)
Definition: Protocol.hh:344
multisense::legacy::MessageAssembler::InternalMessage
An internal message object used to track how many bytes were written to a data buffer.
Definition: message.hh:270
multisense::legacy::get_full_message_size
std::optional< uint32_t > get_full_message_size(const std::vector< uint8_t > &raw_data)
Get the size of the full message in bytes from a raw data buffer.
Definition: message.cc:140
multisense::legacy::MessageAssembler::m_processing_messages
std::atomic_bool m_processing_messages
The a flag which indicates if a message is being processed.
Definition: message.hh:358
crl::multisense::details::wire::HEADER_GROUP
static CRL_CONSTEXPR uint16_t HEADER_GROUP
Definition: Protocol.hh:82
Protocol.hh
multisense::legacy::header_valid
bool header_valid(const std::vector< uint8_t > &raw_data)
Validate the Multisense header.
Definition: message.cc:96
multisense::legacy::MessageAssembler::m_invalid_packets
std::atomic_size_t m_invalid_packets
A counter for the number of invalid packets we received.
Definition: message.hh:368
header
std_msgs::Header const * header(const M &m)
multisense
Definition: factory.cc:39
multisense::legacy::get_message_type
crl::multisense::details::wire::IdType get_message_type(const std::vector< uint8_t > &raw_buffer)
Get the message type of the message from the buffer over the wire. Note this does account for the wir...
Definition: message.cc:128
crl::multisense::details::utility::BufferStream::data
void * data() const
Definition: BufferStream.hh:72
multisense::legacy::MessageAssembler::m_condition_mutex
std::mutex m_condition_mutex
Mutex to ensure registrations MessageAssembler are thread safe.
Definition: message.hh:299
crl::multisense::details::utility::BufferStreamReader
Definition: BufferStream.hh:192
multisense::legacy::MessageAssembler::MessageAssembler
MessageAssembler(std::shared_ptr< BufferPool > buffer_pool)
Definition: message.cc:170
multisense::legacy::MessageAssembler::m_large_ordered_messages
std::deque< int64_t > m_large_ordered_messages
Tracking for the ordering of the large buffers we have allocated. Used to determine which active mess...
Definition: message.hh:330
crl::multisense::details::wire::IdType
uint16_t IdType
Definition: Protocol.hh:136
multisense::legacy::MessageAssembler::m_buffer_pool
std::shared_ptr< BufferPool > m_buffer_pool
Buffer pool used to allocate messages into.
Definition: message.hh:309


multisense_lib
Author(s):
autogenerated on Thu Apr 17 2025 02:49:09