DataQueue.hpp
Go to the documentation of this file.
1 #pragma once
2 
3 // std
4 #include <atomic>
5 #include <memory>
6 #include <vector>
7 
8 // project
12 
13 // shared
16 
17 namespace dai {
18 
23  public:
25  using CallbackId = int;
26 
27  private:
29  std::thread readingThread;
30  std::atomic<bool> running{true};
31  std::string exceptionMessage{""};
32  const std::string name{""};
33  std::mutex callbacksMtx;
34  std::unordered_map<CallbackId, std::function<void(std::string, std::shared_ptr<ADatatype>)>> callbacks;
36 
37  // const std::chrono::milliseconds READ_TIMEOUT{500};
38 
39  public:
40  // DataOutputQueue constructor
41  DataOutputQueue(const std::shared_ptr<XLinkConnection> conn, const std::string& streamName, unsigned int maxSize = 16, bool blocking = true);
43 
51  bool isClosed() const;
52 
56  void close();
57 
63  void setBlocking(bool blocking);
64 
70  bool getBlocking() const;
71 
77  void setMaxSize(unsigned int maxSize);
78 
84  unsigned int getMaxSize() const;
85 
91  std::string getName() const;
92 
99  CallbackId addCallback(std::function<void(std::string, std::shared_ptr<ADatatype>)>);
100 
107  CallbackId addCallback(std::function<void(std::shared_ptr<ADatatype>)>);
108 
115  CallbackId addCallback(std::function<void()> callback);
116 
123  bool removeCallback(CallbackId callbackId);
124 
129  template <class T>
130  bool has() {
131  if(!running) throw std::runtime_error(exceptionMessage.c_str());
132  std::shared_ptr<ADatatype> val = nullptr;
133  if(queue.front(val) && dynamic_cast<T*>(val.get())) {
134  return true;
135  }
136  return false;
137  }
138 
143  bool has() {
144  if(!running) throw std::runtime_error(exceptionMessage.c_str());
145  return !queue.empty();
146  }
147 
153  template <class T>
154  std::shared_ptr<T> tryGet() {
155  if(!running) throw std::runtime_error(exceptionMessage.c_str());
156  std::shared_ptr<ADatatype> val = nullptr;
157  if(!queue.tryPop(val)) return nullptr;
158  return std::dynamic_pointer_cast<T>(val);
159  }
160 
166  std::shared_ptr<ADatatype> tryGet() {
167  return tryGet<ADatatype>();
168  }
169 
175  template <class T>
176  std::shared_ptr<T> get() {
177  if(!running) throw std::runtime_error(exceptionMessage.c_str());
178  std::shared_ptr<ADatatype> val = nullptr;
179  if(!queue.waitAndPop(val)) {
180  throw std::runtime_error(exceptionMessage.c_str());
181  }
182  return std::dynamic_pointer_cast<T>(val);
183  }
184 
190  std::shared_ptr<ADatatype> get() {
191  return get<ADatatype>();
192  }
193 
199  template <class T>
200  std::shared_ptr<T> front() {
201  if(!running) throw std::runtime_error(exceptionMessage.c_str());
202  std::shared_ptr<ADatatype> val = nullptr;
203  if(!queue.front(val)) return nullptr;
204  return std::dynamic_pointer_cast<T>(val);
205  }
206 
212  std::shared_ptr<ADatatype> front() {
213  return front<ADatatype>();
214  }
215 
223  template <class T, typename Rep, typename Period>
224  std::shared_ptr<T> get(std::chrono::duration<Rep, Period> timeout, bool& hasTimedout) {
225  if(!running) throw std::runtime_error(exceptionMessage.c_str());
226  std::shared_ptr<ADatatype> val = nullptr;
227  if(!queue.tryWaitAndPop(val, timeout)) {
228  hasTimedout = true;
229  return nullptr;
230  }
231  hasTimedout = false;
232  return std::dynamic_pointer_cast<T>(val);
233  }
234 
242  template <typename Rep, typename Period>
243  std::shared_ptr<ADatatype> get(std::chrono::duration<Rep, Period> timeout, bool& hasTimedout) {
244  return get<ADatatype>(timeout, hasTimedout);
245  }
246 
252  template <class T>
253  std::vector<std::shared_ptr<T>> tryGetAll() {
254  if(!running) throw std::runtime_error(exceptionMessage.c_str());
255 
256  std::vector<std::shared_ptr<T>> messages;
257  queue.consumeAll([&messages](std::shared_ptr<ADatatype>& msg) {
258  // dynamic pointer cast may return nullptr
259  // in which case that message in vector will be nullptr
260  messages.push_back(std::dynamic_pointer_cast<T>(std::move(msg)));
261  });
262 
263  return messages;
264  }
265 
271  std::vector<std::shared_ptr<ADatatype>> tryGetAll() {
272  return tryGetAll<ADatatype>();
273  }
274 
281  template <class T>
282  std::vector<std::shared_ptr<T>> getAll() {
283  if(!running) throw std::runtime_error(exceptionMessage.c_str());
284 
285  std::vector<std::shared_ptr<T>> messages;
286  queue.waitAndConsumeAll([&messages](std::shared_ptr<ADatatype>& msg) {
287  // dynamic pointer cast may return nullptr
288  // in which case that message in vector will be nullptr
289  messages.push_back(std::dynamic_pointer_cast<T>(std::move(msg)));
290  });
291 
292  return messages;
293  }
294 
301  std::vector<std::shared_ptr<ADatatype>> getAll() {
302  return getAll<ADatatype>();
303  }
304 
312  template <class T, typename Rep, typename Period>
313  std::vector<std::shared_ptr<T>> getAll(std::chrono::duration<Rep, Period> timeout, bool& hasTimedout) {
314  if(!running) throw std::runtime_error(exceptionMessage.c_str());
315 
316  std::vector<std::shared_ptr<T>> messages;
317  hasTimedout = !queue.waitAndConsumeAll(
318  [&messages](std::shared_ptr<ADatatype>& msg) {
319  // dynamic pointer cast may return nullptr
320  // in which case that message in vector will be nullptr
321  messages.push_back(std::dynamic_pointer_cast<T>(std::move(msg)));
322  },
323  timeout);
324 
325  return messages;
326  }
327 
335  template <typename Rep, typename Period>
336  std::vector<std::shared_ptr<ADatatype>> getAll(std::chrono::duration<Rep, Period> timeout, bool& hasTimedout) {
337  return getAll<ADatatype>(timeout, hasTimedout);
338  }
339 };
340 
346  std::thread writingThread;
347  std::atomic<bool> running{true};
348  std::string exceptionMessage;
349  const std::string name;
351 
352  public:
353  DataInputQueue(const std::shared_ptr<XLinkConnection> conn,
354  const std::string& streamName,
355  unsigned int maxSize = 16,
356  bool blocking = true,
358  ~DataInputQueue();
359 
367  bool isClosed() const;
368 
372  void close();
373 
379  void setMaxDataSize(std::size_t maxSize);
380 
386  std::size_t getMaxDataSize();
387 
393  void setBlocking(bool blocking);
394 
400  bool getBlocking() const;
401 
407  void setMaxSize(unsigned int maxSize);
408 
414  unsigned int getMaxSize() const;
415 
421  std::string getName() const;
422 
428  void send(const std::shared_ptr<RawBuffer>& rawMsg);
429 
435  void send(const std::shared_ptr<ADatatype>& msg);
436 
442  void send(const ADatatype& msg);
443 
451  bool send(const std::shared_ptr<RawBuffer>& rawMsg, std::chrono::milliseconds timeout);
452 
460  bool send(const std::shared_ptr<ADatatype>& msg, std::chrono::milliseconds timeout);
461 
469  bool send(const ADatatype& msg, std::chrono::milliseconds timeout);
470 };
471 
472 } // namespace dai
dai::DataInputQueue::writingThread
std::thread writingThread
Definition: DataQueue.hpp:346
dai::DataOutputQueue::has
bool has()
Definition: DataQueue.hpp:130
dai::DataInputQueue::setMaxDataSize
void setMaxDataSize(std::size_t maxSize)
Definition: DataQueue.cpp:311
dai::DataOutputQueue::setBlocking
void setBlocking(bool blocking)
Definition: DataQueue.cpp:136
LockingQueue.hpp
dai::DataOutputQueue::tryGetAll
std::vector< std::shared_ptr< T > > tryGetAll()
Definition: DataQueue.hpp:253
dai::DataOutputQueue::tryGet
std::shared_ptr< ADatatype > tryGet()
Definition: DataQueue.hpp:166
dai::DataInputQueue::~DataInputQueue
~DataInputQueue()
Definition: DataQueue.cpp:282
dai::DataOutputQueue::close
void close()
Definition: DataQueue.cpp:114
dai::DataInputQueue::getMaxSize
unsigned int getMaxSize() const
Definition: DataQueue.cpp:305
dai::DataInputQueue::exceptionMessage
std::string exceptionMessage
Definition: DataQueue.hpp:348
dai::ADatatype
Abstract message.
Definition: ADatatype.hpp:11
dai::DataOutputQueue::DataOutputQueue
DataOutputQueue(const std::shared_ptr< XLinkConnection > conn, const std::string &streamName, unsigned int maxSize=16, bool blocking=true)
Definition: DataQueue.cpp:30
dai::DataInputQueue::send
void send(const std::shared_ptr< RawBuffer > &rawMsg)
Definition: DataQueue.cpp:323
dai::DataOutputQueue
Definition: DataQueue.hpp:22
dai::DataInputQueue::queue
LockingQueue< std::shared_ptr< RawBuffer > > queue
Definition: DataQueue.hpp:345
dai::DataOutputQueue::front
std::shared_ptr< ADatatype > front()
Definition: DataQueue.hpp:212
dai::DataOutputQueue::exceptionMessage
std::string exceptionMessage
Definition: DataQueue.hpp:31
dai::DataInputQueue::name
const std::string name
Definition: DataQueue.hpp:349
dai::DataInputQueue::maxDataSize
std::atomic< std::size_t > maxDataSize
Definition: DataQueue.hpp:350
dai::DataOutputQueue::get
std::shared_ptr< ADatatype > get()
Definition: DataQueue.hpp:190
ADatatype.hpp
dai::DataOutputQueue::addCallback
CallbackId addCallback(std::function< void(std::string, std::shared_ptr< ADatatype >)>)
Definition: DataQueue.cpp:160
dai::DataOutputQueue::readingThread
std::thread readingThread
Definition: DataQueue.hpp:29
dai::DataInputQueue::setBlocking
void setBlocking(bool blocking)
Definition: DataQueue.cpp:290
dai::DataOutputQueue::~DataOutputQueue
~DataOutputQueue()
Definition: DataQueue.cpp:128
dai::DataOutputQueue::setMaxSize
void setMaxSize(unsigned int maxSize)
Definition: DataQueue.cpp:146
dai::device::XLINK_USB_BUFFER_MAX_SIZE
constexpr static std::uint32_t XLINK_USB_BUFFER_MAX_SIZE
Definition: depthai-shared/include/depthai-shared/xlink/XLinkConstants.hpp:17
dai::DataOutputQueue::front
std::shared_ptr< T > front()
Definition: DataQueue.hpp:200
dai::DataInputQueue::running
std::atomic< bool > running
Definition: DataQueue.hpp:347
XLinkConnection.hpp
RawBuffer.hpp
dai::LockingQueue
Definition: LockingQueue.hpp:12
dai::DataInputQueue::close
void close()
Definition: DataQueue.cpp:268
dai::DataOutputQueue::tryGet
std::shared_ptr< T > tryGet()
Definition: DataQueue.hpp:154
dai::DataOutputQueue::callbacks
std::unordered_map< CallbackId, std::function< void(std::string, std::shared_ptr< ADatatype >)> > callbacks
Definition: DataQueue.hpp:34
dai::DataOutputQueue::get
std::shared_ptr< T > get(std::chrono::duration< Rep, Period > timeout, bool &hasTimedout)
Definition: DataQueue.hpp:224
dai::DataOutputQueue::tryGetAll
std::vector< std::shared_ptr< ADatatype > > tryGetAll()
Definition: DataQueue.hpp:271
dai::DataOutputQueue::getBlocking
bool getBlocking() const
Definition: DataQueue.cpp:141
dai::DataOutputQueue::name
const std::string name
Definition: DataQueue.hpp:32
dai::DataOutputQueue::isClosed
bool isClosed() const
Definition: DataQueue.cpp:110
dai::DataInputQueue::getBlocking
bool getBlocking() const
Definition: DataQueue.cpp:295
dai::DataOutputQueue::uniqueCallbackId
CallbackId uniqueCallbackId
Definition: DataQueue.hpp:35
dai::DataOutputQueue::getMaxSize
unsigned int getMaxSize() const
Definition: DataQueue.cpp:151
dai::DataOutputQueue::getName
std::string getName() const
Definition: DataQueue.cpp:156
dai::DataOutputQueue::getAll
std::vector< std::shared_ptr< T > > getAll()
Definition: DataQueue.hpp:282
dai::DataInputQueue::getMaxDataSize
std::size_t getMaxDataSize()
Definition: DataQueue.cpp:315
dai::DataOutputQueue::getAll
std::vector< std::shared_ptr< ADatatype > > getAll()
Definition: DataQueue.hpp:301
dai::DataOutputQueue::removeCallback
bool removeCallback(CallbackId callbackId)
Definition: DataQueue.cpp:184
dai::DataOutputQueue::get
std::shared_ptr< ADatatype > get(std::chrono::duration< Rep, Period > timeout, bool &hasTimedout)
Definition: DataQueue.hpp:243
dai::DataOutputQueue::has
bool has()
Definition: DataQueue.hpp:143
dai::DataInputQueue::setMaxSize
void setMaxSize(unsigned int maxSize)
Definition: DataQueue.cpp:300
dai::DataOutputQueue::callbacksMtx
std::mutex callbacksMtx
Definition: DataQueue.hpp:33
dai::DataInputQueue::isClosed
bool isClosed() const
Definition: DataQueue.cpp:264
dai::DataOutputQueue::getAll
std::vector< std::shared_ptr< ADatatype > > getAll(std::chrono::duration< Rep, Period > timeout, bool &hasTimedout)
Definition: DataQueue.hpp:336
dai::DataOutputQueue::running
std::atomic< bool > running
Definition: DataQueue.hpp:30
dai::DataOutputQueue::queue
LockingQueue< std::shared_ptr< ADatatype > > queue
Definition: DataQueue.hpp:28
dai::DataOutputQueue::get
std::shared_ptr< T > get()
Definition: DataQueue.hpp:176
dai::DataInputQueue::DataInputQueue
DataInputQueue(const std::shared_ptr< XLinkConnection > conn, const std::string &streamName, unsigned int maxSize=16, bool blocking=true, std::size_t maxDataSize=device::XLINK_USB_BUFFER_MAX_SIZE)
Definition: DataQueue.cpp:197
dai::DataInputQueue::getName
std::string getName() const
Definition: DataQueue.cpp:319
dai::DataOutputQueue::CallbackId
int CallbackId
Alias for callback id.
Definition: DataQueue.hpp:25
dai
Definition: CameraExposureOffset.hpp:6
dai::DataOutputQueue::getAll
std::vector< std::shared_ptr< T > > getAll(std::chrono::duration< Rep, Period > timeout, bool &hasTimedout)
Definition: DataQueue.hpp:313
dai::DataInputQueue
Definition: DataQueue.hpp:344


depthai
Author(s): Martin Peterlin
autogenerated on Sat Mar 22 2025 02:58:19