Program Listing for File MessageQueue.hpp

Return to documentation for file (include/depthai/pipeline/MessageQueue.hpp)

#pragma once

// std
#include <chrono>
#include <functional>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// project
#include "depthai/pipeline/datatype/ADatatype.hpp"
#include "depthai/utility/LockingQueue.hpp"

// shared
namespace dai {

class MessageQueue : public std::enable_shared_from_this<MessageQueue> {
   public:
    using CallbackId = int;

    class QueueException : public std::runtime_error {
       public:
        explicit QueueException(const std::string& message) : std::runtime_error(message) {}
        ~QueueException() noexcept override;
    };

   private:
    static constexpr auto CLOSED_QUEUE_MESSAGE = "MessageQueue was closed";
    LockingQueue<std::shared_ptr<ADatatype>> queue;
    std::string name;

   public:
    std::mutex callbacksMtx;                                                                                 // Only public for the Python bindings
    std::unordered_map<CallbackId, std::function<void(std::string, std::shared_ptr<ADatatype>)>> callbacks;  // Only public for the Python bindings
    CallbackId uniqueCallbackId{0};

   private:
    void callCallbacks(std::shared_ptr<ADatatype> msg);

   public:
    // DataOutputQueue constructor
    explicit MessageQueue(unsigned int maxSize = 16, bool blocking = true);
    explicit MessageQueue(std::string name, unsigned int maxSize = 16, bool blocking = true);

    MessageQueue(const MessageQueue& c)
        : enable_shared_from_this(c), queue(c.queue), name(c.name), callbacks(c.callbacks), uniqueCallbackId(c.uniqueCallbackId){};
    MessageQueue(MessageQueue&& m) noexcept
        : enable_shared_from_this(m),
          queue(std::move(m.queue)),
          name(std::move(m.name)),
          callbacks(std::move(m.callbacks)),
          uniqueCallbackId(m.uniqueCallbackId){};

    MessageQueue& operator=(const MessageQueue& c) {
        queue = c.queue;
        name = c.name;
        callbacks = c.callbacks;
        uniqueCallbackId = c.uniqueCallbackId;
        return *this;
    }

    MessageQueue& operator=(MessageQueue&& m) noexcept {
        queue = std::move(m.queue);
        name = std::move(m.name);
        callbacks = std::move(m.callbacks);
        uniqueCallbackId = m.uniqueCallbackId;
        return *this;
    }

    virtual ~MessageQueue();

    std::string getName() const;

    void setName(std::string name);

    bool isClosed() const;

    void close();

    void setBlocking(bool blocking);

    bool getBlocking() const;

    void setMaxSize(unsigned int maxSize);

    unsigned int getMaxSize() const;

    unsigned int getSize() const;

    unsigned int isFull() const;

    CallbackId addCallback(std::function<void(std::string, std::shared_ptr<ADatatype>)>);

    CallbackId addCallback(const std::function<void(std::shared_ptr<ADatatype>)>&);

    CallbackId addCallback(const std::function<void()>& callback);

    bool removeCallback(CallbackId callbackId);

    template <class T>
    bool has() {
        if(queue.isDestroyed()) {
            throw QueueException(CLOSED_QUEUE_MESSAGE);
        }
        std::shared_ptr<ADatatype> val = nullptr;
        return queue.front(val) && dynamic_cast<T*>(val.get());
    }

    bool has() {
        if(queue.isDestroyed()) {
            throw QueueException(CLOSED_QUEUE_MESSAGE);
        }
        return !queue.empty();
    }

    template <class T>
    std::shared_ptr<T> tryGet() {
        if(queue.isDestroyed()) {
            throw QueueException(CLOSED_QUEUE_MESSAGE);
        }
        std::shared_ptr<ADatatype> val = nullptr;
        if(!queue.tryPop(val)) return nullptr;
        return std::dynamic_pointer_cast<T>(val);
    }

    std::shared_ptr<ADatatype> tryGet() {
        return tryGet<ADatatype>();
    }

    template <class T>
    std::shared_ptr<T> get() {
        std::shared_ptr<ADatatype> val = nullptr;
        if(!queue.waitAndPop(val)) {
            throw QueueException(CLOSED_QUEUE_MESSAGE);
        }
        return std::dynamic_pointer_cast<T>(val);
    }

    std::shared_ptr<ADatatype> get() {
        return get<ADatatype>();
    }

    template <class T>
    std::shared_ptr<T> front() {
        if(queue.isDestroyed()) {
            throw QueueException(CLOSED_QUEUE_MESSAGE);
        }
        std::shared_ptr<ADatatype> val = nullptr;
        if(!queue.front(val)) return nullptr;
        return std::dynamic_pointer_cast<T>(val);
    }

    std::shared_ptr<ADatatype> front() {
        return front<ADatatype>();
    }

    template <class T, typename Rep, typename Period>
    std::shared_ptr<T> get(std::chrono::duration<Rep, Period> timeout, bool& hasTimedout) {
        if(queue.isDestroyed()) {
            throw QueueException(CLOSED_QUEUE_MESSAGE);
        }
        std::shared_ptr<ADatatype> val = nullptr;
        if(!queue.tryWaitAndPop(val, timeout)) {
            hasTimedout = true;
            // Check again after the timeout
            if(queue.isDestroyed()) {
                throw QueueException(CLOSED_QUEUE_MESSAGE);
            }
            return nullptr;
        }
        hasTimedout = false;
        return std::dynamic_pointer_cast<T>(val);
    }

    template <typename Rep, typename Period>
    std::shared_ptr<ADatatype> get(std::chrono::duration<Rep, Period> timeout, bool& hasTimedout) {
        return get<ADatatype>(timeout, hasTimedout);
    }

    template <class T>
    std::vector<std::shared_ptr<T>> tryGetAll() {
        if(queue.isDestroyed()) {
            throw QueueException(CLOSED_QUEUE_MESSAGE);
        }
        std::vector<std::shared_ptr<T>> messages;
        queue.consumeAll([&messages](std::shared_ptr<ADatatype>& msg) {
            // dynamic pointer cast may return nullptr
            // in which case that message in vector will be nullptr
            messages.push_back(std::dynamic_pointer_cast<T>(std::move(msg)));
        });
        return messages;
    }

    std::vector<std::shared_ptr<ADatatype>> tryGetAll() {
        return tryGetAll<ADatatype>();
    }

    template <class T>
    std::vector<std::shared_ptr<T>> getAll() {
        std::vector<std::shared_ptr<T>> messages;
        bool notDestructed = queue.waitAndConsumeAll([&messages](std::shared_ptr<ADatatype>& msg) {
            // dynamic pointer cast may return nullptr
            // in which case that message in vector will be nullptr
            messages.push_back(std::dynamic_pointer_cast<T>(std::move(msg)));
        });
        if(!notDestructed) {
            throw QueueException(CLOSED_QUEUE_MESSAGE);
        }
        return messages;
    }

    std::vector<std::shared_ptr<ADatatype>> getAll() {
        return getAll<ADatatype>();
    }

    template <class T, typename Rep, typename Period>
    std::vector<std::shared_ptr<T>> getAll(std::chrono::duration<Rep, Period> timeout, bool& hasTimedout) {
        if(queue.isDestroyed()) {
            throw QueueException(CLOSED_QUEUE_MESSAGE);
        }
        std::vector<std::shared_ptr<T>> messages;
        hasTimedout = !queue.waitAndConsumeAll(
            [&messages](std::shared_ptr<ADatatype>& msg) {
                // dynamic pointer cast may return nullptr
                // in which case that message in vector will be nullptr
                messages.push_back(std::dynamic_pointer_cast<T>(std::move(msg)));
            },
            timeout);

        return messages;
    }

    template <typename Rep, typename Period>
    std::vector<std::shared_ptr<ADatatype>> getAll(std::chrono::duration<Rep, Period> timeout, bool& hasTimedout) {
        return getAll<ADatatype>(timeout, hasTimedout);
    }

    void send(const std::shared_ptr<ADatatype>& msg);

    bool send(const std::shared_ptr<ADatatype>& msg, std::chrono::milliseconds timeout);

    bool send(const ADatatype& msg, std::chrono::milliseconds timeout);

    bool trySend(const std::shared_ptr<ADatatype>& msg);
};

}  // namespace dai