int64_t unwrap_sequence_id(uint16_t current_wire_id, int32_t previous_wire_id, int64_t current_sequence_id)
{
    auto unwrapped_id = current_sequence_id;

    if (current_wire_id != previous_wire_id)
    {
        CRL_CONSTEXPR uint16_t ID_MAX = std::numeric_limits<uint16_t>::max();

        if (-1 == previous_wire_id)
        {
            //
            // First message seen: seed the unwrapped id directly from the wire id
            unwrapped_id = current_wire_id;
        }
        else if (((current_wire_id & ID_MASK) == ID_LOW) &&
                 ((previous_wire_id & ID_MASK) == ID_HIGH))
        {
            //
            // The 16-bit wire id wrapped past ID_MAX
            unwrapped_id += 1 + (static_cast<int64_t>(ID_MAX) - previous_wire_id) + current_wire_id;
        }
        else
        {
            unwrapped_id += static_cast<int64_t>(current_wire_id) - previous_wire_id;
        }
    }

    return unwrapped_id;
}
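//
// Worked example of the wraparound branch above (values are illustrative and
// assume the ID_LOW/ID_HIGH masks classify them as a wrap): with
// previous_wire_id == 65533 and current_wire_id == 2, the id advances by
// 1 + (65535 - 65533) + 2 == 5, the same distance the 16-bit wire id moved,
// so the 64-bit sequence id keeps increasing monotonically across the wrap.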
if (raw_data.size() < sizeof(wire::Header))
{
    CRL_DEBUG("undersized packet: %" PRIu64 "/%" PRIu64 " bytes\n",
              static_cast<uint64_t>(raw_data.size()),
              static_cast<uint64_t>(sizeof(wire::Header)));
    // ...
}

const wire::Header &header = *(reinterpret_cast<const wire::Header*>(raw_data.data()));

// ...

//
// The wire message id immediately follows the header
stream.seek(sizeof(wire::Header));

stream & message_type;
const wire::Header &header = *(reinterpret_cast<const wire::Header*>(raw_data.data()));

//
// Disparity pixels use a different bit depth on the wire than in the API, so
// scale the payload from WIRE_BITS_PER_PIXEL to API_BITS_PER_PIXEL; every
// other message is stored at its wire length
const uint32_t final_message_size = (message_type == MSG_ID(wire::Disparity::ID)) ?
    ((header.messageLength - wire::Disparity::META_LENGTH) /
      wire::Disparity::WIRE_BITS_PER_PIXEL *
      wire::Disparity::API_BITS_PER_PIXEL +
      wire::Disparity::META_LENGTH) :
    header.messageLength;

return final_message_size;
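//
// Illustrative sizing example (the bit depths used here are assumptions, not
// necessarily the values defined by wire::Disparity): if WIRE_BITS_PER_PIXEL
// were 12 and API_BITS_PER_PIXEL were 16, a message carrying 1200 bytes of
// packed pixel data would be reserved as 1200 / 12 * 16 = 1600 bytes plus
// META_LENGTH, so the receive buffer can hold the unpacked disparity image.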
m_buffer_pool(buffer_pool)
CRL_DEBUG("Buffer pool uninitialized. Cannot receive images\n");
const wire::Header &header = *(reinterpret_cast<const wire::Header*>(raw_data.data()));

// ...

const bool is_large_buffer = header.messageLength > buffer_config.small_buffer_size;

// ...

if (header.byteOffset != 0)
{
    // ...
}

std::shared_ptr<std::vector<uint8_t>> buffer = nullptr;
std::tie(buffer, ordered_messages) = get_buffer(message_size.value(), ordered_messages);

// ...

active_message = inserted_iterator;
ordered_messages.emplace_back(full_sequence_id);

// ...

auto &message = active_message->second;

// ...

if (message.bytes_written == header.messageLength)
{
    //
    // Acks are dispatched under the id of the command they acknowledge
    if (message.type == MSG_ID(wire::Ack::ID))
    {
        const auto ack = deserialize<wire::Ack>(*message.data);
        dispatch(ack.command, message.data);
    }
    else
    {
        dispatch(message.type, message.data);
    }

    //
    // The message is fully assembled, so stop tracking its sequence id
    if (auto it = std::find(std::begin(ordered_messages), std::end(ordered_messages), full_sequence_id);
        it != std::end(ordered_messages))
    {
        ordered_messages.erase(it);
    }
}
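//
// Assembly sketch (sizes are illustrative): a multi-megabyte image message
// that arrives split across many smaller packets is written piece by piece
// into one pooled buffer at each packet's header.byteOffset; only once
// bytes_written reaches header.messageLength is the completed buffer handed
// to dispatch() above.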
auto [new_it, valid] = m_conditions.emplace(message_id, std::make_shared<MessageCondition>());

// ...

return new_it->second;
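//
// Hypothetical usage sketch (the `assembler` handle, the function name
// `register_message`, and the waiting API are assumptions, not shown in this
// fragment): a caller registers interest in a message id, transmits the
// corresponding query, and blocks on the returned MessageCondition until
// set_and_notify() fires in the dispatch path below, e.g.:
//
//   auto condition = assembler.register_message(MSG_ID(wire::Ack::ID));
//   // ... send the query, then wait on `condition` until set_and_notify() fires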
std::function<void(std::shared_ptr<const std::vector<uint8_t>>)> callback)
std::tuple<std::shared_ptr<std::vector<uint8_t>>, std::deque<int64_t>>
// ...

if (message_size > buffer_config.large_buffer_size)
{
    CRL_DEBUG("No buffers large enough to fit a message of %" PRIu32 " bytes\n", message_size);
    return std::make_tuple(nullptr, std::move(ordered_messages));
}

// ...

auto buffer = m_buffer_pool->get_buffer(message_size);

//
// Walk the oldest in-flight sequence ids, retrying the allocation until a
// buffer of the requested size becomes available
while (buffer == nullptr && !ordered_messages.empty())
{
    const auto old_sequence = ordered_messages.front();
    ordered_messages.pop_front();
    // ...
    if (buffer = m_buffer_pool->get_buffer(message_size); buffer != nullptr)
    {
        break;
    }
}

return std::make_tuple(buffer, std::move(ordered_messages));
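//
// Design note: the loop above trades completeness for liveness. When the pool
// is exhausted it pops the oldest in-flight sequence id (the elided lines
// presumably release that message's buffer back to the pool) and retries, so
// fresh data is favored over stalled partial messages. Caller-side sketch,
// mirroring the call in the assembly path above:
//
//   std::shared_ptr<std::vector<uint8_t>> buffer = nullptr;
//   std::tie(buffer, ordered_messages) = get_buffer(message_size.value(), ordered_messages);
//   if (buffer == nullptr) { /* handle allocation failure */ }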
if (raw_data.size() < sizeof(wire::Header))
{
    // ...
}

const wire::Header &header = *(reinterpret_cast<const wire::Header*>(raw_data.data()));

const size_t bytes_to_write = raw_data.size() - sizeof(wire::Header);

//
// Guard against writing past the end of the preallocated message buffer
if (header.byteOffset + bytes_to_write > message.data->size())
{
    CRL_DEBUG("Error. Buffer write will overrun internal buffer\n");
    // ...
}
//
// Disparity pixels are unpacked by the wire assembler as they are written into
// the destination buffer; every other message type is copied verbatim. The
// stream type and the offset argument passed to the assembler are assumptions
// inferred from the surrounding copy path.
if (message.type == MSG_ID(wire::Disparity::ID))
{
    utility::BufferStreamWriter stream(message.data->data(),
                                       message.data->size());

    wire::Disparity::assembler(stream,
                               raw_data.data() + sizeof(wire::Header),
                               header.byteOffset,
                               static_cast<uint32_t>(bytes_to_write));
}
else
{
    memcpy(message.data->data() + header.byteOffset,
           raw_data.data() + sizeof(wire::Header),
           bytes_to_write);
}
std::shared_ptr<std::vector<uint8_t>> data)
{
    // ...
    condition->second->set_and_notify(data);
    // ...
    callback->second(data);
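//
// Hypothetical usage sketch (the `assembler` handle and the registration
// function name are assumptions; only the callback parameter type shown above
// comes from this fragment): a persistent callback receives every fully
// assembled buffer for its message id, e.g.:
//
//   assembler.register_callback(MSG_ID(wire::Disparity::ID),
//       [](std::shared_ptr<const std::vector<uint8_t>> data)
//       {
//           // deserialize the wire message from *data and hand it to user code
//       });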