Program Listing for File Pipeline.hpp

Return to documentation for file (include/depthai/pipeline/Pipeline.hpp)

// IWYU pragma: private, include "depthai/depthai.hpp"
#pragma once

// standard
#include <memory>
#include <type_traits>
#include <unordered_set>
#include <utility>
#include <vector>

// project
#include "AssetManager.hpp"
#include "DeviceNode.hpp"
#include "Node.hpp"
#include "depthai/device/CalibrationHandler.hpp"
#include "depthai/device/Device.hpp"
#include "depthai/openvino/OpenVINO.hpp"
#include "depthai/utility/AtomicBool.hpp"

// shared
#include "depthai/device/BoardConfig.hpp"
#include "depthai/pipeline/PipelineSchema.hpp"
#include "depthai/properties/GlobalProperties.hpp"
#include "depthai/utility/RecordReplay.hpp"

namespace dai {

namespace fs = std::filesystem;

class PipelineImpl : public std::enable_shared_from_this<PipelineImpl> {
    friend class Pipeline;
    friend class Node;
    friend class DeviceBase;

   public:
    PipelineImpl(Pipeline& pipeline, bool createImplicitDevice = true) : assetManager("/pipeline/"), parent(pipeline) {
        if(createImplicitDevice) {
            defaultDevice = std::make_shared<Device>();
        }
    }
    PipelineImpl(Pipeline& pipeline, std::shared_ptr<Device> device) : assetManager("/pipeline/"), parent(pipeline), defaultDevice{std::move(device)} {}
    PipelineImpl(const PipelineImpl&) = delete;
    PipelineImpl& operator=(const PipelineImpl&) = delete;
    PipelineImpl(PipelineImpl&&) = delete;
    PipelineImpl& operator=(PipelineImpl&&) = delete;
    ~PipelineImpl();

   private:
    // static functions
    static bool isSamePipeline(const Node::Output& out, const Node::Input& in);
    static bool canConnect(const Node::Output& out, const Node::Input& in);

    // Functions
    Node::Id getNextUniqueId();
    PipelineSchema getPipelineSchema(SerializationType type = DEFAULT_SERIALIZATION_TYPE) const;
    Device::Config getDeviceConfig() const;
    void setCameraTuningBlobPath(const fs::path& path);
    void setXLinkChunkSize(int sizeBytes);
    GlobalProperties getGlobalProperties() const;
    void setGlobalProperties(GlobalProperties globalProperties);
    void setSippBufferSize(int sizeBytes);
    void setSippDmaBufferSize(int sizeBytes);
    void setBoardConfig(BoardConfig board);
    BoardConfig getBoardConfig() const;

    // Access to nodes
    std::vector<std::shared_ptr<Node>> getAllNodes() const;
    std::shared_ptr<Node> getNode(Node::Id id) const;
    std::vector<std::shared_ptr<Node>> getSourceNodes();

    void serialize(PipelineSchema& schema, Assets& assets, std::vector<std::uint8_t>& assetStorage, SerializationType type = DEFAULT_SERIALIZATION_TYPE) const;
    nlohmann::json serializeToJson(bool includeAssets) const;
    void remove(std::shared_ptr<Node> node);

    std::vector<Node::Connection> getConnections() const;
    std::vector<Node::ConnectionInternal> getConnectionsInternal() const;
    void link(const Node::Output& out, const Node::Input& in);
    void unlink(const Node::Output& out, const Node::Input& in);
    void setCalibrationData(CalibrationHandler calibrationDataHandler);
    bool isCalibrationDataAvailable() const;
    CalibrationHandler getCalibrationData() const;
    void setEepromData(std::optional<EepromData> eepromData);
    std::optional<EepromData> getEepromData() const;
    uint32_t getEepromId() const;
    bool isHostOnly() const;
    bool isDeviceOnly() const;

    // Must be incremented and unique for each node
    Node::Id latestId = 0;
    // Pipeline asset manager
    AssetManager assetManager;
    // Optionally forced version
    std::optional<OpenVINO::Version> forceRequiredOpenVINOVersion;
    // Global pipeline properties
    GlobalProperties globalProperties;
    // // Optimized for adding, searching and removing connections
    // using NodeMap = std::unordered_map<Node::Id, std::shared_ptr<Node>>;
    // NodeMap nodeMap;
    std::vector<std::shared_ptr<Node>> nodes;

    // TODO(themarpe) - refactor, connections are now carried by nodes instead
    using NodeConnectionMap = std::unordered_map<Node::Id, std::unordered_set<Node::ConnectionInternal, Node::ConnectionInternal::Hash>>;
    // // Connection map, NodeId represents id of node connected TO (input)
    // NodeConnectionMap nodeConnectionMap;
    NodeConnectionMap getConnectionMap() const;

    // Board configuration
    BoardConfig board;

    // Record and Replay
    RecordConfig recordConfig;
    bool enableHolisticRecordReplay = false;
    std::unordered_map<std::string, std::filesystem::path> recordReplayFilenames;
    bool removeRecordReplayFiles = true;
    std::string defaultDeviceId;

    // Output queues
    std::vector<std::shared_ptr<MessageQueue>> outputQueues;

    // parent
    Pipeline& parent;

    // is pipeline running
    AtomicBool running{false};

    // was pipeline built
    AtomicBool isBuild{false};

    // Add a mutex for any state change
    std::mutex stateMtx;

    // Calibration mutex
    mutable std::mutex calibMtx;

    // DeviceBase for hybrid pipelines
    std::shared_ptr<Device> defaultDevice;

    // Queue for tasks
    LockingQueue<std::function<void()>> tasks;

    void addTask(std::function<void()> task) {
        tasks.push(std::move(task));
    }

    void processTasks(bool waitForTasks = false, double timeoutSeconds = -1.0) {
        bool timeoutSet = timeoutSeconds >= 0.0;
        if(waitForTasks) {
            std::function<void()> task;
            bool success;
            if(timeoutSet) {
                success = tasks.tryWaitAndPop(task, std::chrono::duration<double>(timeoutSeconds));
            } else {
                success = tasks.waitAndPop(task);
            }
            if(!success) {
                return;
            }
            task();
        }
        // Regardless if we should wait or not, run all remaining tasks
        while(!tasks.empty()) {
            std::function<void()> task;
            bool success = false;
            success = tasks.tryPop(task);
            if(!success) {
                // No more tasks
                break;
            }
            task();
        }
    }

    template <typename N, typename... Args>
    std::enable_if_t<std::is_base_of<DeviceNode, N>::value && !std::is_base_of<HostRunnable, N>::value, std::shared_ptr<N>> createNode(Args&&... args) {
        // N is a subclass of DeviceNode
        // return N::create();  // Specific create call for DeviceNode subclasses
        if(defaultDevice == nullptr) {
            throw std::runtime_error("Pipeline is host only, cannot create device node");
        }
        return N::create(defaultDevice, std::forward<Args>(args)...);  // Specific create call for DeviceNode subclasses
    }

    template <typename N, typename... Args>
    std::enable_if_t<std::is_base_of<DeviceNode, N>::value && std::is_base_of<HostRunnable, N>::value, std::shared_ptr<N>> createNode(Args&&... args) {
        // N is a subclass of DeviceNode
        // return N::create();  // Specific create call for DeviceNode subclasses
        if(defaultDevice == nullptr) {
            return N::create(std::forward<Args>(args)...);  // Generic create call
        } else {
            return N::create(defaultDevice, std::forward<Args>(args)...);  // Specific create call for DeviceNode subclasses
        }
    }

    template <typename N, typename... Args>
    std::enable_if_t<!std::is_base_of<DeviceNode, N>::value, std::shared_ptr<N>> createNode(Args&&... args) {
        // N is not a subclass of DeviceNode
        return N::create(std::forward<Args>(args)...);  // Generic create call
    }

    // Template create function
    template <class N, typename... Args>
    std::shared_ptr<N> create(const std::shared_ptr<PipelineImpl>& itself, Args&&... args) {
        (void)itself;
        // Check that passed type 'N' is subclass of Node
        static_assert(std::is_base_of<Node, N>::value, "Specified class is not a subclass of Node");
        // Create and store the node in the map
        auto node = createNode<N>(std::forward<Args>(args)...);
        // std::shared_ptr<N> node = nullptr;
        add(node);
        // Return shared pointer to this node
        return node;
    }

    // Add a node to nodeMap
    void add(std::shared_ptr<Node> node);

    // Run only host side, if any device nodes are present, error out
    bool isRunning() const;
    bool isBuilt() const;
    void build();
    void start();
    void wait();
    void stop();
    void run();

    // Reset connections
    void resetConnections();
    void disconnectXLinkHosts();

   private:
    // Resource
    std::vector<uint8_t> loadResource(fs::path uri);
    std::vector<uint8_t> loadResourceCwd(fs::path uri, fs::path cwd, bool moveAsset = false);
};

class Pipeline {
    friend class PipelineImpl;
    friend class Device;

    std::shared_ptr<PipelineImpl> pimpl;

   public:
    PipelineImpl* impl() {
        return pimpl.get();
    }
    const PipelineImpl* impl() const {
        return pimpl.get();
    }

    std::vector<std::shared_ptr<Node>> getSourceNodes() {
        return impl()->getSourceNodes();
    }

    explicit Pipeline(bool createImplicitDevice = true);

    explicit Pipeline(std::shared_ptr<Device> device);

    explicit Pipeline(std::shared_ptr<PipelineImpl> pimpl);

    GlobalProperties getGlobalProperties() const {
        return impl()->getGlobalProperties();
    }

    void setGlobalProperties(GlobalProperties globalProperties) {
        impl()->setGlobalProperties(globalProperties);
    }

    PipelineSchema getPipelineSchema(SerializationType type = DEFAULT_SERIALIZATION_TYPE) const;

    // void loadAssets(AssetManager& assetManager);
    void serialize(PipelineSchema& schema, Assets& assets, std::vector<std::uint8_t>& assetStorage) const {
        impl()->serialize(schema, assets, assetStorage);
    }

    nlohmann::json serializeToJson(bool includeAssests = true) const {
        return impl()->serializeToJson(includeAssests);
    }

    template <class N, typename... Args>
    std::shared_ptr<N> create(Args&&... args) {
        return impl()->create<N>(pimpl, std::forward<Args>(args)...);
    }

    void add(std::shared_ptr<Node> node) {
        impl()->add(node);
    }

    void remove(std::shared_ptr<Node> node) {
        impl()->remove(node);
    }

    std::vector<std::shared_ptr<Node>> getAllNodes() const {
        return impl()->getAllNodes();
    }

    std::shared_ptr<const Node> getNode(Node::Id id) const {
        return impl()->getNode(id);
    }
    std::shared_ptr<Node> getNode(Node::Id id) {
        return impl()->getNode(id);
    }

    std::vector<Node::Connection> getConnections() const {
        return impl()->getConnections();
    }

    using NodeConnectionMap = PipelineImpl::NodeConnectionMap;
    NodeConnectionMap getConnectionMap() const {
        return impl()->getConnectionMap();
    }

    const AssetManager& getAssetManager() const {
        return impl()->assetManager;
    }

    AssetManager& getAssetManager() {
        return impl()->assetManager;
    }

    void setOpenVINOVersion(OpenVINO::Version version) {
        impl()->forceRequiredOpenVINOVersion = version;
    }

    void setCalibrationData(CalibrationHandler calibrationDataHandler) {
        impl()->setCalibrationData(calibrationDataHandler);
    }

    CalibrationHandler getCalibrationData() const {
        return impl()->getCalibrationData();
    }

    bool isCalibrationDataAvailable() const {
        return impl()->isCalibrationDataAvailable();
    }

    std::optional<EepromData> getEepromData() const {
        return impl()->getEepromData();
    }

    void setEepromData(std::optional<EepromData> eepromData) {
        impl()->setEepromData(eepromData);
    }

    uint32_t getEepromId() const {
        return impl()->getEepromId();
    }

    void setCameraTuningBlobPath(const fs::path& path) {
        impl()->setCameraTuningBlobPath(path);
    }

    void setXLinkChunkSize(int sizeBytes) {
        impl()->setXLinkChunkSize(sizeBytes);
    }

    void setSippBufferSize(int sizeBytes) {
        impl()->setSippBufferSize(sizeBytes);
    }

    void setSippDmaBufferSize(int sizeBytes) {
        impl()->setSippDmaBufferSize(sizeBytes);
    }

    void setBoardConfig(BoardConfig board) {
        impl()->setBoardConfig(board);
    }

    BoardConfig getBoardConfig() const {
        return impl()->getBoardConfig();
    }

    Device::Config getDeviceConfig() const {
        return impl()->getDeviceConfig();
    }

    bool isRunning() const {
        return impl()->isRunning();
    }

    bool isBuilt() const {
        return impl()->isBuilt();
    }

    void build() {
        impl()->build();
    }
    void start() {
        impl()->start();
    }
    void wait() {
        impl()->wait();
    }
    void stop() {
        impl()->stop();
    }
    void processTasks(bool waitForTasks = false, double timeoutSeconds = -1.0) {
        impl()->processTasks(waitForTasks, timeoutSeconds);
    }
    void run() {
        impl()->run();
    }
    /*
     * @note In case of a host only pipeline, this function returns a nullptr
     */
    std::shared_ptr<Device> getDefaultDevice() {
        return impl()->defaultDevice;
    }

    void addTask(std::function<void()> task) {
        impl()->addTask(std::move(task));
    }

    void enableHolisticRecord(const RecordConfig& config);
    void enableHolisticReplay(const std::string& pathToRecording);
};

}  // namespace dai