Program Listing for File EventsManager.hpp

Return to documentation for file (include/depthai/utility/EventsManager.hpp)

#pragma once

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <filesystem>
#include <future>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

#include "depthai/pipeline/datatype/ADatatype.hpp"
#include "depthai/pipeline/datatype/EncodedFrame.hpp"
#include "depthai/pipeline/datatype/ImgDetections.hpp"
#include "depthai/pipeline/datatype/ImgFrame.hpp"
#include "depthai/pipeline/datatype/NNData.hpp"

namespace dai {
// Forward declarations for the proto::event types that the declarations further
// down refer to. PrepareFileUploadClass is an opaque enum declaration with a fixed
// underlying type of int, so it can be held by value without its enumerator list.
namespace proto::event {
class Event;
class FileUploadGroupResult;
enum PrepareFileUploadClass : int;
}  // namespace proto::event

namespace utility {

/**
 * One file payload plus the metadata kept next to it: MIME type, file name,
 * size, checksum and an upload classification.
 *
 * Only declarations are visible in this header; how each constructor fills in the
 * metadata is defined in the implementation file. EventsManager is a friend and can
 * access the private members directly.
 */
class FileData {
   public:
    /**
     * Construct from explicitly supplied values.
     * @param data Value for the payload (presumably the raw file contents - confirm in implementation)
     * @param fileName Name to associate with the payload
     * @param mimeType MIME type to associate with the payload
     */
    FileData(std::string data, std::string fileName, std::string mimeType);

    /**
     * Construct from a filesystem path.
     * @param filePath Path of the source file
     * @param fileName Name to associate with the payload
     */
    explicit FileData(std::filesystem::path filePath, std::string fileName);

    /**
     * Construct from an ImgFrame message.
     * @param imgFrame Source message
     * @param fileName Name to associate with the payload
     */
    explicit FileData(const std::shared_ptr<ImgFrame>& imgFrame, std::string fileName);

    /**
     * Construct from an EncodedFrame message.
     * @param encodedFrame Source message
     * @param fileName Name to associate with the payload
     */
    explicit FileData(const std::shared_ptr<EncodedFrame>& encodedFrame, std::string fileName);

    // NNData support is currently disabled:
    // explicit FileData(const std::shared_ptr<NNData>& nnData, std::string fileName);

    /**
     * Construct from an ImgDetections message.
     * @param imgDetections Source message
     * @param fileName Name to associate with the payload
     */
    explicit FileData(const std::shared_ptr<ImgDetections>& imgDetections, std::string fileName);

    /**
     * Write this object to the filesystem.
     * @param inputPath Filesystem path argument
     *        (NOTE(review): presumably the destination despite the "input" name - confirm)
     * @return Presumably true on success - confirm in implementation
     */
    bool toFile(const std::filesystem::path& inputPath);

   private:
    std::string mimeType;
    std::string fileName;
    std::string data;
    // Default-initialized so that a constructor path which does not assign it
    // cannot leave an indeterminate value behind.
    uint64_t size = 0;
    std::string checksum;
    // Value-initialized (underlying value 0) for the same reason as `size`.
    proto::event::PrepareFileUploadClass classification{};
    friend class EventsManager;
};

/**
 * A group of FileData entries, held as shared pointers.
 *
 * The add* overloads below are the public way to contribute files; their
 * definitions are not part of this header. EventsManager is a friend and can
 * therefore access the stored entries directly.
 */
class FileGroup {
   public:
    // --- Single file: explicit contents, or a path on disk ---
    void addFile(std::string fileName, std::string data, std::string mimeType);
    void addFile(std::string fileName, std::filesystem::path filePath);

    // --- Single file built from a message; the file name is optional ---
    void addFile(const std::optional<std::string>& fileName, const std::shared_ptr<ImgFrame>& imgFrame);
    void addFile(const std::optional<std::string>& fileName, const std::shared_ptr<EncodedFrame>& encodedFrame);
    void addFile(const std::optional<std::string>& fileName, const std::shared_ptr<ImgDetections>& imgDetections);

    // --- Image + detections added together; the file name is optional ---
    void addImageDetectionsPair(const std::optional<std::string>& fileName,
                                const std::shared_ptr<ImgFrame>& imgFrame,
                                const std::shared_ptr<ImgDetections>& imgDetections);
    void addImageDetectionsPair(const std::optional<std::string>& fileName,
                                const std::shared_ptr<EncodedFrame>& encodedFrame,
                                const std::shared_ptr<ImgDetections>& imgDetections);

    // --- NNData overloads are currently disabled ---
    // void addFile(std::string fileName, const std::shared_ptr<NNData>& nnData);
    // void addImageNNDataPair(std::string fileName, const std::shared_ptr<ImgFrame>& imgFrame, const std::shared_ptr<NNData>& imgDetections);
    // void addImageNNDataPair(std::string fileName, const std::shared_ptr<EncodedFrame>& encodedFrame, const std::shared_ptr<NNData>& imgDetections);

   private:
    friend class EventsManager;

    // Entries contributed so far.
    std::vector<std::shared_ptr<FileData>> fileData;
};

/**
 * Public entry point for sending events and "snaps" (an event together with a
 * FileGroup), plus the buffering, caching and upload state declared below.
 *
 * Only declarations are visible in this header; the behavior of every member
 * function is defined in the implementation file.
 */
class EventsManager {
   public:
    /**
     * @param uploadCachedOnStart Presumably requests an upload of previously cached data
     *        during start-up - confirm in implementation. Defaults to false.
     */
    explicit EventsManager(bool uploadCachedOnStart = false);
    ~EventsManager();

    // The class holds std::mutex, std::condition_variable and std::atomic members, so it
    // was already implicitly non-copyable; spelling it out yields a clearer diagnostic.
    EventsManager(const EventsManager&) = delete;
    EventsManager& operator=(const EventsManager&) = delete;

    /**
     * Send an event.
     * @param name Event name
     * @param tags Tags attached to the event
     * @param extras Extra key/value pairs attached to the event
     * @param deviceSerialNo Device serial number; defaults to an empty string
     * @param associateFiles Files to associate with the event
     *        (NOTE(review): presumably identifiers of already-uploaded files - confirm)
     * @return Presumably true when the event was accepted - confirm in implementation
     */
    bool sendEvent(const std::string& name,
                   const std::vector<std::string>& tags = {},
                   const std::unordered_map<std::string, std::string>& extras = {},
                   const std::string& deviceSerialNo = "",
                   const std::vector<std::string>& associateFiles = {});

    /**
     * Send a snap made of an event and a group of files.
     * @param name Snap name
     * @param fileGroup Files that belong to the snap
     * @param tags Tags attached to the snap
     * @param extras Extra key/value pairs attached to the snap
     * @param deviceSerialNo Device serial number; defaults to an empty string
     * @return Presumably true when the snap was accepted - confirm in implementation
     */
    bool sendSnap(const std::string& name,
                  const std::shared_ptr<FileGroup> fileGroup,
                  const std::vector<std::string>& tags = {},
                  const std::unordered_map<std::string, std::string>& extras = {},
                  const std::string& deviceSerialNo = "");

    /**
     * Send a snap built from a single image frame and, optionally, its detections.
     * @param name Snap name
     * @param fileName Optional file name
     * @param imgFrame Image frame to send
     * @param imgDetections Optional detections; defaults to std::nullopt
     * @param tags Tags attached to the snap
     * @param extras Extra key/value pairs attached to the snap
     * @param deviceSerialNo Device serial number; defaults to an empty string
     * @return Presumably true when the snap was accepted - confirm in implementation
     */
    bool sendSnap(const std::string& name,
                  const std::optional<std::string>& fileName,
                  const std::shared_ptr<ImgFrame> imgFrame,
                  const std::optional<std::shared_ptr<ImgDetections>>& imgDetections = std::nullopt,
                  const std::vector<std::string>& tags = {},
                  const std::unordered_map<std::string, std::string>& extras = {},
                  const std::string& deviceSerialNo = "");

    // Configuration setters; each takes the new value for the correspondingly named setting.
    void setToken(const std::string& token);
    void setLogResponse(bool logResponse);
    void setVerifySsl(bool verifySsl);
    void setCacheDir(const std::string& cacheDir);
    void setCacheIfCannotSend(bool cacheIfCannotSend);

   private:
    // An event paired with the files that belong to it.
    struct SnapData {
        std::shared_ptr<proto::event::Event> event;
        std::shared_ptr<FileGroup> fileGroup;
    };

    // Retry parameters for uploads.
    // NOTE(review): the shape (attempt cap, factor, base delay) suggests an exponential
    // backoff - confirm against the implementation.
    struct UploadRetryPolicy {
        int maxAttempts = 10;
        float factor = 2.0f;
        std::chrono::milliseconds baseDelay{100};
    };

    bool fetchConfigurationLimits();
    void uploadFileBatch(std::deque<std::shared_ptr<SnapData>> inputSnapBatch);
    bool uploadGroup(std::shared_ptr<SnapData> snapData, dai::proto::event::FileUploadGroupResult prepareGroupResult);
    bool uploadFile(std::shared_ptr<FileData> fileData, std::string uploadUrl);
    void uploadEventBatch();
    bool validateEvent(const proto::event::Event& inputEvent);
    void cacheEvents();
    void cacheSnapData(std::deque<std::shared_ptr<SnapData>>& inputSnapBatch);
    void uploadCachedData();
    bool checkForCachedData();
    void clearCachedData(const std::filesystem::path& directory);

    // The scalar members below carry default member initializers purely as a safety net:
    // they guarantee a determinate value even if a constructor path does not assign them.
    // Any value set in the constructor's member-initializer list takes precedence.
    std::string token;
    std::string url;
    std::string sourceAppId;
    std::string sourceAppIdentifier;
    float publishInterval = 0.0f;
    bool logResponse = false;
    bool verifySsl = true;  // fail-safe fallback: verification on unless explicitly disabled
    std::string cacheDir;
    bool cacheIfCannotSend = false;
    std::unique_ptr<std::thread> uploadThread;
    std::deque<std::shared_ptr<proto::event::Event>> eventBuffer;
    std::deque<std::shared_ptr<SnapData>> snapBuffer;
    std::deque<std::future<void>> uploadFileBatchFutures;
    std::mutex eventBufferMutex;
    std::mutex snapBufferMutex;
    std::mutex stopThreadConditionMutex;
    // Before C++20 a default-constructed std::atomic holds an indeterminate value,
    // so both flags are initialized explicitly.
    std::atomic<bool> stopUploadThread{false};
    std::atomic<bool> configurationLimitsFetched{false};
    std::condition_variable eventBufferCondition;

    // Limit values (presumably filled in by fetchConfigurationLimits - confirm).
    // Zero-initialized so that a read before they are populated is well-defined.
    uint64_t maxFileSizeBytes = 0;
    uint64_t remainingStorageBytes = 0;
    uint64_t warningStorageBytes = 0;
    uint64_t bytesPerHour = 0;
    uint32_t uploadsPerHour = 0;
    uint32_t maxGroupsPerBatch = 0;
    uint32_t maxFilesPerGroup = 0;
    uint32_t eventsPerHour = 0;
    uint32_t snapsPerHour = 0;
    uint32_t eventsPerRequest = 0;

    UploadRetryPolicy uploadRetryPolicy;

    static constexpr int EVENT_BUFFER_MAX_SIZE = 300;

    // Event validation bounds (presumably enforced by validateEvent - confirm).
    static constexpr int EVENT_VALIDATION_NAME_LENGTH = 56;
    static constexpr int EVENT_VALIDATION_MAX_TAGS = 20;
    static constexpr int EVENT_VALIDATION_TAG_LENGTH = 56;
    static constexpr int EVENT_VALIDATION_MAX_EXTRAS = 25;
    static constexpr int EVENT_VALIDATION_EXTRA_KEY_LENGTH = 40;
    static constexpr int EVENT_VALIDATION_EXTRA_VALUE_LENGTH = 100;
    static constexpr int EVENT_VALIDATION_MAX_ASSOCIATE_FILES = 20;
};
}  // namespace utility
}  // namespace dai