Program Listing for File writer.hpp

Return to documentation for file (include/3rdparty/mcap/writer.hpp)

#pragma once

#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <memory>
#include <ostream>
#include <string>
#include <string_view>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include "types.hpp"
#include "visibility.hpp"

// Forward declaration of the zstd compression-context struct. ZStdWriter below
// only stores a pointer to it, so an incomplete type is sufficient in this
// header. The declaration is skipped when MCAP_COMPRESSION_NO_ZSTD is defined.
#ifndef MCAP_COMPRESSION_NO_ZSTD
struct ZSTD_CCtx_s;
#endif

namespace mcap {

struct MCAP_PUBLIC McapWriterOptions {
    bool noChunkCRC = false;
    bool noAttachmentCRC = false;
    bool enableDataCRC = false;
    bool noSummaryCRC = false;
    bool noChunking = false;
    bool noMessageIndex = false;
    bool noSummary = false;
    uint64_t chunkSize = DefaultChunkSize;
    Compression compression = Compression::Zstd;
    CompressionLevel compressionLevel = CompressionLevel::Default;
    bool forceCompression = false;
    std::string profile;
    std::string library = "libmcap " MCAP_LIBRARY_VERSION;

    // The following options are less commonly used, providing more fine-grained
    // control of index records and the Summary section

    bool noRepeatedSchemas = false;
    bool noRepeatedChannels = false;
    bool noAttachmentIndex = false;
    bool noMetadataIndex = false;
    bool noChunkIndex = false;
    bool noStatistics = false;
    bool noSummaryOffsets = false;

    McapWriterOptions(const std::string_view profile) : profile(profile) {}
};

// Abstract output sink for serialized bytes.
//
// Callers append data through the non-virtual write(); concrete sinks supply
// handleWrite(), end() and size(). The class also keeps a private 32-bit CRC
// value that is reachable through crc() and resetCrc().
// NOTE(review): how write() combines handleWrite(), the CRC and the
// crcEnabled flag is defined in writer.inl, which is not visible here.
class MCAP_PUBLIC IWritable {
   public:
    // Publicly writable CRC toggle; starts out false.
    bool crcEnabled{false};

    IWritable() noexcept;
    virtual ~IWritable() = default;

    // Non-virtual entry point taking `size` bytes starting at `data`.
    void write(const std::byte* data, uint64_t size);
    // Finishes the sink; the meaning is defined by each implementation.
    virtual void end() = 0;
    // Size reported by the implementation (presumably bytes written so far).
    virtual uint64_t size() const = 0;
    // Returns a 32-bit CRC value. Declared non-const.
    uint32_t crc();
    // Resets the CRC state.
    void resetCrc();

   protected:
    // Hook that receives the bytes; every concrete sink must override it.
    virtual void handleWrite(const std::byte* data, uint64_t size) = 0;

   private:
    // Has no default member initializer; the out-of-line constructor is
    // expected to set it - TODO confirm in writer.inl.
    uint32_t crc_;
};

// IWritable implementation backed by a C stdio file handle (std::FILE*).
//
// The class declares its own destructor and stores a raw file pointer, so it
// is non-copyable: an implicit copy would leave two objects holding the same
// file_ pointer.
// NOTE(review): the destructor and end() are defined in writer.inl (not
// visible here); they presumably close file_, in which case a copy would
// lead to a double close.
class MCAP_PUBLIC FileWriter final : public IWritable {
   public:
    // Explicitly defaulted because declaring the deleted copy constructor
    // below would otherwise suppress the implicit default constructor.
    FileWriter() = default;
    ~FileWriter() override;

    // Copying would duplicate the raw file_ handle; see the class comment.
    FileWriter(const FileWriter&) = delete;
    FileWriter& operator=(const FileWriter&) = delete;

    // Opens the file named by `filename` and reports the outcome as a Status.
    // NOTE(review): open mode and behavior when already open are not visible
    // from this header.
    Status open(std::string_view filename);

    // IWritable overrides.
    void handleWrite(const std::byte* data, uint64_t size) override;
    void end() override;
    uint64_t size() const override;

   private:
    // Raw stdio handle; null until a file has been opened.
    std::FILE* file_ = nullptr;
    // Running size counter returned by size() - presumably bytes written.
    uint64_t size_ = 0;
};

// IWritable implementation that targets a caller-supplied std::ostream.
//
// The stream is held by reference, so it must outlive this object. Holding a
// reference member also means the class has no implicit copy assignment.
class MCAP_PUBLIC StreamWriter final : public IWritable {
   public:
    // Binds this writer to `stream`; no copy of the stream is made.
    StreamWriter(std::ostream& stream);

    // IWritable overrides.
    void handleWrite(const std::byte* data, uint64_t size) override;
    void end() override;
    uint64_t size() const override;

   private:
    // Destination stream, not owned.
    std::ostream& stream_;
    // Running size counter returned by size() - presumably bytes written.
    uint64_t size_{0};
};

// Abstract IWritable specialisation for chunk buffers.
//
// On top of the IWritable interface it exposes both an uncompressed view
// (size()/data()) and a compressed view (compressedSize()/compressedData()),
// plus empty() and clear().
// NOTE(review): clear() is non-virtual and defined in writer.inl; it
// presumably calls handleClear() and resets the inherited CRC - confirm.
class MCAP_PUBLIC IChunkWriter : public IWritable {
   public:
    ~IChunkWriter() override = default;

    // Re-declared as pure so concrete chunk writers must implement them.
    void end() override = 0;
    uint64_t size() const override = 0;

    // Size of the compressed representation.
    virtual uint64_t compressedSize() const = 0;
    // Whether the writer currently holds no data.
    virtual bool empty() const = 0;
    // Non-virtual reset entry point.
    void clear();
    // Pointer to the uncompressed bytes.
    virtual const std::byte* data() const = 0;
    // Pointer to the compressed bytes.
    virtual const std::byte* compressedData() const = 0;

   protected:
    // Implementation hook for clearing implementation-specific buffers.
    virtual void handleClear() = 0;
};

// IChunkWriter implementation that keeps its data in a single
// std::vector<std::byte> and declares no compression state.
// NOTE(review): with only one buffer, compressedSize()/compressedData()
// presumably mirror size()/data() - confirm in writer.inl.
class MCAP_PUBLIC BufferWriter final : public IChunkWriter {
   public:
    // IWritable / IChunkWriter overrides.
    void handleWrite(const std::byte* data, uint64_t size) override;
    void end() override;
    uint64_t size() const override;
    uint64_t compressedSize() const override;
    bool empty() const override;
    void handleClear() override;
    const std::byte* data() const override;
    const std::byte* compressedData() const override;

   private:
    // Backing storage for the bytes handed to this writer.
    std::vector<std::byte> buffer_;
};

#ifndef MCAP_COMPRESSION_NO_LZ4
// IChunkWriter implementation for LZ4 compression. Only declared when
// MCAP_COMPRESSION_NO_LZ4 is not defined.
//
// Keeps separate uncompressed and compressed byte buffers together with the
// compression level chosen at construction.
// NOTE(review): when compression actually runs (presumably in end()) is
// defined in writer.inl and not visible here.
class MCAP_PUBLIC LZ4Writer final : public IChunkWriter {
   public:
    // `compressionLevel` selects the level to use. `chunkSize` is presumably
    // used to pre-size the buffers - TODO confirm.
    LZ4Writer(CompressionLevel compressionLevel, uint64_t chunkSize);

    // IWritable / IChunkWriter overrides.
    void handleWrite(const std::byte* data, uint64_t size) override;
    void end() override;
    uint64_t size() const override;
    uint64_t compressedSize() const override;
    bool empty() const override;
    void handleClear() override;
    const std::byte* data() const override;
    const std::byte* compressedData() const override;

   private:
    // Raw bytes as received.
    std::vector<std::byte> uncompressedBuffer_;
    // Compressed output.
    std::vector<std::byte> compressedBuffer_;
    // No default member initializer; expected to be set by the constructor.
    CompressionLevel compressionLevel_;
};
#endif

#ifndef MCAP_COMPRESSION_NO_ZSTD
// IChunkWriter implementation for Zstandard compression. Only declared when
// MCAP_COMPRESSION_NO_ZSTD is not defined.
//
// Keeps separate uncompressed and compressed byte buffers plus a raw pointer
// to a zstd compression context. Because the class declares its own
// destructor and stores that raw pointer, it is non-copyable: an implicit
// copy would leave two objects holding the same zstdContext_.
// NOTE(review): the constructor and destructor are defined in writer.inl;
// they presumably create and free the context, in which case a copy would
// lead to a double free.
class MCAP_PUBLIC ZStdWriter final : public IChunkWriter {
   public:
    // `compressionLevel` selects the level to use. `chunkSize` is presumably
    // used to pre-size the buffers - TODO confirm.
    ZStdWriter(CompressionLevel compressionLevel, uint64_t chunkSize);
    ~ZStdWriter() override;

    // Copying would duplicate the raw zstdContext_ pointer; see class comment.
    ZStdWriter(const ZStdWriter&) = delete;
    ZStdWriter& operator=(const ZStdWriter&) = delete;

    // IWritable / IChunkWriter overrides.
    void handleWrite(const std::byte* data, uint64_t size) override;
    void end() override;
    uint64_t size() const override;
    uint64_t compressedSize() const override;
    bool empty() const override;
    void handleClear() override;
    const std::byte* data() const override;
    const std::byte* compressedData() const override;

   private:
    // Raw bytes as received.
    std::vector<std::byte> uncompressedBuffer_;
    // Compressed output.
    std::vector<std::byte> compressedBuffer_;
    // zstd compression context; null by default.
    ZSTD_CCtx_s* zstdContext_ = nullptr;
};
#endif

// High-level writer that serializes MCAP records to an IWritable sink.
//
// NOTE(review): every member function is defined in writer.inl, which is not
// visible from this header. Remarks below that go beyond what the signatures
// show are assumptions to confirm against that file.
class MCAP_PUBLIC McapWriter final {
   public:
    ~McapWriter();

    // ---- Choosing a destination -------------------------------------------

    // Opens a destination identified by file name. This is the only open()
    // overload that reports a Status.
    Status open(std::string_view filename, const McapWriterOptions& options);

    // Uses a caller-supplied sink, taken by reference.
    void open(IWritable& writer, const McapWriterOptions& options);

    // Uses a caller-supplied std::ostream, taken by reference.
    void open(std::ostream& stream, const McapWriterOptions& options);

    // ---- Finishing ---------------------------------------------------------

    // NOTE(review): presumably close() finalizes the output, while
    // terminate() stops without finalizing - confirm in writer.inl.
    void close();

    void terminate();

    // ---- Registering schemas and channels ----------------------------------

    // Both take their argument by non-const reference, so the callee is able
    // to modify the passed object (presumably to assign an id - TODO confirm).
    void addSchema(Schema& schema);

    void addChannel(Channel& channel);

    // ---- Writing records ---------------------------------------------------

    // Each overload reports a Status. Note that the Attachment overload takes
    // a non-const reference, unlike the Message and Metadata overloads.
    Status write(const Message& message);

    Status write(Attachment& attachment);

    Status write(const Metadata& metadata);

    // ---- Introspection -----------------------------------------------------

    // Read-only access to a Statistics object.
    const Statistics& statistics() const;

    // Raw pointer to an IWritable; presumably the active sink, possibly null
    // when nothing is open - TODO confirm.
    IWritable* dataSink();

    // Presumably flushes the chunk currently being built - TODO confirm.
    void closeLastChunk();

    // ---- Low-level serialization helpers -----------------------------------
    // These static functions serialize records and primitives to an output
    // sink. They are meant for lower-level writers and tests rather than for
    // direct use.

    static void writeMagic(IWritable& output);

    // Record writers. Each returns a uint64_t (presumably the number of bytes
    // written - TODO confirm).
    static uint64_t write(IWritable& output, const Header& header);
    static uint64_t write(IWritable& output, const Footer& footer, bool crcEnabled);
    static uint64_t write(IWritable& output, const Schema& schema);
    static uint64_t write(IWritable& output, const Channel& channel);
    static uint64_t write(IWritable& output, const Message& message);
    static uint64_t write(IWritable& output, const Attachment& attachment);
    static uint64_t write(IWritable& output, const Metadata& metadata);
    static uint64_t write(IWritable& output, const Chunk& chunk);
    static uint64_t write(IWritable& output, const MessageIndex& index);
    static uint64_t write(IWritable& output, const ChunkIndex& index);
    static uint64_t write(IWritable& output, const AttachmentIndex& index);
    static uint64_t write(IWritable& output, const MetadataIndex& index);
    static uint64_t write(IWritable& output, const Statistics& stats);
    static uint64_t write(IWritable& output, const SummaryOffset& summaryOffset);
    static uint64_t write(IWritable& output, const DataEnd& dataEnd);
    static uint64_t write(IWritable& output, const Record& record);

    // Primitive writers.
    static void write(IWritable& output, const std::string_view str);
    static void write(IWritable& output, const ByteArray bytes);
    static void write(IWritable& output, OpCode value);
    static void write(IWritable& output, uint16_t value);
    static void write(IWritable& output, uint32_t value);
    static void write(IWritable& output, uint64_t value);
    static void write(IWritable& output, const std::byte* data, uint64_t size);
    // `size` defaults to 0. NOTE(review): presumably 0 means "compute the
    // size from the map" - confirm.
    static void write(IWritable& output, const KeyValueMap& map, uint32_t size = 0);

   private:
    // Options in effect; default-constructed with an empty profile string.
    McapWriterOptions options_{""};
    uint64_t chunkSize_{DefaultChunkSize};
    // Raw, non-owning-looking pointer to a sink; null by default.
    IWritable* output_{nullptr};
    // Sinks and chunk writers owned by this object.
    std::unique_ptr<FileWriter> fileOutput_;
    std::unique_ptr<StreamWriter> streamOutput_;
    std::unique_ptr<BufferWriter> uncompressedChunk_;
#ifndef MCAP_COMPRESSION_NO_LZ4
    std::unique_ptr<LZ4Writer> lz4Chunk_;
#endif
#ifndef MCAP_COMPRESSION_NO_ZSTD
    std::unique_ptr<ZStdWriter> zstdChunk_;
#endif
    // Accumulated records and index entries.
    std::vector<Schema> schemas_;
    std::vector<Channel> channels_;
    std::vector<AttachmentIndex> attachmentIndex_;
    std::vector<MetadataIndex> metadataIndex_;
    std::vector<ChunkIndex> chunkIndex_;
    Statistics statistics_{};
    std::unordered_set<SchemaId> writtenSchemas_;
    std::unordered_map<ChannelId, MessageIndex> currentMessageIndex_;
    // Start begins at MaxTime and end at 0 (presumably so the first message
    // narrows both bounds - TODO confirm).
    Timestamp currentChunkStart_{MaxTime};
    Timestamp currentChunkEnd_{0};
    Compression compression_{Compression::None};
    uint64_t uncompressedSize_{0};
    bool opened_{false};

    // Internal helpers, defined in writer.inl.
    IWritable& getOutput();
    IChunkWriter* getChunkWriter();
    void writeChunk(IWritable& output, IChunkWriter& chunkData);
};

}  // namespace mcap

#ifdef MCAP_IMPLEMENTATION
    #include "writer.inl"
#endif