Program Listing for File reader.hpp

Return to documentation for file (include/3rdparty/mcap/reader.hpp)

#pragma once

#include <cstdio>
#include <fstream>
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include "intervaltree.hpp"
#include "read_job_queue.hpp"
#include "types.hpp"
#include "visibility.hpp"

namespace mcap {

// Strategy selector accepted by McapReader::readSummary().
// NOTE(review): the per-enumerator descriptions below are inferred from the
// names only; confirm against the implementation in reader.inl.
enum class ReadSummaryMethod {
    // Presumably: rely on the summary section only and never scan the file.
    NoFallbackScan,
    // Presumably: try the summary section first, scanning the file if that fails.
    AllowFallbackScan,
    // Presumably: always build the summary by scanning the file.
    ForceScan,
};

// Abstract random-access byte source. Every concrete reader declared in this
// header derives from it, either directly (FileReader, FileStreamReader) or
// through ICompressedReader.
struct MCAP_PUBLIC IReadable {
    // Virtual so implementations can be destroyed through an IReadable pointer.
    virtual ~IReadable() = default;

    // Size of the underlying source (presumably in bytes — TODO confirm).
    virtual uint64_t size() const = 0;
    // Requests `size` units of data starting at `offset`; `output` is an
    // out-parameter that receives a pointer to the data.
    // NOTE(review): the return value looks like the number of bytes actually
    // made available, and the pointed-to storage appears to belong to the
    // reader rather than the caller — confirm against the implementations.
    virtual uint64_t read(std::byte** output, uint64_t offset, uint64_t size) = 0;
};

// IReadable implementation constructed from a C stdio `std::FILE*` handle.
// NOTE(review): this header does not show whether the reader closes the
// handle. McapReader keeps its own `file_` pointer alongside its `fileInput_`
// member, which suggests it does not — confirm in reader.inl.
class MCAP_PUBLIC FileReader final : public IReadable {
   public:
    // NOTE(review): this single-argument constructor is not `explicit`, so a
    // `std::FILE*` converts implicitly to a FileReader.
    FileReader(std::FILE* file);

    // IReadable interface.
    uint64_t size() const override;
    uint64_t read(std::byte** output, uint64_t offset, uint64_t size) override;

   private:
    // The stdio handle (presumably the one passed to the constructor).
    std::FILE* file_;
    // Reader-owned byte storage; presumably backs the pointer handed out
    // through read()'s `output` parameter.
    std::vector<std::byte> buffer_;
    // No default member initializers here: both fields depend on the
    // out-of-line constructor to be given a value.
    // Presumably the cached file size and the current file position.
    uint64_t size_;
    uint64_t position_;
};

// IReadable implementation constructed from a `std::ifstream`.
// The class stores a reference member (`stream_`), not a copy, so the stream
// it is bound to must outlive the reader.
class MCAP_PUBLIC FileStreamReader final : public IReadable {
   public:
    // NOTE(review): this single-argument constructor is not `explicit`.
    FileStreamReader(std::ifstream& stream);

    // IReadable interface.
    uint64_t size() const override;
    uint64_t read(std::byte** output, uint64_t offset, uint64_t size) override;

   private:
    // Non-owning reference to the input stream.
    std::ifstream& stream_;
    // Reader-owned byte storage; presumably backs the pointer handed out
    // through read()'s `output` parameter.
    std::vector<std::byte> buffer_;
    // No default member initializers here: both fields depend on the
    // out-of-line constructor to be given a value.
    // Presumably the cached stream size and the current stream position.
    uint64_t size_;
    uint64_t position_;
};

// Refinement of IReadable for readers that can be re-pointed at a new block of
// in-memory bytes with reset() and that report a Status. BufferReader,
// ZStdReader and LZ4Reader implement it.
class MCAP_PUBLIC ICompressedReader : public IReadable {
   public:
    // `override` already implies `virtual`, so the keyword is not repeated.
    ~ICompressedReader() override = default;

    // Re-targets the reader at `size` input bytes starting at `data`.
    // `uncompressedSize` is presumably the expected size of the data once
    // decompressed — TODO confirm against the implementations.
    virtual void reset(const std::byte* data, uint64_t size, uint64_t uncompressedSize) = 0;

    // Status of the reader (presumably the outcome of the last reset()/read()).
    virtual Status status() const = 0;
};

// ICompressedReader implementation that keeps only a pointer and a size for
// the buffer it is given (no owning storage is declared), so the bytes passed
// to reset() must stay alive while the reader is in use.
// Instances are neither copyable nor movable.
class MCAP_PUBLIC BufferReader final : public ICompressedReader {
   public:
    // ICompressedReader / IReadable interface.
    void reset(const std::byte* data, uint64_t size, uint64_t uncompressedSize) override;
    uint64_t read(std::byte** output, uint64_t offset, uint64_t size) override;
    uint64_t size() const override;
    Status status() const override;

    BufferReader() = default;
    BufferReader(const BufferReader&) = delete;
    BufferReader& operator=(const BufferReader&) = delete;
    BufferReader(BufferReader&&) = delete;
    BufferReader& operator=(BufferReader&&) = delete;

   private:
    // The defaulted constructor performs no member initialization of its own,
    // so these defaults give a freshly constructed reader a well-defined empty
    // state instead of indeterminate values before the first reset().
    const std::byte* data_ = nullptr;
    uint64_t size_ = 0;
};

#ifndef MCAP_COMPRESSION_NO_ZSTD
// ICompressedReader implementation for Zstandard-compressed input. Only
// compiled when MCAP_COMPRESSION_NO_ZSTD is not defined.
// Instances are neither copyable nor movable.
class MCAP_PUBLIC ZStdReader final : public ICompressedReader {
   public:
    ZStdReader() = default;
    ZStdReader(const ZStdReader&) = delete;
    ZStdReader(ZStdReader&&) = delete;
    ZStdReader& operator=(const ZStdReader&) = delete;
    ZStdReader& operator=(ZStdReader&&) = delete;

    // Static helper: callable without a ZStdReader instance. Takes the
    // compressed bytes plus both sizes and writes into `*output`.
    // NOTE(review): presumably decompresses the whole input in one shot and
    // reports failure through the returned Status — confirm in reader.inl.
    static Status DecompressAll(const std::byte* data, uint64_t compressedSize, uint64_t uncompressedSize, ByteArray* output);

    // ICompressedReader / IReadable interface.
    void reset(const std::byte* data, uint64_t size, uint64_t uncompressedSize) override;
    uint64_t read(std::byte** output, uint64_t offset, uint64_t size) override;
    uint64_t size() const override;
    Status status() const override;

   private:
    // Presumably the value returned by status().
    Status status_;
    // Reader-owned storage, presumably holding the decompressed bytes.
    ByteArray uncompressedData_;
};
#endif

#ifndef MCAP_COMPRESSION_NO_LZ4
// ICompressedReader implementation for LZ4-compressed input. Only compiled
// when MCAP_COMPRESSION_NO_LZ4 is not defined.
// Declares its own constructor and destructor (defined out of line) and holds
// an opaque decompression context; instances are neither copyable nor movable.
class MCAP_PUBLIC LZ4Reader final : public ICompressedReader {
   public:
    // ICompressedReader / IReadable interface.
    void reset(const std::byte* data, uint64_t size, uint64_t uncompressedSize) override;
    uint64_t read(std::byte** output, uint64_t offset, uint64_t size) override;
    uint64_t size() const override;
    Status status() const override;

    // Non-static helper (unlike ZStdReader::DecompressAll). Takes the
    // compressed bytes plus both sizes and writes into `*output`.
    // NOTE(review): presumably decompresses the whole input in one shot using
    // this instance's decompression context — confirm in reader.inl.
    Status decompressAll(const std::byte* data, uint64_t size, uint64_t uncompressedSize, ByteArray* output);
    LZ4Reader();
    LZ4Reader(const LZ4Reader&) = delete;
    LZ4Reader& operator=(const LZ4Reader&) = delete;
    LZ4Reader(LZ4Reader&&) = delete;
    LZ4Reader& operator=(LZ4Reader&&) = delete;
    ~LZ4Reader() override;

   private:
    // Type-erased so this header does not need the LZ4 headers.
    void* decompressionContext_ = nullptr;  // LZ4F_dctx*
    Status status_;
    // Non-owning pointer to the compressed input. Initialized here, like
    // decompressionContext_, so the reader has a defined state before the
    // first reset() regardless of what the out-of-line constructor sets.
    const std::byte* compressedData_ = nullptr;
    // Reader-owned storage, presumably holding the decompressed bytes.
    ByteArray uncompressedData_;
    uint64_t compressedSize_ = 0;
    uint64_t uncompressedSize_ = 0;
};
#endif

// Forward declaration: McapReader (below) returns LinearMessageView from
// readMessages() and names it as a friend before the full definition appears
// later in this header.
struct LinearMessageView;

struct MCAP_PUBLIC ReadMessageOptions {
   public:
    Timestamp startTime = 0;
    Timestamp endTime = MaxTime;
    std::function<bool(std::string_view)> topicFilter;
    enum struct ReadOrder { FileOrder, LogTimeOrder, ReverseLogTimeOrder };
    ReadOrder readOrder = ReadOrder::FileOrder;

    ReadMessageOptions(Timestamp start, Timestamp end) : startTime(start), endTime(end) {}

    ReadMessageOptions() = default;

    Status validate() const;
};

// Top-level reader for an MCAP data source. It is attached to a source with
// one of the open() overloads, can load summary information with
// readSummary(), and hands out message views with readMessages(). The static
// Read*/Parse* helpers decode individual records.
class MCAP_PUBLIC McapReader final {
   public:
    // Defined out of line. NOTE(review): presumably releases whatever open()
    // acquired (see `file_`, `fileInput_`, `fileStreamInput_`) — confirm.
    ~McapReader();

    // Attach to a data source. Each overload returns a Status.
    //  - IReadable&: a caller-supplied reader; `input_` is a raw, non-owning
    //    pointer, so presumably the caller keeps ownership.
    //  - filename: presumably opens the file and wraps it in `fileInput_`.
    //  - ifstream&: presumably wraps the stream in `fileStreamInput_`; the
    //    stream must outlive the reader because FileStreamReader stores a
    //    reference to it.
    Status open(IReadable& reader);
    Status open(std::string_view filename);
    Status open(std::ifstream& stream);

    // Detach from the current data source (defined out of line).
    void close();

    // Loads summary data using the given strategy. `onProblem` defaults to a
    // callback that ignores the Status it receives.
    Status readSummary(ReadSummaryMethod method, const ProblemCallback& onProblem = [](const Status&) {});

    // Create a message view over the time range [startTime, endTime]; the
    // defaults span 0 through MaxTime. The second overload also takes a
    // problem callback.
    // NOTE(review): inclusivity of endTime is not visible in this header.
    LinearMessageView readMessages(Timestamp startTime = 0, Timestamp endTime = MaxTime);
    LinearMessageView readMessages(const ProblemCallback& onProblem, Timestamp startTime = 0, Timestamp endTime = MaxTime);

    // Same, with the full option set (time range, topic filter, read order).
    LinearMessageView readMessages(const ProblemCallback& onProblem, const ReadMessageOptions& options);

    // Returns a pair of byte offsets for the given time range (presumably the
    // start/end file offsets that cover it — TODO confirm).
    std::pair<ByteOffset, ByteOffset> byteRange(Timestamp startTime, Timestamp endTime = MaxTime) const;

    // Raw pointer to a data source (presumably `input_`; may be null when
    // nothing is open — TODO confirm).
    IReadable* dataSource();

    // Accessors returning references to the optional members below; an empty
    // optional presumably means the corresponding record has not been read.
    const std::optional<Header>& header() const;
    const std::optional<Footer>& footer() const;
    const std::optional<Statistics>& statistics() const;

    // NOTE(review): both return the map BY VALUE, so every call copies it, and
    // the top-level `const` on the return type prevents callers from moving
    // out of the result. Changing this needs a matching edit in reader.inl.
    const std::unordered_map<ChannelId, ChannelPtr> channels() const;
    const std::unordered_map<SchemaId, SchemaPtr> schemas() const;

    // Single-item lookups by id (presumably null when the id is unknown).
    ChannelPtr channel(ChannelId channelId) const;
    SchemaPtr schema(SchemaId schemaId) const;

    // Chunk index records, returned by reference.
    const std::vector<ChunkIndex>& chunkIndexes() const;

    // Metadata index records as a multimap with string keys (presumably the
    // metadata name), returned by reference.
    const std::multimap<std::string, MetadataIndex>& metadataIndexes() const;

    // The following static methods are used internally for parsing MCAP records
    // and do not need to be called directly unless you are implementing your own
    // reader functionality or tests.

    // Read from `reader` at `offset` into the output parameter.
    static Status ReadRecord(IReadable& reader, uint64_t offset, Record* record);
    static Status ReadFooter(IReadable& reader, uint64_t offset, Footer* footer);

    // Decode an already-read Record into the typed structure passed as the
    // second (output) argument; the outcome is reported as a Status.
    static Status ParseHeader(const Record& record, Header* header);
    static Status ParseFooter(const Record& record, Footer* footer);
    static Status ParseSchema(const Record& record, Schema* schema);
    static Status ParseChannel(const Record& record, Channel* channel);
    static Status ParseMessage(const Record& record, Message* message);
    static Status ParseChunk(const Record& record, Chunk* chunk);
    static Status ParseMessageIndex(const Record& record, MessageIndex* messageIndex);
    static Status ParseChunkIndex(const Record& record, ChunkIndex* chunkIndex);
    static Status ParseAttachment(const Record& record, Attachment* attachment);
    static Status ParseAttachmentIndex(const Record& record, AttachmentIndex* attachmentIndex);
    static Status ParseStatistics(const Record& record, Statistics* statistics);
    static Status ParseMetadata(const Record& record, Metadata* metadata);
    static Status ParseMetadataIndex(const Record& record, MetadataIndex* metadataIndex);
    static Status ParseSummaryOffset(const Record& record, SummaryOffset* summaryOffset);
    static Status ParseDataEnd(const Record& record, DataEnd* dataEnd);

    // Maps a compression name to a Compression value; the optional is
    // presumably empty for an unrecognized name.
    static std::optional<Compression> ParseCompression(const std::string_view compression);

   private:
    using ChunkInterval = internal::Interval<ByteOffset, ChunkIndex>;
    // LinearMessageView gets access to the private state below.
    friend LinearMessageView;

    // Non-owning pointer to the data source in use.
    IReadable* input_ = nullptr;
    // stdio handle and reader objects presumably created by the filename and
    // ifstream open() overloads.
    std::FILE* file_ = nullptr;
    std::unique_ptr<FileReader> fileInput_;
    std::unique_ptr<FileStreamReader> fileStreamInput_;
    // Backing storage for the header()/footer()/statistics() accessors.
    std::optional<Header> header_;
    std::optional<Footer> footer_;
    std::optional<Statistics> statistics_;
    std::vector<ChunkIndex> chunkIndexes_;
    // Interval tree keyed by ByteOffset holding ChunkIndex values.
    internal::IntervalTree<ByteOffset, ChunkIndex> chunkRanges_;
    // Stored like metadataIndexes_, but no public accessor is declared for it
    // in this class.
    std::multimap<std::string, AttachmentIndex> attachmentIndexes_;
    std::multimap<std::string, MetadataIndex> metadataIndexes_;
    std::unordered_map<SchemaId, SchemaPtr> schemas_;
    std::unordered_map<ChannelId, ChannelPtr> channels_;
    // Byte range of the data; defaults to 0 through EndOffset.
    ByteOffset dataStart_ = 0;
    ByteOffset dataEnd_ = EndOffset;
    // Time range; both default to 0.
    Timestamp startTime_ = 0;
    Timestamp endTime_ = 0;
    // Presumably set once readSummary() has populated the fields above.
    bool parsedSummary_ = false;

    // Internal helpers (defined out of line).
    void reset_();
    Status readSummarySection_(IReadable& reader);
    Status readSummaryFromScan_(IReadable& reader);
};

// Pull-style reader that yields raw Record values from an IReadable over a
// byte range. It stores only a pointer to the data source, so the source must
// outlive the reader.
struct MCAP_PUBLIC RecordReader {
    // Public cursor state: `offset` is presumably the position of the next
    // read and `endOffset` the limit of the range — TODO confirm.
    ByteOffset offset;
    ByteOffset endOffset;

    // Starts at `startOffset`; `endOffset` defaults to EndOffset.
    RecordReader(IReadable& dataSource, ByteOffset startOffset, ByteOffset endOffset = EndOffset);

    // Re-targets the reader at a (possibly different) source and range.
    void reset(IReadable& dataSource, ByteOffset startOffset, ByteOffset endOffset);

    // Returns the next record, or an empty optional (presumably at the end of
    // the range or on error — check status()).
    std::optional<Record> next();

    // Reference to the reader's Status member.
    const Status& status() const;

    // Byte offset of the current record (presumably the one most recently
    // returned by next()).
    ByteOffset curRecordOffset() const;

   private:
    // Non-owning pointer to the data source.
    IReadable* dataSource_ = nullptr;
    Status status_;
    // Storage for the current record.
    Record curRecord_;
};

// Callback-driven reader for the records of a single Chunk. The caller assigns
// the public std::function members, calls reset() with a chunk and its
// compression, then calls next() repeatedly.
// Instances are neither copyable nor movable.
struct MCAP_PUBLIC TypedChunkReader {
    // Per-record-type callbacks. Each receives the decoded value and a
    // ByteOffset (presumably the record's offset within the chunk data).
    std::function<void(const SchemaPtr, ByteOffset)> onSchema;
    std::function<void(const ChannelPtr, ByteOffset)> onChannel;
    std::function<void(const Message&, ByteOffset)> onMessage;
    std::function<void(const Record&, ByteOffset)> onUnknownRecord;

    TypedChunkReader();
    TypedChunkReader(const TypedChunkReader&) = delete;
    TypedChunkReader& operator=(const TypedChunkReader&) = delete;
    TypedChunkReader(TypedChunkReader&&) = delete;
    TypedChunkReader& operator=(TypedChunkReader&&) = delete;

    // Points the reader at `chunk`; `compression` presumably selects which of
    // the reader members below is used to access its records.
    void reset(const Chunk& chunk, Compression compression);

    // Advances by one step. NOTE(review): the bool presumably means "more
    // records may follow" — confirm in reader.inl.
    bool next();

    // Current byte offset of the reader.
    ByteOffset offset() const;

    // Reference to a Status describing the reader's state.
    const Status& status() const;

   private:
    RecordReader reader_;
    Status status_;
    // One reader per supported encoding; the LZ4 and Zstandard members exist
    // only when the corresponding MCAP_COMPRESSION_NO_* macro is not defined.
    BufferReader uncompressedReader_;
#ifndef MCAP_COMPRESSION_NO_LZ4
    LZ4Reader lz4Reader_;
#endif
#ifndef MCAP_COMPRESSION_NO_ZSTD
    ZStdReader zstdReader_;
#endif
};

// Callback-driven reader over the records of a data source. The caller assigns
// the public std::function members it is interested in and calls next()
// repeatedly. It contains a TypedChunkReader, presumably used for records
// nested inside chunks.
// Instances are neither copyable nor movable.
struct MCAP_PUBLIC TypedRecordReader {
    // Per-record-type callbacks. Each receives the decoded value and a
    // ByteOffset. onSchema, onChannel, onMessage and onUnknownRecord take an
    // additional std::optional<ByteOffset>.
    // NOTE(review): the optional is presumably the offset inside the enclosing
    // chunk, empty when the record is not inside a chunk — confirm.
    std::function<void(const Header&, ByteOffset)> onHeader;
    std::function<void(const Footer&, ByteOffset)> onFooter;
    std::function<void(const SchemaPtr, ByteOffset, std::optional<ByteOffset>)> onSchema;
    std::function<void(const ChannelPtr, ByteOffset, std::optional<ByteOffset>)> onChannel;
    std::function<void(const Message&, ByteOffset, std::optional<ByteOffset>)> onMessage;
    std::function<void(const Chunk&, ByteOffset)> onChunk;
    std::function<void(const MessageIndex&, ByteOffset)> onMessageIndex;
    std::function<void(const ChunkIndex&, ByteOffset)> onChunkIndex;
    std::function<void(const Attachment&, ByteOffset)> onAttachment;
    std::function<void(const AttachmentIndex&, ByteOffset)> onAttachmentIndex;
    std::function<void(const Statistics&, ByteOffset)> onStatistics;
    std::function<void(const Metadata&, ByteOffset)> onMetadata;
    std::function<void(const MetadataIndex&, ByteOffset)> onMetadataIndex;
    std::function<void(const SummaryOffset&, ByteOffset)> onSummaryOffset;
    std::function<void(const DataEnd&, ByteOffset)> onDataEnd;
    std::function<void(const Record&, ByteOffset, std::optional<ByteOffset>)> onUnknownRecord;
    // Receives only a ByteOffset (presumably fired when a chunk has been
    // fully processed).
    std::function<void(ByteOffset)> onChunkEnd;

    // Reads from `dataSource` starting at `startOffset`; `endOffset` defaults
    // to EndOffset.
    TypedRecordReader(IReadable& dataSource, ByteOffset startOffset, ByteOffset endOffset = EndOffset);

    TypedRecordReader(const TypedRecordReader&) = delete;
    TypedRecordReader& operator=(const TypedRecordReader&) = delete;
    TypedRecordReader(TypedRecordReader&&) = delete;
    TypedRecordReader& operator=(TypedRecordReader&&) = delete;

    // Advances by one step. NOTE(review): the bool presumably means "more
    // records may follow" — confirm in reader.inl.
    bool next();

    // Current byte offset of the reader.
    ByteOffset offset() const;

    // Reference to a Status describing the reader's state.
    const Status& status() const;

   private:
    RecordReader reader_;
    TypedChunkReader chunkReader_;
    Status status_;
    // NOTE(review): no default member initializer, so this flag relies on the
    // out-of-line constructor to give it a value — confirm it does.
    bool parsingChunk_;
};

// Message reader built on a McapReader and a ReadMessageOptions; delivers
// messages through the `onMessage` callback as next() is called.
// NOTE(review): the name and the ReadJobQueue member suggest it uses the
// reader's indexes to honour options.readOrder — confirm in reader.inl.
// It stores a reference to the McapReader, which must therefore outlive it.
struct MCAP_PUBLIC IndexedMessageReader {
   public:
    // `onMessage` is taken by (const) value and receives each message together
    // with its RecordOffset.
    IndexedMessageReader(McapReader& reader, const ReadMessageOptions& options, const std::function<void(const Message&, RecordOffset)> onMessage);

    // Advances by one step. NOTE(review): the bool presumably means "more
    // messages may follow" — confirm in reader.inl.
    bool next();

    // Returns a Status by value.
    Status status() const;

   private:
    // Holds one decompressed chunk plus bookkeeping.
    struct ChunkSlot {
        ByteArray decompressedChunk;
        ByteOffset chunkStartOffset;
        // Presumably the number of messages from this chunk not yet delivered;
        // a slot at 0 would then be reusable — TODO confirm.
        int unreadMessages = 0;
    };
    // Returns an index (presumably into chunkSlots_) of a slot that can be used.
    size_t findFreeChunkSlot();
    // Fills `slot` from `chunk`.
    void decompressChunk(const Chunk& chunk, ChunkSlot& slot);
    Status status_;
    // Non-owning reference to the reader passed at construction.
    McapReader& mcapReader_;
    RecordReader recordReader_;
    // Present only when LZ4 support is compiled in.
#ifndef MCAP_COMPRESSION_NO_LZ4
    LZ4Reader lz4Reader_;
#endif
    // By-value copy of the options.
    ReadMessageOptions options_;
    // Presumably the channels accepted by options_.topicFilter.
    std::unordered_set<ChannelId> selectedChannels_;
    std::function<void(const Message&, RecordOffset)> onMessage_;
    internal::ReadJobQueue queue_;
    std::vector<ChunkSlot> chunkSlots_;
};

// Iterable view over messages, returned by McapReader::readMessages(). It is
// consumed through begin()/end(), which yield input iterators over MessageView.
// The view stores a reference to the McapReader, which must outlive it.
struct MCAP_PUBLIC LinearMessageView {
    // Input iterator over MessageView values. Its constructors are private and
    // LinearMessageView is a friend, so iterators can only be obtained from a
    // view. The unique_ptr member makes the iterator move-only.
    struct MCAP_PUBLIC Iterator {
        using iterator_category = std::input_iterator_tag;
        using difference_type = int64_t;
        using value_type = MessageView;
        using pointer = const MessageView*;
        using reference = const MessageView&;

        reference operator*() const;
        pointer operator->() const;
        Iterator& operator++();
        // Post-increment returns void: the previous position is not retained.
        void operator++(int);
        MCAP_PUBLIC friend bool operator==(const Iterator& a, const Iterator& b);
        MCAP_PUBLIC friend bool operator!=(const Iterator& a, const Iterator& b);

       private:
        friend LinearMessageView;

        // Default state has no Impl (presumably this is what end() returns).
        Iterator() = default;
        Iterator(LinearMessageView& view);

        // Iteration state, held behind a unique_ptr. Non-copyable and
        // non-movable.
        class Impl {
           public:
            Impl(LinearMessageView& view);

            Impl(const Impl&) = delete;
            Impl& operator=(const Impl&) = delete;
            Impl(Impl&&) = delete;
            Impl& operator=(Impl&&) = delete;

            void increment();
            reference dereference() const;
            bool has_value() const;

            // The view this iteration belongs to.
            LinearMessageView& view_;

            // Two alternative underlying readers. NOTE(review): presumably
            // exactly one is engaged, chosen from the view's options — confirm.
            std::optional<TypedRecordReader> recordReader_;
            std::optional<IndexedMessageReader> indexedMessageReader_;
            // Storage for the current message and the view exposed by
            // dereference().
            Message curMessage_;
            std::optional<MessageView> curMessageView_;

           private:
            // Message handler (presumably installed as the readers' callback).
            void onMessage(const Message& message, RecordOffset offset);
        };

        bool begun_ = false;
        std::unique_ptr<Impl> impl_;
    };

    // Constructors, from least to most specific: reader only; explicit byte
    // and time ranges; explicit options plus byte range. All take the problem
    // callback last.
    LinearMessageView(McapReader& mcapReader, const ProblemCallback& onProblem);
    LinearMessageView(
        McapReader& mcapReader, ByteOffset dataStart, ByteOffset dataEnd, Timestamp startTime, Timestamp endTime, const ProblemCallback& onProblem);
    LinearMessageView(McapReader& mcapReader, const ReadMessageOptions& options, ByteOffset dataStart, ByteOffset dataEnd, const ProblemCallback& onProblem);

    // Move-constructible only: copying and both assignments are deleted.
    LinearMessageView(const LinearMessageView&) = delete;
    LinearMessageView& operator=(const LinearMessageView&) = delete;
    LinearMessageView(LinearMessageView&&) = default;
    LinearMessageView& operator=(LinearMessageView&&) = delete;

    // Non-const: iteration goes through a mutable view.
    Iterator begin();
    Iterator end();

   private:
    // Non-owning reference to the reader.
    McapReader& mcapReader_;
    ByteOffset dataStart_;
    ByteOffset dataEnd_;
    ReadMessageOptions readMessageOptions_;
    // Declared const, so the defaulted move constructor copies this member
    // rather than moving it.
    const ProblemCallback onProblem_;
};

}  // namespace mcap

#ifdef MCAP_IMPLEMENTATION
    #include "reader.inl"
#endif