24 #include "spdlog/fmt/bin_to_hex.h"
25 #include "spdlog/fmt/chrono.h"
31 : queue(maxSize, blocking), name(streamName) {
37 readingThread = std::thread([
this, stream = std::move(stream)]()
mutable {
38 std::uint64_t numPacketsRead = 0;
44 const auto t1Parse = std::chrono::steady_clock::now();
47 auto msgGrp = std::static_pointer_cast<MessageGroup>(
data);
48 unsigned int size = msgGrp->getNumMessages();
49 std::vector<std::shared_ptr<ADatatype>> packets;
50 packets.reserve(
size);
51 for(
unsigned int i = 0; i <
size; ++i) {
55 auto rawMsgGrp = std::static_pointer_cast<RawMessageGroup>(
data->getRaw());
56 for(
auto& msg : rawMsgGrp->group) {
57 msgGrp->add(msg.first, packets[msg.second.index]);
60 const auto t2Parse = std::chrono::steady_clock::now();
64 std::vector<std::uint8_t> metadata;
66 data->getRaw()->serialize(metadata,
type);
67 logger::trace(
"Received message from device ({}) - parsing time: {}, data size: {}, object type: {} object data: {}",
69 std::chrono::duration_cast<std::chrono::microseconds>(t2Parse - t1Parse),
70 data->getRaw()->data.size(),
71 static_cast<std::int32_t
>(
type),
72 spdlog::to_hex(metadata));
77 throw std::runtime_error(fmt::format(
"Underlying queue destructed"));
87 const auto& callback = kv.second;
90 }
catch(
const std::exception& ex) {
91 logger::error(
"Callback with id: {} throwed an exception: {}", kv.first, ex.what());
97 }
catch(
const std::exception& ex) {
98 exceptionMessage = fmt::format(
"Communication exception - possible device error/misconfiguration. Original message '{}'", ex.what());
116 if(!
running.exchange(
false))
return;
138 queue.setBlocking(blocking);
143 return queue.getBlocking();
148 queue.setMaxSize(maxSize);
153 return queue.getMaxSize();
176 return addCallback([callback = std::move(callback)](std::string, std::shared_ptr<ADatatype> message) { callback(std::move(message)); });
181 return addCallback([callback = std::move(callback)](std::string, std::shared_ptr<ADatatype>) { callback(); });
189 if(
callbacks.count(callbackId) == 0)
return false;
198 const std::shared_ptr<XLinkConnection> conn,
const std::string& streamName,
unsigned int maxSize,
bool blocking, std::size_t maxDataSize)
199 : queue(maxSize, blocking), name(streamName), maxDataSize(maxDataSize) {
203 writingThread = std::thread([
this, stream = std::move(stream)]()
mutable {
204 std::uint64_t numPacketsSent = 0;
208 std::shared_ptr<RawBuffer>
data;
214 auto t1Parse = std::chrono::steady_clock::now();
215 std::vector<std::vector<uint8_t>> serializedAux;
217 auto rawMsgGrp = std::dynamic_pointer_cast<RawMessageGroup>(data);
218 serializedAux.reserve(rawMsgGrp->group.size());
219 unsigned int index = 0;
220 for(auto& msg : rawMsgGrp->group) {
221 msg.second.index = index++;
222 serializedAux.push_back(StreamMessageParser::serializeMessage(msg.second.buffer));
226 auto t2Parse = std::chrono::steady_clock::now();
230 std::vector<std::uint8_t> metadata;
233 logger::trace(
"Sending message to device ({}) - serialize time: {}, data size: {}, object type: {} object data: {}",
235 std::chrono::duration_cast<std::chrono::microseconds>(t2Parse - t1Parse),
238 spdlog::to_hex(metadata));
242 stream.
write(serialized);
243 for(
auto& msg : serializedAux) {
251 }
catch(
const std::exception& ex) {
252 exceptionMessage = fmt::format(
"Communication exception - possible device error/misconfiguration. Original message '{}'", ex.what());
264 bool DataInputQueue::isClosed()
const {
268 void DataInputQueue::close() {
270 if(!running.exchange(
false))
return;
276 if((writingThread.get_id() != std::this_thread::get_id()) && writingThread.joinable()) writingThread.join();
282 DataInputQueue::~DataInputQueue() {
287 if(writingThread.joinable()) writingThread.join();
290 void DataInputQueue::setBlocking(
bool blocking) {
291 if(!running)
throw std::runtime_error(exceptionMessage.c_str());
292 queue.setBlocking(blocking);
295 bool DataInputQueue::getBlocking()
const {
296 if(!running)
throw std::runtime_error(exceptionMessage.c_str());
297 return queue.getBlocking();
300 void DataInputQueue::setMaxSize(
unsigned int maxSize) {
301 if(!running)
throw std::runtime_error(exceptionMessage.c_str());
302 queue.setMaxSize(maxSize);
305 unsigned int DataInputQueue::getMaxSize()
const {
306 if(!running)
throw std::runtime_error(exceptionMessage.c_str());
307 return queue.getMaxSize();
311 void DataInputQueue::setMaxDataSize(std::size_t maxSize) {
312 maxDataSize = maxSize;
315 std::size_t DataInputQueue::getMaxDataSize() {
319 std::string DataInputQueue::getName()
const {
323 void DataInputQueue::send(
const std::shared_ptr<RawBuffer>& rawMsg) {
324 if(!running)
throw std::runtime_error(exceptionMessage.c_str());
325 if(!rawMsg)
throw std::invalid_argument(
"Message passed is not valid (nullptr)");
328 if(rawMsg->data.size() > maxDataSize) {
329 throw std::runtime_error(fmt::format(
"Trying to send larger ({}B) message than XLinkIn maxDataSize ({}B)", rawMsg->data.size(), maxDataSize.load()));
332 if(!queue.push(rawMsg)) {
333 throw std::runtime_error(
"Underlying queue destructed");
336 void DataInputQueue::send(
const std::shared_ptr<ADatatype>& msg) {
337 if(!msg)
throw std::invalid_argument(
"Message passed is not valid (nullptr)");
338 send(msg->serialize());
345 bool DataInputQueue::send(
const std::shared_ptr<RawBuffer>& rawMsg, std::chrono::milliseconds timeout) {
346 if(!running)
throw std::runtime_error(exceptionMessage.c_str());
347 if(!rawMsg)
throw std::invalid_argument(
"Message passed is not valid (nullptr)");
350 if(rawMsg->data.size() > maxDataSize) {
351 throw std::runtime_error(fmt::format(
"Trying to send larger ({}B) message than XLinkIn maxDataSize ({}B)", rawMsg->data.size(), maxDataSize.load()));
354 return queue.tryWaitAndPush(rawMsg, timeout);
357 bool DataInputQueue::send(
const std::shared_ptr<ADatatype>& msg, std::chrono::milliseconds timeout) {
358 if(!msg)
throw std::invalid_argument(
"Message passed is not valid (nullptr)");
359 return send(msg->serialize(), timeout);
362 bool DataInputQueue::send(
const ADatatype& msg, std::chrono::milliseconds timeout) {