Program Listing for File MessageQueue.hpp
↰ Return to documentation for file (include/depthai/pipeline/MessageQueue.hpp)
#pragma once
// std
#include <memory>
#include <vector>
// project
#include "depthai/pipeline/datatype/ADatatype.hpp"
#include "depthai/utility/LockingQueue.hpp"
// shared
namespace dai {
class MessageQueue : public std::enable_shared_from_this<MessageQueue> {
public:
using CallbackId = int;
class QueueException : public std::runtime_error {
public:
explicit QueueException(const std::string& message) : std::runtime_error(message) {}
~QueueException() noexcept override;
};
private:
static constexpr auto CLOSED_QUEUE_MESSAGE = "MessageQueue was closed";
LockingQueue<std::shared_ptr<ADatatype>> queue;
std::string name;
public:
std::mutex callbacksMtx; // Only public for the Python bindings
std::unordered_map<CallbackId, std::function<void(std::string, std::shared_ptr<ADatatype>)>> callbacks; // Only public for the Python bindings
CallbackId uniqueCallbackId{0};
private:
void callCallbacks(std::shared_ptr<ADatatype> msg);
public:
// DataOutputQueue constructor
explicit MessageQueue(unsigned int maxSize = 16, bool blocking = true);
explicit MessageQueue(std::string name, unsigned int maxSize = 16, bool blocking = true);
MessageQueue(const MessageQueue& c)
: enable_shared_from_this(c), queue(c.queue), name(c.name), callbacks(c.callbacks), uniqueCallbackId(c.uniqueCallbackId){};
MessageQueue(MessageQueue&& m) noexcept
: enable_shared_from_this(m),
queue(std::move(m.queue)),
name(std::move(m.name)),
callbacks(std::move(m.callbacks)),
uniqueCallbackId(m.uniqueCallbackId){};
MessageQueue& operator=(const MessageQueue& c) {
queue = c.queue;
name = c.name;
callbacks = c.callbacks;
uniqueCallbackId = c.uniqueCallbackId;
return *this;
}
MessageQueue& operator=(MessageQueue&& m) noexcept {
queue = std::move(m.queue);
name = std::move(m.name);
callbacks = std::move(m.callbacks);
uniqueCallbackId = m.uniqueCallbackId;
return *this;
}
virtual ~MessageQueue();
std::string getName() const;
void setName(std::string name);
bool isClosed() const;
void close();
void setBlocking(bool blocking);
bool getBlocking() const;
void setMaxSize(unsigned int maxSize);
unsigned int getMaxSize() const;
unsigned int getSize() const;
unsigned int isFull() const;
CallbackId addCallback(std::function<void(std::string, std::shared_ptr<ADatatype>)>);
CallbackId addCallback(const std::function<void(std::shared_ptr<ADatatype>)>&);
CallbackId addCallback(const std::function<void()>& callback);
bool removeCallback(CallbackId callbackId);
template <class T>
bool has() {
if(queue.isDestroyed()) {
throw QueueException(CLOSED_QUEUE_MESSAGE);
}
std::shared_ptr<ADatatype> val = nullptr;
return queue.front(val) && dynamic_cast<T*>(val.get());
}
bool has() {
if(queue.isDestroyed()) {
throw QueueException(CLOSED_QUEUE_MESSAGE);
}
return !queue.empty();
}
template <class T>
std::shared_ptr<T> tryGet() {
if(queue.isDestroyed()) {
throw QueueException(CLOSED_QUEUE_MESSAGE);
}
std::shared_ptr<ADatatype> val = nullptr;
if(!queue.tryPop(val)) return nullptr;
return std::dynamic_pointer_cast<T>(val);
}
std::shared_ptr<ADatatype> tryGet() {
return tryGet<ADatatype>();
}
template <class T>
std::shared_ptr<T> get() {
std::shared_ptr<ADatatype> val = nullptr;
if(!queue.waitAndPop(val)) {
throw QueueException(CLOSED_QUEUE_MESSAGE);
}
return std::dynamic_pointer_cast<T>(val);
}
std::shared_ptr<ADatatype> get() {
return get<ADatatype>();
}
template <class T>
std::shared_ptr<T> front() {
if(queue.isDestroyed()) {
throw QueueException(CLOSED_QUEUE_MESSAGE);
}
std::shared_ptr<ADatatype> val = nullptr;
if(!queue.front(val)) return nullptr;
return std::dynamic_pointer_cast<T>(val);
}
std::shared_ptr<ADatatype> front() {
return front<ADatatype>();
}
template <class T, typename Rep, typename Period>
std::shared_ptr<T> get(std::chrono::duration<Rep, Period> timeout, bool& hasTimedout) {
if(queue.isDestroyed()) {
throw QueueException(CLOSED_QUEUE_MESSAGE);
}
std::shared_ptr<ADatatype> val = nullptr;
if(!queue.tryWaitAndPop(val, timeout)) {
hasTimedout = true;
// Check again after the timeout
if(queue.isDestroyed()) {
throw QueueException(CLOSED_QUEUE_MESSAGE);
}
return nullptr;
}
hasTimedout = false;
return std::dynamic_pointer_cast<T>(val);
}
template <typename Rep, typename Period>
std::shared_ptr<ADatatype> get(std::chrono::duration<Rep, Period> timeout, bool& hasTimedout) {
return get<ADatatype>(timeout, hasTimedout);
}
template <class T>
std::vector<std::shared_ptr<T>> tryGetAll() {
if(queue.isDestroyed()) {
throw QueueException(CLOSED_QUEUE_MESSAGE);
}
std::vector<std::shared_ptr<T>> messages;
queue.consumeAll([&messages](std::shared_ptr<ADatatype>& msg) {
// dynamic pointer cast may return nullptr
// in which case that message in vector will be nullptr
messages.push_back(std::dynamic_pointer_cast<T>(std::move(msg)));
});
return messages;
}
std::vector<std::shared_ptr<ADatatype>> tryGetAll() {
return tryGetAll<ADatatype>();
}
template <class T>
std::vector<std::shared_ptr<T>> getAll() {
std::vector<std::shared_ptr<T>> messages;
bool notDestructed = queue.waitAndConsumeAll([&messages](std::shared_ptr<ADatatype>& msg) {
// dynamic pointer cast may return nullptr
// in which case that message in vector will be nullptr
messages.push_back(std::dynamic_pointer_cast<T>(std::move(msg)));
});
if(!notDestructed) {
throw QueueException(CLOSED_QUEUE_MESSAGE);
}
return messages;
}
std::vector<std::shared_ptr<ADatatype>> getAll() {
return getAll<ADatatype>();
}
template <class T, typename Rep, typename Period>
std::vector<std::shared_ptr<T>> getAll(std::chrono::duration<Rep, Period> timeout, bool& hasTimedout) {
if(queue.isDestroyed()) {
throw QueueException(CLOSED_QUEUE_MESSAGE);
}
std::vector<std::shared_ptr<T>> messages;
hasTimedout = !queue.waitAndConsumeAll(
[&messages](std::shared_ptr<ADatatype>& msg) {
// dynamic pointer cast may return nullptr
// in which case that message in vector will be nullptr
messages.push_back(std::dynamic_pointer_cast<T>(std::move(msg)));
},
timeout);
return messages;
}
template <typename Rep, typename Period>
std::vector<std::shared_ptr<ADatatype>> getAll(std::chrono::duration<Rep, Period> timeout, bool& hasTimedout) {
return getAll<ADatatype>(timeout, hasTimedout);
}
void send(const std::shared_ptr<ADatatype>& msg);
bool send(const std::shared_ptr<ADatatype>& msg, std::chrono::milliseconds timeout);
bool send(const ADatatype& msg, std::chrono::milliseconds timeout);
bool trySend(const std::shared_ptr<ADatatype>& msg);
};
} // namespace dai