// Program listing for file reader.hpp
// Source path: include/3rdparty/mcap/reader.hpp
#pragma once
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <fstream>
#include <functional>
#include <iterator>
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "intervaltree.hpp"
#include "read_job_queue.hpp"
#include "types.hpp"
#include "visibility.hpp"
namespace mcap {
// Strategy selector passed to McapReader::readSummary().
// NOTE(review): the per-value notes below are inferred from the enumerator
// names only; the behaviour is implemented outside this header — confirm
// against reader.inl.
enum class ReadSummaryMethod {
  // Presumably: rely on the summary section only, never scan the file.
  NoFallbackScan = 0,
  // Presumably: try the summary section first, scan the file if that fails.
  AllowFallbackScan = 1,
  // Presumably: always build the summary by scanning the file.
  ForceScan = 2,
};
// Abstract, offset-addressed data source. Every reader declared in this
// header (FileReader, FileStreamReader, ICompressedReader and its subclasses)
// derives from this interface.
struct MCAP_PUBLIC IReadable {
virtual ~IReadable() = default;
// Reports the size of the underlying data source (presumably in bytes —
// confirm against the implementations).
virtual uint64_t size() const = 0;
// Requests `size` units of data beginning at `offset`. `output` is an
// out-parameter through which the implementation hands back a pointer to the
// data. The return value is presumably the number of bytes actually made
// available — TODO confirm against reader.inl. Ownership and lifetime of the
// memory behind `*output` cannot be determined from this declaration.
virtual uint64_t read(std::byte** output, uint64_t offset, uint64_t size) = 0;
};
// IReadable implementation constructed from a C stdio FILE handle.
// The class stores a raw std::FILE* and declares no destructor, so destroying
// a FileReader does not close the handle.
class MCAP_PUBLIC FileReader final : public IReadable {
public:
// NOTE(review): this single-argument constructor is not `explicit`, so a
// std::FILE* converts implicitly to FileReader.
FileReader(std::FILE* file);
uint64_t size() const override;
uint64_t read(std::byte** output, uint64_t offset, uint64_t size) override;
private:
// Raw handle; presumably the pointer passed to the constructor — confirm.
std::FILE* file_;
// Presumably scratch storage that backs the pointer returned by read() —
// confirm against reader.inl.
std::vector<std::byte> buffer_;
// No in-class initializers: these are expected to be set by the constructor,
// whose definition is not in this header.
uint64_t size_;
uint64_t position_;
};
// IReadable implementation constructed from a std::ifstream.
// The stream is held by reference (see stream_), so the std::ifstream passed
// to the constructor must outlive this object.
class MCAP_PUBLIC FileStreamReader final : public IReadable {
public:
// NOTE(review): this single-argument constructor is not `explicit`.
FileStreamReader(std::ifstream& stream);
uint64_t size() const override;
uint64_t read(std::byte** output, uint64_t offset, uint64_t size) override;
private:
// Non-owning reference to the caller's stream.
std::ifstream& stream_;
// Presumably scratch storage that backs the pointer returned by read() —
// confirm against reader.inl.
std::vector<std::byte> buffer_;
// No in-class initializers: these are expected to be set by the constructor,
// whose definition is not in this header.
uint64_t size_;
uint64_t position_;
};
// Extension of IReadable for readers that are re-pointed at an in-memory
// buffer via reset() and that report a Status. Implemented in this header by
// BufferReader, ZStdReader and LZ4Reader.
class MCAP_PUBLIC ICompressedReader : public IReadable {
public:
virtual ~ICompressedReader() override = default;
// Points the reader at a new input buffer. `data`/`size` describe the input
// and `uncompressedSize` the expected size after decompression (inferred
// from the parameter names — confirm). Whether `data` must remain valid after
// the call is implementation-specific and not visible here.
virtual void reset(const std::byte* data, uint64_t size, uint64_t uncompressedSize) = 0;
// Returns the reader's current Status; presumably reflects the outcome of
// the last reset()/read() — confirm against the implementations.
virtual Status status() const = 0;
};
// ICompressedReader implementation that keeps only a pointer and a size for
// the buffer it was last reset() to. TypedChunkReader uses an instance of it
// as its `uncompressedReader_`.
//
// The pointer is non-owning: the class has no destructor and stores only a
// `const std::byte*`, so the buffer handed to reset() is presumably expected
// to outlive any read() — confirm against reader.inl.
//
// Instances are neither copyable nor movable.
class MCAP_PUBLIC BufferReader final : public ICompressedReader {
public:
  // See ICompressedReader::reset(). How `uncompressedSize` is used by this
  // implementation is not visible from the declaration.
  void reset(const std::byte* data, uint64_t size, uint64_t uncompressedSize) override;
  uint64_t read(std::byte** output, uint64_t offset, uint64_t size) override;
  uint64_t size() const override;
  Status status() const override;

  BufferReader() = default;
  BufferReader(const BufferReader&) = delete;
  BufferReader& operator=(const BufferReader&) = delete;
  BufferReader(BufferReader&&) = delete;
  BufferReader& operator=(BufferReader&&) = delete;

private:
  // Default member initializers: the constructor is defaulted, so without
  // these a BufferReader that has not yet been reset() would hold
  // indeterminate values in both members.
  const std::byte* data_ = nullptr;
  uint64_t size_ = 0;
};
#ifndef MCAP_COMPRESSION_NO_ZSTD
// ICompressedReader implementation for Zstandard-compressed input (per the
// class name). Compiled out when MCAP_COMPRESSION_NO_ZSTD is defined.
// Holds a ByteArray named uncompressedData_; presumably reset() decompresses
// the whole input into it — confirm against reader.inl.
// Instances are neither copyable nor movable.
class MCAP_PUBLIC ZStdReader final : public ICompressedReader {
public:
void reset(const std::byte* data, uint64_t size, uint64_t uncompressedSize) override;
uint64_t read(std::byte** output, uint64_t offset, uint64_t size) override;
uint64_t size() const override;
Status status() const override;
// Static helper taking a compressed buffer, both sizes, and an output
// ByteArray; presumably decompresses `data` into `*output` in one shot and
// returns the outcome — confirm against reader.inl.
// NOTE(review): the LZ4Reader counterpart is a non-static member spelled
// `decompressAll`; the naming/staticness differ between the two classes.
static Status DecompressAll(const std::byte* data, uint64_t compressedSize, uint64_t uncompressedSize, ByteArray* output);
ZStdReader() = default;
ZStdReader(const ZStdReader&) = delete;
ZStdReader& operator=(const ZStdReader&) = delete;
ZStdReader(ZStdReader&&) = delete;
ZStdReader& operator=(ZStdReader&&) = delete;
private:
// Presumably the value returned by status() — confirm.
Status status_;
ByteArray uncompressedData_;
};
#endif
#ifndef MCAP_COMPRESSION_NO_LZ4
// ICompressedReader implementation for LZ4-compressed input (per the class
// name). Compiled out when MCAP_COMPRESSION_NO_LZ4 is defined.
//
// Unlike the other readers in this header it declares its own constructor and
// destructor; these presumably create and release the LZ4 frame decompression
// context stored in decompressionContext_ — confirm against reader.inl.
//
// Instances are neither copyable nor movable.
class MCAP_PUBLIC LZ4Reader final : public ICompressedReader {
public:
  void reset(const std::byte* data, uint64_t size, uint64_t uncompressedSize) override;
  uint64_t read(std::byte** output, uint64_t offset, uint64_t size) override;
  uint64_t size() const override;
  Status status() const override;

  // Takes a compressed buffer, both sizes, and an output ByteArray; presumably
  // decompresses `data` into `*output` in one shot and returns the outcome —
  // confirm against reader.inl.
  Status decompressAll(const std::byte* data, uint64_t size, uint64_t uncompressedSize, ByteArray* output);

  LZ4Reader();
  LZ4Reader(const LZ4Reader&) = delete;
  LZ4Reader& operator=(const LZ4Reader&) = delete;
  LZ4Reader(LZ4Reader&&) = delete;
  LZ4Reader& operator=(LZ4Reader&&) = delete;
  ~LZ4Reader() override;

private:
  void* decompressionContext_ = nullptr;  // LZ4F_dctx*
  Status status_;
  // The pointer and the two sizes below get default member initializers, like
  // decompressionContext_, so that a reader which has not yet been reset()
  // never exposes indeterminate values. Any mem-initializer in the
  // out-of-line constructor still takes precedence.
  const std::byte* compressedData_ = nullptr;
  ByteArray uncompressedData_;
  uint64_t compressedSize_ = 0;
  uint64_t uncompressedSize_ = 0;
};
#endif
// Forward declaration: McapReader names LinearMessageView as a return type
// and as a friend before the full definition appears later in this header.
struct LinearMessageView;
struct MCAP_PUBLIC ReadMessageOptions {
public:
Timestamp startTime = 0;
Timestamp endTime = MaxTime;
std::function<bool(std::string_view)> topicFilter;
enum struct ReadOrder { FileOrder, LogTimeOrder, ReverseLogTimeOrder };
ReadOrder readOrder = ReadOrder::FileOrder;
ReadMessageOptions(Timestamp start, Timestamp end) : startTime(start), endTime(end) {}
ReadMessageOptions() = default;
Status validate() const;
};
// High-level entry point for reading an MCAP data source. A reader is
// attached to its input with one of the open() overloads, can load summary
// information with readSummary(), and hands out LinearMessageView objects for
// message iteration. It also exposes the static record-parsing helpers used
// internally.
class MCAP_PUBLIC McapReader final {
public:
// Declared out of line; presumably releases whatever open() acquired —
// confirm against reader.inl.
~McapReader();
// Attaches the reader to a caller-provided IReadable, a file name, or an
// input stream, respectively, and returns a Status. The IReadable and stream
// overloads take references, so presumably those objects must stay alive
// while the reader is open — confirm.
Status open(IReadable& reader);
Status open(std::string_view filename);
Status open(std::ifstream& stream);
void close();
// Loads summary information using the given method. `onProblem` defaults to
// a callback that ignores the Status it is given.
Status readSummary(ReadSummaryMethod method, const ProblemCallback& onProblem = [](const Status&) {});
// Each overload returns a LinearMessageView, selected either by a
// start/end time pair (defaulting to 0 and MaxTime) or by a
// ReadMessageOptions object. The first overload takes no problem callback.
LinearMessageView readMessages(Timestamp startTime = 0, Timestamp endTime = MaxTime);
LinearMessageView readMessages(const ProblemCallback& onProblem, Timestamp startTime = 0, Timestamp endTime = MaxTime);
LinearMessageView readMessages(const ProblemCallback& onProblem, const ReadMessageOptions& options);
// Returns a pair of byte offsets for the given time range; presumably the
// start and end of the file region that can contain messages in that range —
// confirm against reader.inl.
std::pair<ByteOffset, ByteOffset> byteRange(Timestamp startTime, Timestamp endTime = MaxTime) const;
// Returns a raw pointer to an IReadable; presumably the currently attached
// input (input_), null when nothing is open — confirm.
IReadable* dataSource();
// Accessors for optional records; empty optionals presumably mean the record
// has not been read — confirm.
const std::optional<Header>& header() const;
const std::optional<Footer>& footer() const;
const std::optional<Statistics>& statistics() const;
// NOTE(review): these two return the maps by (const) value, so every call
// copies the whole map, unlike the by-reference accessors around them.
const std::unordered_map<ChannelId, ChannelPtr> channels() const;
const std::unordered_map<SchemaId, SchemaPtr> schemas() const;
// Look up a single channel/schema by id. The result for an unknown id is not
// visible from the declaration.
ChannelPtr channel(ChannelId channelId) const;
SchemaPtr schema(SchemaId schemaId) const;
const std::vector<ChunkIndex>& chunkIndexes() const;
const std::multimap<std::string, MetadataIndex>& metadataIndexes() const;
// The following static methods are used internally for parsing MCAP records
// and do not need to be called directly unless you are implementing your own
// reader functionality or tests.
// ReadRecord/ReadFooter take a data source and an offset and fill the
// out-parameter; the Parse* functions take an already-read Record and fill a
// typed out-parameter. All of them report the outcome as a Status.
static Status ReadRecord(IReadable& reader, uint64_t offset, Record* record);
static Status ReadFooter(IReadable& reader, uint64_t offset, Footer* footer);
static Status ParseHeader(const Record& record, Header* header);
static Status ParseFooter(const Record& record, Footer* footer);
static Status ParseSchema(const Record& record, Schema* schema);
static Status ParseChannel(const Record& record, Channel* channel);
static Status ParseMessage(const Record& record, Message* message);
static Status ParseChunk(const Record& record, Chunk* chunk);
static Status ParseMessageIndex(const Record& record, MessageIndex* messageIndex);
static Status ParseChunkIndex(const Record& record, ChunkIndex* chunkIndex);
static Status ParseAttachment(const Record& record, Attachment* attachment);
static Status ParseAttachmentIndex(const Record& record, AttachmentIndex* attachmentIndex);
static Status ParseStatistics(const Record& record, Statistics* statistics);
static Status ParseMetadata(const Record& record, Metadata* metadata);
static Status ParseMetadataIndex(const Record& record, MetadataIndex* metadataIndex);
static Status ParseSummaryOffset(const Record& record, SummaryOffset* summaryOffset);
static Status ParseDataEnd(const Record& record, DataEnd* dataEnd);
// Maps a compression name to a Compression value; the optional is presumably
// empty for unrecognised names — confirm.
static std::optional<Compression> ParseCompression(const std::string_view compression);
private:
using ChunkInterval = internal::Interval<ByteOffset, ChunkIndex>;
// LinearMessageView needs access to the private state below.
friend LinearMessageView;
// Currently attached input; null by default.
IReadable* input_ = nullptr;
// Presumably populated by the filename / stream open() overloads so the
// reader can own the objects it created — confirm against reader.inl.
std::FILE* file_ = nullptr;
std::unique_ptr<FileReader> fileInput_;
std::unique_ptr<FileStreamReader> fileStreamInput_;
// State backing the public accessors above.
std::optional<Header> header_;
std::optional<Footer> footer_;
std::optional<Statistics> statistics_;
std::vector<ChunkIndex> chunkIndexes_;
internal::IntervalTree<ByteOffset, ChunkIndex> chunkRanges_;
std::multimap<std::string, AttachmentIndex> attachmentIndexes_;
std::multimap<std::string, MetadataIndex> metadataIndexes_;
std::unordered_map<SchemaId, SchemaPtr> schemas_;
std::unordered_map<ChannelId, ChannelPtr> channels_;
// Byte range of the data section; defaults span from 0 to EndOffset.
ByteOffset dataStart_ = 0;
ByteOffset dataEnd_ = EndOffset;
Timestamp startTime_ = 0;
Timestamp endTime_ = 0;
// Presumably set once readSummary() has succeeded — confirm.
bool parsedSummary_ = false;
void reset_();
// Internal helpers, presumably the two strategies behind readSummary()
// (summary section vs. full scan) — confirm against reader.inl.
Status readSummarySection_(IReadable& reader);
Status readSummaryFromScan_(IReadable& reader);
};
// Low-level cursor that yields raw Record objects from an IReadable between
// a start and an end byte offset.
struct MCAP_PUBLIC RecordReader {
// Public cursor state: the current offset and the offset at which reading
// stops. Presumably `offset` advances as next() is called — confirm.
ByteOffset offset;
ByteOffset endOffset;
// `endOffset` defaults to EndOffset. The data source is kept as a pointer
// (dataSource_), so it presumably has to outlive this reader — confirm.
RecordReader(IReadable& dataSource, ByteOffset startOffset, ByteOffset endOffset = EndOffset);
// Re-targets the reader at a (possibly different) data source and range.
void reset(IReadable& dataSource, ByteOffset startOffset, ByteOffset endOffset);
// Returns the next record, or an empty optional; presumably empty at end of
// range or on error, with details available through status() — confirm.
std::optional<Record> next();
const Status& status() const;
// Offset of the current record; exact meaning before the first next() call
// is not visible from the declaration.
ByteOffset curRecordOffset() const;
private:
IReadable* dataSource_ = nullptr;
Status status_;
Record curRecord_;
};
// Reads the contents of a single Chunk and reports typed records through the
// public std::function callbacks below. Owns one reader per supported
// compression (uncompressed, and LZ4 / Zstandard unless compiled out).
// Instances are neither copyable nor movable.
struct MCAP_PUBLIC TypedChunkReader {
// Callbacks, each receiving the typed record and a ByteOffset. They are
// default-constructed (empty) std::function objects unless assigned;
// presumably invoked from next() — confirm against reader.inl.
std::function<void(const SchemaPtr, ByteOffset)> onSchema;
std::function<void(const ChannelPtr, ByteOffset)> onChannel;
std::function<void(const Message&, ByteOffset)> onMessage;
std::function<void(const Record&, ByteOffset)> onUnknownRecord;
TypedChunkReader();
TypedChunkReader(const TypedChunkReader&) = delete;
TypedChunkReader& operator=(const TypedChunkReader&) = delete;
TypedChunkReader(TypedChunkReader&&) = delete;
TypedChunkReader& operator=(TypedChunkReader&&) = delete;
// Points the reader at `chunk`, whose payload uses `compression`.
void reset(const Chunk& chunk, Compression compression);
// Advances by one step; the bool presumably signals whether more records
// remain — confirm.
bool next();
ByteOffset offset() const;
const Status& status() const;
private:
RecordReader reader_;
Status status_;
BufferReader uncompressedReader_;
#ifndef MCAP_COMPRESSION_NO_LZ4
LZ4Reader lz4Reader_;
#endif
#ifndef MCAP_COMPRESSION_NO_ZSTD
ZStdReader zstdReader_;
#endif
};
// Reads records from an IReadable over a byte range and reports each one
// through a type-specific std::function callback. Contains a RecordReader for
// top-level records and a TypedChunkReader for records inside chunks.
// Instances are neither copyable nor movable.
struct MCAP_PUBLIC TypedRecordReader {
// One callback per record type, each receiving the parsed record and a
// ByteOffset. All are empty std::function objects unless assigned.
std::function<void(const Header&, ByteOffset)> onHeader;
std::function<void(const Footer&, ByteOffset)> onFooter;
// The schema/channel/message/unknown-record callbacks carry an additional
// std::optional<ByteOffset>; presumably the offset within the enclosing
// chunk, present only for records read from inside a chunk — confirm.
std::function<void(const SchemaPtr, ByteOffset, std::optional<ByteOffset>)> onSchema;
std::function<void(const ChannelPtr, ByteOffset, std::optional<ByteOffset>)> onChannel;
std::function<void(const Message&, ByteOffset, std::optional<ByteOffset>)> onMessage;
std::function<void(const Chunk&, ByteOffset)> onChunk;
std::function<void(const MessageIndex&, ByteOffset)> onMessageIndex;
std::function<void(const ChunkIndex&, ByteOffset)> onChunkIndex;
std::function<void(const Attachment&, ByteOffset)> onAttachment;
std::function<void(const AttachmentIndex&, ByteOffset)> onAttachmentIndex;
std::function<void(const Statistics&, ByteOffset)> onStatistics;
std::function<void(const Metadata&, ByteOffset)> onMetadata;
std::function<void(const MetadataIndex&, ByteOffset)> onMetadataIndex;
std::function<void(const SummaryOffset&, ByteOffset)> onSummaryOffset;
std::function<void(const DataEnd&, ByteOffset)> onDataEnd;
std::function<void(const Record&, ByteOffset, std::optional<ByteOffset>)> onUnknownRecord;
// Receives only a ByteOffset; presumably fired when the reader finishes the
// contents of a chunk — confirm.
std::function<void(ByteOffset)> onChunkEnd;
// `endOffset` defaults to EndOffset.
TypedRecordReader(IReadable& dataSource, ByteOffset startOffset, ByteOffset endOffset = EndOffset);
TypedRecordReader(const TypedRecordReader&) = delete;
TypedRecordReader& operator=(const TypedRecordReader&) = delete;
TypedRecordReader(TypedRecordReader&&) = delete;
TypedRecordReader& operator=(TypedRecordReader&&) = delete;
// Advances by one step; the bool presumably signals whether more records
// remain — confirm.
bool next();
ByteOffset offset() const;
const Status& status() const;
private:
RecordReader reader_;
TypedChunkReader chunkReader_;
Status status_;
// No in-class initializer; expected to be set by the out-of-line constructor.
// Presumably true while records are being served from chunkReader_ — confirm.
bool parsingChunk_;
};
// Message reader driven by a McapReader and a ReadMessageOptions, delivering
// each message to a callback together with its RecordOffset. Works through an
// internal::ReadJobQueue and a set of decompressed-chunk slots; presumably
// this is the path used for index-based (ordered) reads — confirm against
// reader.inl.
struct MCAP_PUBLIC IndexedMessageReader {
public:
// `reader` is stored by reference (mcapReader_), so it must outlive this
// object. `onMessage` is taken by value.
IndexedMessageReader(McapReader& reader, const ReadMessageOptions& options, const std::function<void(const Message&, RecordOffset)> onMessage);
// Advances by one step; the bool presumably signals whether more messages
// remain — confirm.
bool next();
// Returns the Status by value.
Status status() const;
private:
// Storage for one decompressed chunk plus bookkeeping.
struct ChunkSlot {
ByteArray decompressedChunk;
// NOTE(review): no default member initializer, unlike unreadMessages below.
ByteOffset chunkStartOffset;
// Presumably the count of messages from this chunk not yet delivered, so a
// slot with 0 can be reused — confirm.
int unreadMessages = 0;
};
// Returns an index into chunkSlots_ (inferred from the name and return type).
size_t findFreeChunkSlot();
void decompressChunk(const Chunk& chunk, ChunkSlot& slot);
Status status_;
McapReader& mcapReader_;
RecordReader recordReader_;
// Only an LZ4 reader is held as a member; there is no ZStdReader member
// (ZStdReader exposes a static DecompressAll instead).
#ifndef MCAP_COMPRESSION_NO_LZ4
LZ4Reader lz4Reader_;
#endif
ReadMessageOptions options_;
std::unordered_set<ChannelId> selectedChannels_;
std::function<void(const Message&, RecordOffset)> onMessage_;
internal::ReadJobQueue queue_;
std::vector<ChunkSlot> chunkSlots_;
};
// Iterable view over the messages of a McapReader, returned by
// McapReader::readMessages(). Provides begin()/end() yielding an input
// iterator over MessageView values.
// The view stores the McapReader by reference, so the reader must outlive it.
// The view is move-constructible only: copying and all assignment are deleted.
struct MCAP_PUBLIC LinearMessageView {
// Single-pass (input) iterator over MessageView.
struct MCAP_PUBLIC Iterator {
using iterator_category = std::input_iterator_tag;
using difference_type = int64_t;
using value_type = MessageView;
using pointer = const MessageView*;
using reference = const MessageView&;
reference operator*() const;
pointer operator->() const;
Iterator& operator++();
// Post-increment returns void, i.e. no copy of the previous position is
// handed back.
void operator++(int);
MCAP_PUBLIC friend bool operator==(const Iterator& a, const Iterator& b);
MCAP_PUBLIC friend bool operator!=(const Iterator& a, const Iterator& b);
private:
// Constructors are private; only LinearMessageView (a friend) creates
// iterators, via begin() and end().
friend LinearMessageView;
Iterator() = default;
Iterator(LinearMessageView& view);
// Heap-held iteration state (see impl_). Neither copyable nor movable.
class Impl {
public:
Impl(LinearMessageView& view);
Impl(const Impl&) = delete;
Impl& operator=(const Impl&) = delete;
Impl(Impl&&) = delete;
Impl& operator=(Impl&&) = delete;
void increment();
reference dereference() const;
bool has_value() const;
// Reference back to the owning view; the view must outlive the Impl.
LinearMessageView& view_;
// Two alternative underlying readers; presumably exactly one is engaged
// depending on the requested read order — confirm against reader.inl.
std::optional<TypedRecordReader> recordReader_;
std::optional<IndexedMessageReader> indexedMessageReader_;
Message curMessage_;
std::optional<MessageView> curMessageView_;
private:
// Presumably installed as the message callback of the active reader —
// confirm.
void onMessage(const Message& message, RecordOffset offset);
};
bool begun_ = false;
// Null for a default-constructed Iterator; presumably that is how the end
// iterator is represented — confirm.
std::unique_ptr<Impl> impl_;
};
// Constructors taking the reader plus, depending on the overload, a byte
// range, a time range or a ReadMessageOptions, and a problem callback.
LinearMessageView(McapReader& mcapReader, const ProblemCallback& onProblem);
LinearMessageView(
McapReader& mcapReader, ByteOffset dataStart, ByteOffset dataEnd, Timestamp startTime, Timestamp endTime, const ProblemCallback& onProblem);
LinearMessageView(McapReader& mcapReader, const ReadMessageOptions& options, ByteOffset dataStart, ByteOffset dataEnd, const ProblemCallback& onProblem);
LinearMessageView(const LinearMessageView&) = delete;
LinearMessageView& operator=(const LinearMessageView&) = delete;
LinearMessageView(LinearMessageView&&) = default;
LinearMessageView& operator=(LinearMessageView&&) = delete;
Iterator begin();
Iterator end();
private:
McapReader& mcapReader_;
ByteOffset dataStart_;
ByteOffset dataEnd_;
ReadMessageOptions readMessageOptions_;
// Declared const, so the defaulted move constructor copies this member
// rather than moving it.
const ProblemCallback onProblem_;
};
} // namespace mcap
#ifdef MCAP_IMPLEMENTATION
#include "reader.inl"
#endif