DataQueue.cpp
Go to the documentation of this file.
2 
3 // std
4 #include <chrono>
5 #include <iostream>
6 #include <memory>
7 
8 // project
15 
16 // shared
18 
19 // libraries
20 #include "utility/Logging.hpp"
21 #include "utility/spdlog-fmt.hpp"
22 
23 // Additions
24 #include "spdlog/fmt/bin_to_hex.h"
25 #include "spdlog/fmt/chrono.h"
26 
27 namespace dai {
28 
29 // DATA OUTPUT QUEUE
// Constructs the output queue and immediately starts `readingThread`. The thread loops
// while `running` is set: it reads packets from the stream, pushes the resulting `data`
// onto `queue`, and then invokes every registered callback with (name, data).
// NOTE(review): this listing skips source lines 43, 45-46, 63 and 65 (see the embedded
// numbering), so the declarations of `data` and `type` and the conditions that open the
// blocks closed at lines 59 and 73 are not visible here.
30 DataOutputQueue::DataOutputQueue(const std::shared_ptr<XLinkConnection> conn, const std::string& streamName, unsigned int maxSize, bool blocking)
31  : queue(maxSize, blocking), name(streamName) {
32  // Create stream first and then pass to thread
33  // Open stream with 1B write size (no writing will happen here)
// NOTE(review): `conn` is declared const, so std::move(conn) produces a const rvalue and
// the shared_ptr is copied rather than moved - presumably harmless; confirm against the
// XLinkStream constructor.
34  XLinkStream stream(std::move(conn), name, 1);
35 
36  // Creates a thread which reads from connection into the queue
// The stream object is moved into the closure, so the thread owns it for its whole lifetime.
37  readingThread = std::thread([this, stream = std::move(stream)]() mutable {
// Counter is only incremented in the visible code; nothing shown here reads it.
38  std::uint64_t numPacketsRead = 0;
39  try {
40  while(running) {
41  // Blocking -- parse packet and gather timing information
42  auto packet = stream.readMove();
44  const auto t1Parse = std::chrono::steady_clock::now();
// Message group handling: read `size` further packets from the stream, parse each one,
// then attach them to the group by the index stored in the raw group's entries.
47  auto msgGrp = std::static_pointer_cast<MessageGroup>(data);
48  unsigned int size = msgGrp->getNumMessages();
49  std::vector<std::shared_ptr<ADatatype>> packets;
50  packets.reserve(size);
51  for(unsigned int i = 0; i < size; ++i) {
52  auto dpacket = stream.readMove();
53  packets.push_back(StreamMessageParser::parseMessageToADatatype(&dpacket));
54  }
55  auto rawMsgGrp = std::static_pointer_cast<RawMessageGroup>(data->getRaw());
56  for(auto& msg : rawMsgGrp->group) {
// NOTE(review): `msg.second.index` is used to index `packets` without a bounds check.
57  msgGrp->add(msg.first, packets[msg.second.index]);
58  }
59  }
60  const auto t2Parse = std::chrono::steady_clock::now();
61 
62  // Trace level debugging
64  std::vector<std::uint8_t> metadata;
66  data->getRaw()->serialize(metadata, type);
67  logger::trace("Received message from device ({}) - parsing time: {}, data size: {}, object type: {} object data: {}",
68  name,
69  std::chrono::duration_cast<std::chrono::microseconds>(t2Parse - t1Parse),
70  data->getRaw()->data.size(),
71  static_cast<std::int32_t>(type),
72  spdlog::to_hex(metadata));
73  }
74 
75  // Add 'data' to queue
// A false return from push() is turned into an exception, which ends the loop via the
// handler at line 97.
76  if(!queue.push(data)) {
77  throw std::runtime_error(fmt::format("Underlying queue destructed"));
78  }
79 
80  // Increment numPacketsRead
81  numPacketsRead++;
82 
83  // Call callbacks
84  {
// NOTE(review): callbacks run while callbacksMtx (a std::mutex) is held, and
// addCallback/removeCallback lock the same mutex. Calling either of them from inside a
// callback would re-lock a non-recursive mutex on the same thread (undefined behaviour,
// typically a deadlock).
85  std::unique_lock<std::mutex> l(callbacksMtx);
86  for(const auto& kv : callbacks) {
87  const auto& callback = kv.second;
88  try {
89  callback(name, data);
// Only std::exception-derived errors are logged and swallowed here; anything else
// propagates out of the loop (the outer handler at line 97 also catches only std::exception).
90  } catch(const std::exception& ex) {
91  logger::error("Callback with id: {} throwed an exception: {}", kv.first, ex.what());
92  }
93  }
94  }
95  }
96 
97  } catch(const std::exception& ex) {
// Record why the loop ended; the accessors throw this text once `running` is false.
98  exceptionMessage = fmt::format("Communication exception - possible device error/misconfiguration. Original message '{}'", ex.what());
99  }
100 
101  // Close the queue
// This runs on the reading thread itself; close() compares thread ids and skips the join
// in that case.
102  close();
103  });
104 }
105 
106 // This function is thread-unsafe. The idea of "isClosed" is ephemeral and
107 // since there is no mutex lock, its state is outdated and invalid even before
108 // the logical NOT in this function. This calculated boolean then continues to degrade
109 // in validity as it is returned by value to the caller
111  return !running;
112 }
113 
115  // Set reading thread to stop and allow to be closed only once
116  if(!running.exchange(false)) return;
117 
118  // Destroy queue
119  queue.destruct();
120 
121  // Then join thread
122  if((readingThread.get_id() != std::this_thread::get_id()) && readingThread.joinable()) readingThread.join();
123 
124  // Log
125  logger::debug("DataOutputQueue ({}) closed", name);
126 }
127 
129  // Close the queue first
130  close();
131 
132  // Then join thread
133  if(readingThread.joinable()) readingThread.join();
134 }
135 
136 void DataOutputQueue::setBlocking(bool blocking) {
137  if(!running) throw std::runtime_error(exceptionMessage.c_str());
138  queue.setBlocking(blocking);
139 }
140 
142  if(!running) throw std::runtime_error(exceptionMessage.c_str());
143  return queue.getBlocking();
144 }
145 
146 void DataOutputQueue::setMaxSize(unsigned int maxSize) {
147  if(!running) throw std::runtime_error(exceptionMessage.c_str());
148  queue.setMaxSize(maxSize);
149 }
150 
151 unsigned int DataOutputQueue::getMaxSize() const {
152  if(!running) throw std::runtime_error(exceptionMessage.c_str());
153  return queue.getMaxSize();
154 }
155 
156 std::string DataOutputQueue::getName() const {
157  return name;
158 }
159 
160 int DataOutputQueue::addCallback(std::function<void(std::string, std::shared_ptr<ADatatype>)> callback) {
161  // Lock first
162  std::unique_lock<std::mutex> l(callbacksMtx);
163 
164  // Get unique id
165  int id = uniqueCallbackId++;
166 
167  // move assign callback
168  callbacks[id] = std::move(callback);
169 
170  // return id assigned to the callback
171  return id;
172 }
173 
174 int DataOutputQueue::addCallback(std::function<void(std::shared_ptr<ADatatype>)> callback) {
175  // Create a wrapper
176  return addCallback([callback = std::move(callback)](std::string, std::shared_ptr<ADatatype> message) { callback(std::move(message)); });
177 }
178 
179 int DataOutputQueue::addCallback(std::function<void()> callback) {
180  // Create a wrapper
181  return addCallback([callback = std::move(callback)](std::string, std::shared_ptr<ADatatype>) { callback(); });
182 }
183 
184 bool DataOutputQueue::removeCallback(int callbackId) {
185  // Lock first
186  std::unique_lock<std::mutex> l(callbacksMtx);
187 
188  // If callback with id 'callbackId' doesn't exists, return false
189  if(callbacks.count(callbackId) == 0) return false;
190 
191  // Otherwise erase and return true
192  callbacks.erase(callbackId);
193  return true;
194 }
195 
196 // DATA INPUT QUEUE
// Constructor body of DataInputQueue: initialises the queue, name and maxDataSize members
// and starts `writingThread`. The thread loops while `running` is set: it pops a message
// from `queue`, serializes it and writes it to the stream.
// NOTE(review): this listing skips source lines 197, 201, 229 and 231 (see the embedded
// numbering), so the line carrying the constructor name, the declaration of `stream`, the
// condition opening the block closed at line 239 and the declaration of `type` are not
// visible here.
198  const std::shared_ptr<XLinkConnection> conn, const std::string& streamName, unsigned int maxSize, bool blocking, std::size_t maxDataSize)
199  : queue(maxSize, blocking), name(streamName), maxDataSize(maxDataSize) {
200  // open stream with maxDataSize write size
202 
// The stream object is moved into the closure, so the thread owns it for its whole lifetime.
203  writingThread = std::thread([this, stream = std::move(stream)]() mutable {
// Counter is only incremented in the visible code; nothing shown here reads it.
204  std::uint64_t numPacketsSent = 0;
205  try {
206  while(running) {
207  // get data from queue
208  std::shared_ptr<RawBuffer> data;
// A false return skips this iteration; the loop condition then re-checks `running`.
209  if(!queue.waitAndPop(data)) {
210  continue;
211  }
212 
213  // serialize
214  auto t1Parse = std::chrono::steady_clock::now();
// For a message group, every member buffer is serialized separately and its entry is
// assigned a running index in iteration order; the members are written to the stream
// right after the group message itself (see the write loop below).
215  std::vector<std::vector<uint8_t>> serializedAux;
216  if(data->getType() == DatatypeEnum::MessageGroup) {
// NOTE(review): the result of dynamic_pointer_cast is dereferenced without a null check.
217  auto rawMsgGrp = std::dynamic_pointer_cast<RawMessageGroup>(data);
218  serializedAux.reserve(rawMsgGrp->group.size());
219  unsigned int index = 0;
220  for(auto& msg : rawMsgGrp->group) {
// The index is written into the queued message object itself, before the group is serialized.
221  msg.second.index = index++;
222  serializedAux.push_back(StreamMessageParser::serializeMessage(msg.second.buffer));
223  }
224  }
225  auto serialized = StreamMessageParser::serializeMessage(data);
226  auto t2Parse = std::chrono::steady_clock::now();
227 
228  // Trace level debugging
230  std::vector<std::uint8_t> metadata;
232  data->serialize(metadata, type);
233  logger::trace("Sending message to device ({}) - serialize time: {}, data size: {}, object type: {} object data: {}",
234  name,
235  std::chrono::duration_cast<std::chrono::microseconds>(t2Parse - t1Parse),
236  data->data.size(),
237  type,
238  spdlog::to_hex(metadata));
239  }
240 
241  // Blocking
242  stream.write(serialized);
243  for(auto& msg : serializedAux) {
244  stream.write(msg);
245  }
246 
247  // Increment num packets sent
248  numPacketsSent++;
249  }
250 
251  } catch(const std::exception& ex) {
// Record why the loop ended; the accessors and send() throw this text once `running` is false.
252  exceptionMessage = fmt::format("Communication exception - possible device error/misconfiguration. Original message '{}'", ex.what());
253  }
254 
255  // Close the queue
// This runs on the writing thread itself; close() compares thread ids and skips the join
// in that case.
256  close();
257  });
258 }
259 
260 // This function is thread-unsafe. The idea of "isClosed" is ephemeral and
261 // since there is no mutex lock, its state is outdated and invalid even before
262 // the logical NOT in this function. This calculated boolean then continues to degrade
263 // in validity as it is returned by value to the caller
264 bool DataInputQueue::isClosed() const {
265  return !running;
266 }
267 
268 void DataInputQueue::close() {
269  // Set writing thread to stop and allow to be closed only once
270  if(!running.exchange(false)) return;
271 
272  // Destroy queue
273  queue.destruct();
274 
275  // Then join thread
276  if((writingThread.get_id() != std::this_thread::get_id()) && writingThread.joinable()) writingThread.join();
277 
278  // Log
279  logger::debug("DataInputQueue ({}) closed", name);
280 }
281 
282 DataInputQueue::~DataInputQueue() {
283  // Close the queue
284  close();
285 
286  // Then join thread
287  if(writingThread.joinable()) writingThread.join();
288 }
289 
290 void DataInputQueue::setBlocking(bool blocking) {
291  if(!running) throw std::runtime_error(exceptionMessage.c_str());
292  queue.setBlocking(blocking);
293 }
294 
295 bool DataInputQueue::getBlocking() const {
296  if(!running) throw std::runtime_error(exceptionMessage.c_str());
297  return queue.getBlocking();
298 }
299 
300 void DataInputQueue::setMaxSize(unsigned int maxSize) {
301  if(!running) throw std::runtime_error(exceptionMessage.c_str());
302  queue.setMaxSize(maxSize);
303 }
304 
305 unsigned int DataInputQueue::getMaxSize() const {
306  if(!running) throw std::runtime_error(exceptionMessage.c_str());
307  return queue.getMaxSize();
308 }
309 
310 // BUGBUG https://github.com/luxonis/depthai-core/issues/762
311 void DataInputQueue::setMaxDataSize(std::size_t maxSize) {
312  maxDataSize = maxSize;
313 }
314 
315 std::size_t DataInputQueue::getMaxDataSize() {
316  return maxDataSize;
317 }
318 
319 std::string DataInputQueue::getName() const {
320  return name;
321 }
322 
323 void DataInputQueue::send(const std::shared_ptr<RawBuffer>& rawMsg) {
324  if(!running) throw std::runtime_error(exceptionMessage.c_str());
325  if(!rawMsg) throw std::invalid_argument("Message passed is not valid (nullptr)");
326 
327  // Check if stream receiver has enough space for this message
328  if(rawMsg->data.size() > maxDataSize) {
329  throw std::runtime_error(fmt::format("Trying to send larger ({}B) message than XLinkIn maxDataSize ({}B)", rawMsg->data.size(), maxDataSize.load()));
330  }
331 
332  if(!queue.push(rawMsg)) {
333  throw std::runtime_error("Underlying queue destructed");
334  }
335 }
336 void DataInputQueue::send(const std::shared_ptr<ADatatype>& msg) {
337  if(!msg) throw std::invalid_argument("Message passed is not valid (nullptr)");
338  send(msg->serialize());
339 }
340 
341 void DataInputQueue::send(const ADatatype& msg) {
342  send(msg.serialize());
343 }
344 
345 bool DataInputQueue::send(const std::shared_ptr<RawBuffer>& rawMsg, std::chrono::milliseconds timeout) {
346  if(!running) throw std::runtime_error(exceptionMessage.c_str());
347  if(!rawMsg) throw std::invalid_argument("Message passed is not valid (nullptr)");
348 
349  // Check if stream receiver has enough space for this message
350  if(rawMsg->data.size() > maxDataSize) {
351  throw std::runtime_error(fmt::format("Trying to send larger ({}B) message than XLinkIn maxDataSize ({}B)", rawMsg->data.size(), maxDataSize.load()));
352  }
353 
354  return queue.tryWaitAndPush(rawMsg, timeout);
355 }
356 
357 bool DataInputQueue::send(const std::shared_ptr<ADatatype>& msg, std::chrono::milliseconds timeout) {
358  if(!msg) throw std::invalid_argument("Message passed is not valid (nullptr)");
359  return send(msg->serialize(), timeout);
360 }
361 
362 bool DataInputQueue::send(const ADatatype& msg, std::chrono::milliseconds timeout) {
363  return send(msg.serialize(), timeout);
364 }
365 
366 } // namespace dai
MessageGroup.hpp
DataQueue.hpp
dai::DataInputQueue::writingThread
std::thread writingThread
Definition: DataQueue.hpp:346
dai::DataOutputQueue::setBlocking
void setBlocking(bool blocking)
Definition: DataQueue.cpp:136
dai::StreamMessageParser::parseMessageToADatatype
static std::shared_ptr< ADatatype > parseMessageToADatatype(streamPacketDesc_t *const packet)
Definition: StreamMessageParser.cpp:349
dai::DatatypeEnum
DatatypeEnum
Definition: DatatypeEnum.hpp:7
dai::DataOutputQueue::close
void close()
Definition: DataQueue.cpp:114
dai::DataInputQueue::exceptionMessage
std::string exceptionMessage
Definition: DataQueue.hpp:348
dai::logger::get_level
spdlog::level::level_enum get_level()
Definition: Logging.hpp:48
dai::ADatatype
Abstract message.
Definition: ADatatype.hpp:11
dai::DataOutputQueue::DataOutputQueue
DataOutputQueue(const std::shared_ptr< XLinkConnection > conn, const std::string &streamName, unsigned int maxSize=16, bool blocking=true)
Definition: DataQueue.cpp:30
dai::DataInputQueue::queue
LockingQueue< std::shared_ptr< RawBuffer > > queue
Definition: DataQueue.hpp:345
dai::XLinkStream
Definition: XLinkStream.hpp:37
dai::logger::debug
void debug(const FormatString &fmt, Args &&...args)
Definition: Logging.hpp:72
dai::StreamMessageParser::serializeMessage
static std::vector< std::uint8_t > serializeMessage(const std::shared_ptr< const RawBuffer > &data)
Definition: StreamMessageParser.cpp:384
dai::DataOutputQueue::exceptionMessage
std::string exceptionMessage
Definition: DataQueue.hpp:31
dai::DataInputQueue::name
const std::string name
Definition: DataQueue.hpp:349
dai::DataInputQueue::maxDataSize
std::atomic< std::size_t > maxDataSize
Definition: DataQueue.hpp:350
DatatypeEnum.hpp
DAI_SPAN_NAMESPACE_NAME::detail::data
constexpr auto data(C &c) -> decltype(c.data())
Definition: span.hpp:177
ADatatype.hpp
RawMessageGroup.hpp
dai::DataOutputQueue::addCallback
CallbackId addCallback(std::function< void(std::string, std::shared_ptr< ADatatype >)>)
Definition: DataQueue.cpp:160
dai::DataOutputQueue::readingThread
std::thread readingThread
Definition: DataQueue.hpp:29
dai::DataOutputQueue::~DataOutputQueue
~DataOutputQueue()
Definition: DataQueue.cpp:128
dai::DataOutputQueue::setMaxSize
void setMaxSize(unsigned int maxSize)
Definition: DataQueue.cpp:146
DAI_SPAN_NAMESPACE_NAME::detail::size
constexpr auto size(const C &c) -> decltype(c.size())
Definition: span.hpp:167
dai::XLinkStream::readMove
StreamPacketDesc readMove()
Definition: XLinkStream.cpp:125
dai::DataInputQueue::running
std::atomic< bool > running
Definition: DataQueue.hpp:347
StreamMessageParser.hpp
dai::DataInputQueue::close
void close()
Definition: DataQueue.cpp:268
dai::logger::error
void error(const FormatString &fmt, Args &&...args)
Definition: Logging.hpp:90
XLinkStream.hpp
dai::DataOutputQueue::callbacks
std::unordered_map< CallbackId, std::function< void(std::string, std::shared_ptr< ADatatype >)> > callbacks
Definition: DataQueue.hpp:34
nanorpc::core::type::id
std::uint64_t id
Definition: type.h:27
dai::DatatypeEnum::MessageGroup
@ MessageGroup
nanorpc::core::detail::pack::meta::type
type
Definition: pack_meta.h:26
dai::DataOutputQueue::getBlocking
bool getBlocking() const
Definition: DataQueue.cpp:141
dai::DataOutputQueue::name
const std::string name
Definition: DataQueue.hpp:32
dai::DataOutputQueue::isClosed
bool isClosed() const
Definition: DataQueue.cpp:110
dai::DataOutputQueue::uniqueCallbackId
CallbackId uniqueCallbackId
Definition: DataQueue.hpp:35
dai::DataOutputQueue::getMaxSize
unsigned int getMaxSize() const
Definition: DataQueue.cpp:151
dai::DataOutputQueue::getName
std::string getName() const
Definition: DataQueue.cpp:156
dai::device::XLINK_MESSAGE_METADATA_MAX_SIZE
constexpr static const int XLINK_MESSAGE_METADATA_MAX_SIZE
Definition: depthai-shared/include/depthai-shared/xlink/XLinkConstants.hpp:24
dai::XLinkStream::write
void write(const void *data, std::size_t size)
Definition: XLinkStream.cpp:86
dai::DataOutputQueue::removeCallback
bool removeCallback(CallbackId callbackId)
Definition: DataQueue.cpp:184
dai::logger::trace
void trace(const FormatString &fmt, Args &&...args)
Definition: Logging.hpp:66
spdlog-fmt.hpp
dai::DataOutputQueue::callbacksMtx
std::mutex callbacksMtx
Definition: DataQueue.hpp:33
dai::ADatatype::serialize
virtual std::shared_ptr< dai::RawBuffer > serialize() const =0
dai::DataOutputQueue::running
std::atomic< bool > running
Definition: DataQueue.hpp:30
dai::DataOutputQueue::queue
LockingQueue< std::shared_ptr< ADatatype > > queue
Definition: DataQueue.hpp:28
dai::DataInputQueue::DataInputQueue
DataInputQueue(const std::shared_ptr< XLinkConnection > conn, const std::string &streamName, unsigned int maxSize=16, bool blocking=true, std::size_t maxDataSize=device::XLINK_USB_BUFFER_MAX_SIZE)
Definition: DataQueue.cpp:197
Logging.hpp
dai
Definition: CameraExposureOffset.hpp:6


depthai
Author(s): Martin Peterlin
autogenerated on Sat Mar 22 2025 02:58:19