Program Listing for File Pipeline.hpp
↰ Return to documentation for file (include/depthai/pipeline/Pipeline.hpp)
// IWYU pragma: private, include "depthai/depthai.hpp"
#pragma once
// standard
#include <memory>
#include <type_traits>
#include <unordered_set>
#include <utility>
#include <vector>
// project
#include "AssetManager.hpp"
#include "DeviceNode.hpp"
#include "Node.hpp"
#include "depthai/device/CalibrationHandler.hpp"
#include "depthai/device/Device.hpp"
#include "depthai/openvino/OpenVINO.hpp"
#include "depthai/utility/AtomicBool.hpp"
// shared
#include "depthai/device/BoardConfig.hpp"
#include "depthai/pipeline/PipelineSchema.hpp"
#include "depthai/properties/GlobalProperties.hpp"
#include "depthai/utility/RecordReplay.hpp"
namespace dai {
namespace fs = std::filesystem;
class PipelineImpl : public std::enable_shared_from_this<PipelineImpl> {
friend class Pipeline;
friend class Node;
friend class DeviceBase;
public:
PipelineImpl(Pipeline& pipeline, bool createImplicitDevice = true) : assetManager("/pipeline/"), parent(pipeline) {
if(createImplicitDevice) {
defaultDevice = std::make_shared<Device>();
}
}
PipelineImpl(Pipeline& pipeline, std::shared_ptr<Device> device) : assetManager("/pipeline/"), parent(pipeline), defaultDevice{std::move(device)} {}
PipelineImpl(const PipelineImpl&) = delete;
PipelineImpl& operator=(const PipelineImpl&) = delete;
PipelineImpl(PipelineImpl&&) = delete;
PipelineImpl& operator=(PipelineImpl&&) = delete;
~PipelineImpl();
private:
// static functions
static bool isSamePipeline(const Node::Output& out, const Node::Input& in);
static bool canConnect(const Node::Output& out, const Node::Input& in);
// Functions
Node::Id getNextUniqueId();
PipelineSchema getPipelineSchema(SerializationType type = DEFAULT_SERIALIZATION_TYPE) const;
Device::Config getDeviceConfig() const;
void setCameraTuningBlobPath(const fs::path& path);
void setXLinkChunkSize(int sizeBytes);
GlobalProperties getGlobalProperties() const;
void setGlobalProperties(GlobalProperties globalProperties);
void setSippBufferSize(int sizeBytes);
void setSippDmaBufferSize(int sizeBytes);
void setBoardConfig(BoardConfig board);
BoardConfig getBoardConfig() const;
// Access to nodes
std::vector<std::shared_ptr<Node>> getAllNodes() const;
std::shared_ptr<Node> getNode(Node::Id id) const;
std::vector<std::shared_ptr<Node>> getSourceNodes();
void serialize(PipelineSchema& schema, Assets& assets, std::vector<std::uint8_t>& assetStorage, SerializationType type = DEFAULT_SERIALIZATION_TYPE) const;
nlohmann::json serializeToJson(bool includeAssets) const;
void remove(std::shared_ptr<Node> node);
std::vector<Node::Connection> getConnections() const;
std::vector<Node::ConnectionInternal> getConnectionsInternal() const;
void link(const Node::Output& out, const Node::Input& in);
void unlink(const Node::Output& out, const Node::Input& in);
void setCalibrationData(CalibrationHandler calibrationDataHandler);
bool isCalibrationDataAvailable() const;
CalibrationHandler getCalibrationData() const;
void setEepromData(std::optional<EepromData> eepromData);
std::optional<EepromData> getEepromData() const;
uint32_t getEepromId() const;
bool isHostOnly() const;
bool isDeviceOnly() const;
// Must be incremented and unique for each node
Node::Id latestId = 0;
// Pipeline asset manager
AssetManager assetManager;
// Optionally forced version
std::optional<OpenVINO::Version> forceRequiredOpenVINOVersion;
// Global pipeline properties
GlobalProperties globalProperties;
// // Optimized for adding, searching and removing connections
// using NodeMap = std::unordered_map<Node::Id, std::shared_ptr<Node>>;
// NodeMap nodeMap;
std::vector<std::shared_ptr<Node>> nodes;
// TODO(themarpe) - refactor, connections are now carried by nodes instead
using NodeConnectionMap = std::unordered_map<Node::Id, std::unordered_set<Node::ConnectionInternal, Node::ConnectionInternal::Hash>>;
// // Connection map, NodeId represents id of node connected TO (input)
// NodeConnectionMap nodeConnectionMap;
NodeConnectionMap getConnectionMap() const;
// Board configuration
BoardConfig board;
// Record and Replay
RecordConfig recordConfig;
bool enableHolisticRecordReplay = false;
std::unordered_map<std::string, std::filesystem::path> recordReplayFilenames;
bool removeRecordReplayFiles = true;
std::string defaultDeviceId;
// Output queues
std::vector<std::shared_ptr<MessageQueue>> outputQueues;
// parent
Pipeline& parent;
// is pipeline running
AtomicBool running{false};
// was pipeline built
AtomicBool isBuild{false};
// Add a mutex for any state change
std::mutex stateMtx;
// Calibration mutex
mutable std::mutex calibMtx;
// DeviceBase for hybrid pipelines
std::shared_ptr<Device> defaultDevice;
// Queue for tasks
LockingQueue<std::function<void()>> tasks;
void addTask(std::function<void()> task) {
tasks.push(std::move(task));
}
void processTasks(bool waitForTasks = false, double timeoutSeconds = -1.0) {
bool timeoutSet = timeoutSeconds >= 0.0;
if(waitForTasks) {
std::function<void()> task;
bool success;
if(timeoutSet) {
success = tasks.tryWaitAndPop(task, std::chrono::duration<double>(timeoutSeconds));
} else {
success = tasks.waitAndPop(task);
}
if(!success) {
return;
}
task();
}
// Regardless if we should wait or not, run all remaining tasks
while(!tasks.empty()) {
std::function<void()> task;
bool success = false;
success = tasks.tryPop(task);
if(!success) {
// No more tasks
break;
}
task();
}
}
template <typename N, typename... Args>
std::enable_if_t<std::is_base_of<DeviceNode, N>::value && !std::is_base_of<HostRunnable, N>::value, std::shared_ptr<N>> createNode(Args&&... args) {
// N is a subclass of DeviceNode
// return N::create(); // Specific create call for DeviceNode subclasses
if(defaultDevice == nullptr) {
throw std::runtime_error("Pipeline is host only, cannot create device node");
}
return N::create(defaultDevice, std::forward<Args>(args)...); // Specific create call for DeviceNode subclasses
}
template <typename N, typename... Args>
std::enable_if_t<std::is_base_of<DeviceNode, N>::value && std::is_base_of<HostRunnable, N>::value, std::shared_ptr<N>> createNode(Args&&... args) {
// N is a subclass of DeviceNode
// return N::create(); // Specific create call for DeviceNode subclasses
if(defaultDevice == nullptr) {
return N::create(std::forward<Args>(args)...); // Generic create call
} else {
return N::create(defaultDevice, std::forward<Args>(args)...); // Specific create call for DeviceNode subclasses
}
}
template <typename N, typename... Args>
std::enable_if_t<!std::is_base_of<DeviceNode, N>::value, std::shared_ptr<N>> createNode(Args&&... args) {
// N is not a subclass of DeviceNode
return N::create(std::forward<Args>(args)...); // Generic create call
}
// Template create function
template <class N, typename... Args>
std::shared_ptr<N> create(const std::shared_ptr<PipelineImpl>& itself, Args&&... args) {
(void)itself;
// Check that passed type 'N' is subclass of Node
static_assert(std::is_base_of<Node, N>::value, "Specified class is not a subclass of Node");
// Create and store the node in the map
auto node = createNode<N>(std::forward<Args>(args)...);
// std::shared_ptr<N> node = nullptr;
add(node);
// Return shared pointer to this node
return node;
}
// Add a node to nodeMap
void add(std::shared_ptr<Node> node);
// Run only host side, if any device nodes are present, error out
bool isRunning() const;
bool isBuilt() const;
void build();
void start();
void wait();
void stop();
void run();
// Reset connections
void resetConnections();
void disconnectXLinkHosts();
private:
// Resource
std::vector<uint8_t> loadResource(fs::path uri);
std::vector<uint8_t> loadResourceCwd(fs::path uri, fs::path cwd, bool moveAsset = false);
};
class Pipeline {
friend class PipelineImpl;
friend class Device;
std::shared_ptr<PipelineImpl> pimpl;
public:
PipelineImpl* impl() {
return pimpl.get();
}
const PipelineImpl* impl() const {
return pimpl.get();
}
std::vector<std::shared_ptr<Node>> getSourceNodes() {
return impl()->getSourceNodes();
}
explicit Pipeline(bool createImplicitDevice = true);
explicit Pipeline(std::shared_ptr<Device> device);
explicit Pipeline(std::shared_ptr<PipelineImpl> pimpl);
GlobalProperties getGlobalProperties() const {
return impl()->getGlobalProperties();
}
void setGlobalProperties(GlobalProperties globalProperties) {
impl()->setGlobalProperties(globalProperties);
}
PipelineSchema getPipelineSchema(SerializationType type = DEFAULT_SERIALIZATION_TYPE) const;
// void loadAssets(AssetManager& assetManager);
void serialize(PipelineSchema& schema, Assets& assets, std::vector<std::uint8_t>& assetStorage) const {
impl()->serialize(schema, assets, assetStorage);
}
nlohmann::json serializeToJson(bool includeAssests = true) const {
return impl()->serializeToJson(includeAssests);
}
template <class N, typename... Args>
std::shared_ptr<N> create(Args&&... args) {
return impl()->create<N>(pimpl, std::forward<Args>(args)...);
}
void add(std::shared_ptr<Node> node) {
impl()->add(node);
}
void remove(std::shared_ptr<Node> node) {
impl()->remove(node);
}
std::vector<std::shared_ptr<Node>> getAllNodes() const {
return impl()->getAllNodes();
}
std::shared_ptr<const Node> getNode(Node::Id id) const {
return impl()->getNode(id);
}
std::shared_ptr<Node> getNode(Node::Id id) {
return impl()->getNode(id);
}
std::vector<Node::Connection> getConnections() const {
return impl()->getConnections();
}
using NodeConnectionMap = PipelineImpl::NodeConnectionMap;
NodeConnectionMap getConnectionMap() const {
return impl()->getConnectionMap();
}
const AssetManager& getAssetManager() const {
return impl()->assetManager;
}
AssetManager& getAssetManager() {
return impl()->assetManager;
}
void setOpenVINOVersion(OpenVINO::Version version) {
impl()->forceRequiredOpenVINOVersion = version;
}
void setCalibrationData(CalibrationHandler calibrationDataHandler) {
impl()->setCalibrationData(calibrationDataHandler);
}
CalibrationHandler getCalibrationData() const {
return impl()->getCalibrationData();
}
bool isCalibrationDataAvailable() const {
return impl()->isCalibrationDataAvailable();
}
std::optional<EepromData> getEepromData() const {
return impl()->getEepromData();
}
void setEepromData(std::optional<EepromData> eepromData) {
impl()->setEepromData(eepromData);
}
uint32_t getEepromId() const {
return impl()->getEepromId();
}
void setCameraTuningBlobPath(const fs::path& path) {
impl()->setCameraTuningBlobPath(path);
}
void setXLinkChunkSize(int sizeBytes) {
impl()->setXLinkChunkSize(sizeBytes);
}
void setSippBufferSize(int sizeBytes) {
impl()->setSippBufferSize(sizeBytes);
}
void setSippDmaBufferSize(int sizeBytes) {
impl()->setSippDmaBufferSize(sizeBytes);
}
void setBoardConfig(BoardConfig board) {
impl()->setBoardConfig(board);
}
BoardConfig getBoardConfig() const {
return impl()->getBoardConfig();
}
Device::Config getDeviceConfig() const {
return impl()->getDeviceConfig();
}
bool isRunning() const {
return impl()->isRunning();
}
bool isBuilt() const {
return impl()->isBuilt();
}
void build() {
impl()->build();
}
void start() {
impl()->start();
}
void wait() {
impl()->wait();
}
void stop() {
impl()->stop();
}
void processTasks(bool waitForTasks = false, double timeoutSeconds = -1.0) {
impl()->processTasks(waitForTasks, timeoutSeconds);
}
void run() {
impl()->run();
}
/*
* @note In case of a host only pipeline, this function returns a nullptr
*/
std::shared_ptr<Device> getDefaultDevice() {
return impl()->defaultDevice;
}
void addTask(std::function<void()> task) {
impl()->addTask(std::move(task));
}
void enableHolisticRecord(const RecordConfig& config);
void enableHolisticReplay(const std::string& pathToRecording);
};
} // namespace dai