reader.hpp
Go to the documentation of this file.
1 #pragma once
2 
3 #include "intervaltree.hpp"
4 #include "types.hpp"
5 #include <cstdio>
6 #include <fstream>
7 #include <map>
8 #include <memory>
9 #include <optional>
10 #include <string>
11 #include <unordered_map>
12 #include <vector>
13 
14 namespace mcap {
15 
// Selects how McapReader::readSummary() gathers summary data (it is that method's first
// parameter).
// NOTE(review): this listing jumps from original line 16 to line 34, so any enumerators declared
// before ForceScan are not visible here.
16 enum struct ReadSummaryMethod {
 // Presumably: read the file sequentially from Header to DataEnd to produce seeking indexes and
 // summary statistics (wording taken from the generated reference text) - TODO confirm.
34  ForceScan,
35 };
36 
/** @brief An abstract interface for reading MCAP data. */
40 struct IReadable {
41  virtual ~IReadable() = default;
42 
 // Pure virtual. Presumably reports the total size of the underlying data source in bytes -
 // TODO confirm against the original doc comment (lines 43-47 are not part of this listing).
48  virtual uint64_t size() const = 0;
 // Pure virtual. Presumably requests `size` bytes starting at `offset`, exposes them through
 // `*output`, and returns how many bytes were made available - TODO confirm against the original
 // doc comment (lines 49-64 are not part of this listing).
65  virtual uint64_t read(std::byte** output, uint64_t offset, uint64_t size) = 0;
66 };
67 
/** @brief IReadable implementation wrapping a FILE* pointer created by fopen() and a read buffer. */
72 class FileReader final : public IReadable {
73 public:
 // NOTE(review): this single-argument constructor is not `explicit`, so a std::FILE* converts
 // implicitly to a FileReader.
74  FileReader(std::FILE* file);
75 
76  uint64_t size() const override;
77  uint64_t read(std::byte** output, uint64_t offset, uint64_t size) override;
78 
79 private:
 // The wrapped C stream. NOTE(review): no destructor is declared here, so the FILE* does not
 // appear to be closed by this class - confirm ownership with the caller.
80  std::FILE* file_;
 // Read buffer held by value.
81  std::vector<std::byte> buffer_;
 // No in-class initializers: these need to be set by the constructor, which is defined outside
 // this listing.
82  uint64_t size_;
83  uint64_t position_;
84 };
85 
/** @brief IReadable implementation wrapping a std::ifstream input file stream. */
89 class FileStreamReader final : public IReadable {
90 public:
 // NOTE(review): this single-argument constructor is not `explicit`.
91  FileStreamReader(std::ifstream& stream);
92 
93  uint64_t size() const override;
94  uint64_t read(std::byte** output, uint64_t offset, uint64_t size) override;
95 
96 private:
 // Held by reference: the caller's stream must outlive this reader.
97  std::ifstream& stream_;
 // Read buffer held by value.
98  std::vector<std::byte> buffer_;
 // No in-class initializers: these need to be set by the constructor, which is defined outside
 // this listing.
99  uint64_t size_;
100  uint64_t position_;
101 };
102 
/** @brief An abstract interface for compressed readers. */
106 class ICompressedReader : public IReadable {
107 public:
108  virtual ~ICompressedReader() = default;
109 
 // Pure virtual. Points the reader at a new input buffer of `size` bytes; `uncompressedSize` is
 // presumably the expected size of the data after decompression - TODO confirm (the original
 // doc comment, lines 110-118, is not part of this listing).
119  virtual void reset(const std::byte* data, uint64_t size, uint64_t uncompressedSize) = 0;
 // Pure virtual. Returns the reader's current Status (a status code plus message).
125  virtual Status status() const = 0;
126 };
127 
/**
 * @brief A "null" compressed reader that directly passes through uncompressed data.
 *
 * Non-copyable and non-movable. A default-constructed BufferReader starts out with a null data
 * pointer and a size of zero until reset() supplies a buffer.
 */
132 class BufferReader final : public ICompressedReader {
133 public:
 // Points the reader at `data` (`size` bytes). `uncompressedSize` is part of the
 // ICompressedReader interface; how it is used here is defined outside this listing.
134  void reset(const std::byte* data, uint64_t size, uint64_t uncompressedSize) override;
135  uint64_t read(std::byte** output, uint64_t offset, uint64_t size) override;
136  uint64_t size() const override;
137  Status status() const override;
138 
139  BufferReader() = default;
140  BufferReader(const BufferReader&) = delete;
141  BufferReader& operator=(const BufferReader&) = delete;
142  BufferReader(BufferReader&&) = delete;
143  BufferReader& operator=(BufferReader&&) = delete;
144 
145 private:
 // In-class initializers give the defaulted constructor a well-defined empty state; without
 // them both members hold indeterminate values until reset() is called.
146  const std::byte* data_ = nullptr;
147  uint64_t size_ = 0;
148 };
149 
/**
 * @brief ICompressedReader implementation that decompresses Zstandard
 * (https://facebook.github.io/zstd/) data.
 *
 * Non-copyable and non-movable.
 */
154 class ZStdReader final : public ICompressedReader {
155 public:
156  void reset(const std::byte* data, uint64_t size, uint64_t uncompressedSize) override;
157  uint64_t read(std::byte** output, uint64_t offset, uint64_t size) override;
158  uint64_t size() const override;
159  Status status() const override;
160 
161  ZStdReader() = default;
162  ZStdReader(const ZStdReader&) = delete;
163  ZStdReader& operator=(const ZStdReader&) = delete;
164  ZStdReader(ZStdReader&&) = delete;
165  ZStdReader& operator=(ZStdReader&&) = delete;
166 
167 private:
 // NOTE(review): original lines 168, 170 and 172 (status_, uncompressedData_ and
 // uncompressedSize_ according to the generated reference) are missing from this listing.
 // NOTE(review): the two members below have no in-class initializer while the constructor is
 // defaulted, so they are indeterminate until reset() is called.
169  const std::byte* compressedData_;
171  uint64_t compressedSize_;
173 };
174 
/**
 * @brief ICompressedReader implementation that decompresses LZ4 (https://lz4.github.io/lz4/)
 * data.
 *
 * Non-copyable and non-movable; declares its own constructor and destructor (defined outside
 * this listing).
 */
179 class LZ4Reader final : public ICompressedReader {
180 public:
181  void reset(const std::byte* data, uint64_t size, uint64_t uncompressedSize) override;
182  uint64_t read(std::byte** output, uint64_t offset, uint64_t size) override;
183  uint64_t size() const override;
184  Status status() const override;
185 
186  LZ4Reader();
187  LZ4Reader(const LZ4Reader&) = delete;
188  LZ4Reader& operator=(const LZ4Reader&) = delete;
189  LZ4Reader(LZ4Reader&&) = delete;
190  LZ4Reader& operator=(LZ4Reader&&) = delete;
191  ~LZ4Reader();
192 
193 private:
 // Type-erased handle; the trailing comment records that it is really an LZ4F_dctx*, which
 // keeps the LZ4 headers out of this public header.
194  void* decompressionContext_ = nullptr; // LZ4F_dctx*
 // NOTE(review): original lines 195, 197 and 199 (status_, uncompressedData_ and
 // uncompressedSize_ according to the generated reference) are missing from this listing.
196  const std::byte* compressedData_;
198  uint64_t compressedSize_;
200 };
201 
202 struct LinearMessageView;
203 
/** @brief Provides a read interface to an MCAP file. */
207 class McapReader final {
208 public:
209  ~McapReader();
210 
 // Three ways to attach a data source: a caller-provided IReadable, a filename, or an already
 // opened std::ifstream. Each returns a Status.
221  Status open(IReadable& reader);
230  Status open(std::string_view filename);
240  Status open(std::ifstream& stream);
241 
247  void close();
248 
 // `onProblem` defaults to a callback that ignores the Status it is given.
256  Status readSummary(
257  ReadSummaryMethod method, const ProblemCallback& onProblem = [](const Status&) {});
258 
 // Both overloads return a LinearMessageView (an iterable view of Messages). The time window
 // defaults to [0, MaxTime]; the second overload additionally takes a problem callback.
270  LinearMessageView readMessages(Timestamp startTime = 0, Timestamp endTime = MaxTime);
285  LinearMessageView readMessages(const ProblemCallback& onProblem, Timestamp startTime = 0,
286  Timestamp endTime = MaxTime);
287 
 // Returns a pair of byte offsets for the given time window; presumably the (start, end) range
 // of the file that has to be read to cover it - TODO confirm.
303  std::pair<ByteOffset, ByteOffset> byteRange(Timestamp startTime,
304  Timestamp endTime = MaxTime) const;
305 
 // Non-owning pointer to the current data source.
310  IReadable* dataSource();
311 
 // Accessors for the parsed records; each is an optional and is empty when no value is stored.
315  const std::optional<Header>& header() const;
319  const std::optional<Footer>& footer() const;
323  const std::optional<Statistics>& statistics() const;
324 
 // NOTE(review): unlike the accessors above, these two return the map by value (const-qualified),
 // so every call copies the whole container.
329  const std::unordered_map<ChannelId, ChannelPtr> channels() const;
334  const std::unordered_map<SchemaId, SchemaPtr> schemas() const;
335 
 // Single-entry lookups by id. ChannelPtr/SchemaPtr are std::shared_ptr aliases; presumably a
 // null pointer is returned for an unknown id - TODO confirm.
343  ChannelPtr channel(ChannelId channelId) const;
351  SchemaPtr schema(SchemaId schemaId) const;
352 
357  const std::vector<ChunkIndex>& chunkIndexes() const;
358 
359  // The following static methods are used internally for parsing MCAP records
360  // and do not need to be called directly unless you are implementing your own
361  // reader functionality or tests.
362 
 // Read* helpers take a data source and an offset and fill the output pointer.
363  static Status ReadRecord(IReadable& reader, uint64_t offset, Record* record);
364  static Status ReadFooter(IReadable& reader, uint64_t offset, Footer* footer);
365 
 // Parse* helpers convert a generic Record into the typed structure passed as the output pointer
 // and report the outcome as a Status.
366  static Status ParseHeader(const Record& record, Header* header);
367  static Status ParseFooter(const Record& record, Footer* footer);
368  static Status ParseSchema(const Record& record, Schema* schema);
369  static Status ParseChannel(const Record& record, Channel* channel);
370  static Status ParseMessage(const Record& record, Message* message);
371  static Status ParseChunk(const Record& record, Chunk* chunk);
372  static Status ParseMessageIndex(const Record& record, MessageIndex* messageIndex);
373  static Status ParseChunkIndex(const Record& record, ChunkIndex* chunkIndex);
374  static Status ParseAttachment(const Record& record, Attachment* attachment);
375  static Status ParseAttachmentIndex(const Record& record, AttachmentIndex* attachmentIndex);
376  static Status ParseStatistics(const Record& record, Statistics* statistics);
377  static Status ParseMetadata(const Record& record, Metadata* metadata);
378  static Status ParseMetadataIndex(const Record& record, MetadataIndex* metadataIndex);
379  static Status ParseSummaryOffset(const Record& record, SummaryOffset* summaryOffset);
380  static Status ParseDataEnd(const Record& record, DataEnd* dataEnd);
381 
 // Maps a compression name to the Compression enum; the optional return allows "no match".
385  static std::optional<Compression> ParseCompression(const std::string_view compression);
386 
387 private:
 // NOTE(review): original lines 388-389 (a `friend LinearMessageView` declaration according to
 // the generated reference) are missing from this listing.
390 
 // Non-owning pointer to the active data source.
391  IReadable* input_ = nullptr;
 // Presumably populated only by the filename/ifstream open() overloads - TODO confirm in
 // reader.inl.
392  std::FILE* file_ = nullptr;
393  std::unique_ptr<FileReader> fileInput_;
394  std::unique_ptr<FileStreamReader> fileStreamInput_;
395  std::optional<Header> header_;
396  std::optional<Footer> footer_;
397  std::optional<Statistics> statistics_;
398  std::vector<ChunkIndex> chunkIndexes_;
 // NOTE(review): original line 399 (chunkRanges_, an internal::IntervalTree<ByteOffset,
 // ChunkIndex> according to the generated reference) is missing from this listing.
 // Multimaps keyed by string: several index entries may share one key.
400  std::multimap<std::string, AttachmentIndex> attachmentIndexes_;
401  std::multimap<std::string, MetadataIndex> metadataIndexes_;
402  std::unordered_map<SchemaId, SchemaPtr> schemas_;
403  std::unordered_map<ChannelId, ChannelPtr> channels_;
404  // Used for uncompressed messages
405  std::unordered_map<ChannelId, std::map<Timestamp, ByteOffset>> messageIndex_;
406  ByteOffset dataStart_ = 0;
407  ByteOffset dataEnd_ = EndOffset;
408  Timestamp startTime_ = 0;
409  Timestamp endTime_ = 0;
410  bool parsedSummary_ = false;
411 
412  void reset_();
413  Status readSummarySection_(IReadable& reader);
414  Status readSummaryFromScan_(IReadable& reader);
415 };
416 
/** @brief A low-level interface for parsing MCAP-style TLV records from a data source. */
421 struct RecordReader {
 // NOTE(review): original lines 422-423 (public members `ByteOffset offset` and
 // `ByteOffset endOffset` according to the generated reference) are missing from this listing.
424 
 // `endOffset` defaults to EndOffset.
425  RecordReader(IReadable& dataSource, ByteOffset startOffset, ByteOffset endOffset = EndOffset);
426 
 // Re-targets the reader at a (possibly different) data source and offset range.
427  void reset(IReadable& dataSource, ByteOffset startOffset, ByteOffset endOffset);
428 
 // Returns the next Record, or an empty optional; presumably empty means end of range or an
 // error, to be distinguished via status() - TODO confirm.
429  std::optional<Record> next();
430 
431  const Status& status() const;
432 
433  ByteOffset curRecordOffset() const;
434 
435 private:
 // Non-owning pointer to the data source.
436  IReadable* dataSource_ = nullptr;
 // NOTE(review): original lines 437-438 are missing from this listing.
439 };
440 
// TypedChunkReader: exposes typed callbacks for records found inside a Chunk.
// NOTE(review): the opening declaration (original line 441) is missing from this listing; the
// type name is taken from the special member declarations below.
 // Callbacks for each typed record. The ByteOffset argument is presumably the record's offset
 // within the chunk - TODO confirm.
442  std::function<void(const SchemaPtr, ByteOffset)> onSchema;
443  std::function<void(const ChannelPtr, ByteOffset)> onChannel;
444  std::function<void(const Message&, ByteOffset)> onMessage;
445  std::function<void(const Record&, ByteOffset)> onUnknownRecord;
446 
 // Copy operations and move assignment are deleted. NOTE(review): original lines 447 and 450
 // (presumably the default and move constructors) are missing from this listing.
448  TypedChunkReader(const TypedChunkReader&) = delete;
449  TypedChunkReader& operator=(const TypedChunkReader&) = delete;
451  TypedChunkReader& operator=(TypedChunkReader&&) = delete;
452 
 // Points the reader at a new chunk using the given compression scheme.
453  void reset(const Chunk& chunk, Compression compression);
454 
 // Advances by one step; presumably returns false once the chunk is exhausted or an error
 // occurred (check status()) - TODO confirm.
455  bool next();
456 
457  ByteOffset offset() const;
458 
459  const Status& status() const;
460 
461 private:
 // NOTE(review): original lines 462-466 are missing from this listing; the generated reference
 // lists reader_ (RecordReader), uncompressedReader_ (BufferReader), lz4Reader_ (LZ4Reader) and
 // zstdReader_ (ZStdReader) at those positions.
467 };
468 
// TypedRecordReader: a mid-level interface for parsing and validating MCAP records from a data
// source.
// NOTE(review): the opening declaration (original line 473) is missing from this listing; the
// type name is taken from the constructor declarations below.
 // One callback per record type. Callbacks for records that can also appear inside a chunk
 // (schema, channel, message, unknown) take an extra std::optional<ByteOffset>; presumably this
 // is the offset within the chunk and empty for top-level records - TODO confirm.
474  std::function<void(const Header&, ByteOffset)> onHeader;
475  std::function<void(const Footer&, ByteOffset)> onFooter;
476  std::function<void(const SchemaPtr, ByteOffset, std::optional<ByteOffset>)> onSchema;
477  std::function<void(const ChannelPtr, ByteOffset, std::optional<ByteOffset>)> onChannel;
478  std::function<void(const Message&, ByteOffset, std::optional<ByteOffset>)> onMessage;
479  std::function<void(const Chunk&, ByteOffset)> onChunk;
480  std::function<void(const MessageIndex&, ByteOffset)> onMessageIndex;
481  std::function<void(const ChunkIndex&, ByteOffset)> onChunkIndex;
482  std::function<void(const Attachment&, ByteOffset)> onAttachment;
483  std::function<void(const AttachmentIndex&, ByteOffset)> onAttachmentIndex;
484  std::function<void(const Statistics&, ByteOffset)> onStatistics;
485  std::function<void(const Metadata&, ByteOffset)> onMetadata;
486  std::function<void(const MetadataIndex&, ByteOffset)> onMetadataIndex;
487  std::function<void(const SummaryOffset&, ByteOffset)> onSummaryOffset;
488  std::function<void(const DataEnd&, ByteOffset)> onDataEnd;
489  std::function<void(const Record&, ByteOffset, std::optional<ByteOffset>)> onUnknownRecord;
490  std::function<void(ByteOffset)> onChunkEnd;
491 
 // `endOffset` defaults to EndOffset.
492  TypedRecordReader(IReadable& dataSource, ByteOffset startOffset,
493  ByteOffset endOffset = EndOffset);
494 
 // Copy operations and move assignment are deleted. NOTE(review): original line 497 (presumably
 // the move constructor) is missing from this listing.
495  TypedRecordReader(const TypedRecordReader&) = delete;
496  TypedRecordReader& operator=(const TypedRecordReader&) = delete;
498  TypedRecordReader& operator=(TypedRecordReader&&) = delete;
499 
 // Advances by one step; presumably returns false at the end of the range or on error (check
 // status()) - TODO confirm.
500  bool next();
501 
502  ByteOffset offset() const;
503 
504  const Status& status() const;
505 
506 private:
 // NOTE(review): original lines 507-510 are missing from this listing; the generated reference
 // lists reader_ (RecordReader) at 507 and chunkReader_ (TypedChunkReader) at 508.
511 };
512 
// LinearMessageView: an iterable view of Messages in an MCAP file.
// NOTE(review): the opening declaration (original line 516) is missing from this listing; the
// type name is taken from the constructor declarations further down.
 // Single-pass iterator (input_iterator_tag) whose elements are const MessageView references.
517  struct Iterator {
518  using iterator_category = std::input_iterator_tag;
519  using difference_type = int64_t;
 // NOTE(review): original line 520 (presumably the value_type alias) is missing from this listing.
521  using pointer = const MessageView*;
522  using reference = const MessageView&;
523 
524  reference operator*() const;
525  pointer operator->() const;
526  Iterator& operator++();
 // Post-increment returns void, so `*it++` is not supported.
527  void operator++(int);
528  friend bool operator==(const Iterator& a, const Iterator& b);
529  friend bool operator!=(const Iterator& a, const Iterator& b);
530 
531  private:
 // NOTE(review): original line 532 is missing from this listing; since the constructors below are
 // private, it is presumably a friend declaration granting LinearMessageView access - TODO confirm.
533 
534  Iterator() = default;
535  Iterator(McapReader& mcapReader, ByteOffset dataStart, ByteOffset dataEnd, Timestamp startTime,
536  Timestamp endTime, const ProblemCallback& onProblem);
537 
 // Iteration state, held behind impl_ below; non-copyable and non-movable.
538  class Impl {
539  public:
540  Impl(McapReader& mcapReader, ByteOffset dataStart, ByteOffset dataEnd, Timestamp startTime,
541  Timestamp endTime, const ProblemCallback& onProblem);
542 
543  Impl(const Impl&) = delete;
544  Impl& operator=(const Impl&) = delete;
545  Impl(Impl&&) = delete;
546  Impl& operator=(Impl&&) = delete;
547 
548  void increment();
549  reference dereference() const;
550  bool has_value() const;
551 
 // NOTE(review): original lines 552 and 554-557 are missing from this listing; the generated
 // reference lists `const ProblemCallback& onProblem_` at 556, i.e. the callback is held by
 // reference and must outlive the iterator.
553  std::optional<TypedRecordReader> recordReader_;
 // Holds the current element; presumably empty once iteration has finished - TODO confirm.
558  std::optional<MessageView> curMessageView_;
559  };
560 
 // Null for a default-constructed Iterator.
561  std::unique_ptr<Impl> impl_;
562  };
563 
564  LinearMessageView(McapReader& mcapReader, const ProblemCallback& onProblem);
565  LinearMessageView(McapReader& mcapReader, ByteOffset dataStart, ByteOffset dataEnd,
566  Timestamp startTime, Timestamp endTime, const ProblemCallback& onProblem);
567 
 // Copy operations and move assignment are deleted. NOTE(review): original line 570 (presumably
 // the move constructor) is missing from this listing.
568  LinearMessageView(const LinearMessageView&) = delete;
569  LinearMessageView& operator=(const LinearMessageView&) = delete;
571  LinearMessageView& operator=(LinearMessageView&&) = delete;
572 
573  Iterator begin();
574  Iterator end();
575 
576 private:
 // NOTE(review): original lines 577-582 are missing from this listing; the generated reference
 // lists mcapReader_ (McapReader&) at 577, dataStart_ (ByteOffset) at 578 and onProblem_
 // (const ProblemCallback, held by value) at 582.
583 };
584 
585 } // namespace mcap
586 
587 #ifdef MCAP_IMPLEMENTATION
588 #include "reader.inl"
589 #endif
std::FILE * file_
Definition: reader.hpp:80
ZStdReader zstdReader_
Definition: reader.hpp:466
Compression
Supported MCAP compression algorithms.
Definition: types.hpp:34
const std::byte * compressedData_
Definition: reader.hpp:169
An Attachment is an arbitrary file embedded in an MCAP file, including a name, content-type, timestamps, and optional CRC. Attachment records are written in the Data section, outside of Chunks.
Definition: types.hpp:252
std::function< void(const ChannelPtr, ByteOffset)> onChannel
Definition: reader.hpp:443
std::function< void(const Status &)> ProblemCallback
Definition: types.hpp:22
LZ4Reader lz4Reader_
Definition: reader.hpp:465
Parse the Summary section to produce seeking indexes and summary statistics. If the Summary section i...
uint16_t SchemaId
Definition: types.hpp:16
std::function< void(const SummaryOffset &, ByteOffset)> onSummaryOffset
Definition: reader.hpp:487
const std::byte * compressedData_
Definition: reader.hpp:196
uint64_t uncompressedSize_
Definition: reader.hpp:172
The Statistics record is found in the Summary section, providing counts and timestamp ranges for the ...
Definition: types.hpp:296
constexpr ByteOffset EndOffset
Definition: types.hpp:28
Wraps a status code and string message carrying additional context.
Definition: errors.hpp:33
Describes a Channel that messages are written to. A Channel represents a single connection from a pub...
Definition: types.hpp:155
std::function< void(const Footer &, ByteOffset)> onFooter
Definition: reader.hpp:475
constexpr bool operator!=(const optional< T > &lhs, const optional< U > &rhs)
relop
Definition: sol.hpp:6020
An iterable view of Messages in an MCAP file.
Definition: reader.hpp:516
uint16_t ChannelId
Definition: types.hpp:17
TypedChunkReader chunkReader_
Definition: reader.hpp:508
std::function< void(const Statistics &, ByteOffset)> onStatistics
Definition: reader.hpp:484
std::input_iterator_tag iterator_category
Definition: reader.hpp:518
RecordReader reader_
Definition: reader.hpp:462
std::function< void(const SchemaPtr, ByteOffset)> onSchema
Definition: reader.hpp:442
constexpr Timestamp MaxTime
Definition: types.hpp:29
ByteArray uncompressedData_
Definition: reader.hpp:170
friend LinearMessageView
Definition: reader.hpp:389
ICompressedReader implementation that decompresses LZ4 (https://lz4.github.io/lz4/) data...
Definition: reader.hpp:179
IReadable implementation wrapping a std::ifstream input file stream.
Definition: reader.hpp:89
std::function< void(const Metadata &, ByteOffset)> onMetadata
Definition: reader.hpp:485
uint64_t Timestamp
Definition: types.hpp:18
std::function< void(const DataEnd &, ByteOffset)> onDataEnd
Definition: reader.hpp:488
std::multimap< std::string, MetadataIndex > metadataIndexes_
Definition: reader.hpp:401
std::function< void(const Record &, ByteOffset)> onUnknownRecord
Definition: reader.hpp:445
uint64_t size_
Definition: reader.hpp:82
fp operator*(fp x, fp y)
Definition: format-inl.h:275
std::function< void(const Chunk &, ByteOffset)> onChunk
Definition: reader.hpp:479
A collection of Schemas, Channels, and Messages that supports compression and indexing.
Definition: types.hpp:211
basic_string_view< char > string_view
Definition: core.h:522
uint64_t size_
Definition: reader.hpp:147
std::optional< Footer > footer_
Definition: reader.hpp:396
Read the file sequentially from Header to DataEnd to produce seeking indexes and summary statistics...
Chunk Index records are found in the Summary section, providing summary information for a single Chun...
Definition: types.hpp:235
std::unordered_map< SchemaId, SchemaPtr > schemas_
Definition: reader.hpp:402
std::vector< std::byte > buffer_
Definition: reader.hpp:81
ByteOffset dataStart_
Definition: reader.hpp:578
Metadata Index records are found in the Summary section, providing summary information for a single Me...
Definition: types.hpp:321
std::function< void(const MessageIndex &, ByteOffset)> onMessageIndex
Definition: reader.hpp:480
Attachment Index records are found in the Summary section, providing summary information for a single...
Definition: types.hpp:266
Returned when iterating over Messages in a file, MessageView contains a reference to one Message...
Definition: types.hpp:356
std::ifstream & stream_
Definition: reader.hpp:97
std::unique_ptr< FileStreamReader > fileStreamInput_
Definition: reader.hpp:394
The final record in an MCAP file (before the trailing magic byte sequence). Contains byte offsets fro...
Definition: types.hpp:111
ReadSummaryMethod
Definition: reader.hpp:16
uint64_t compressedSize_
Definition: reader.hpp:171
std::multimap< std::string, AttachmentIndex > attachmentIndexes_
Definition: reader.hpp:400
std::unordered_map< ChannelId, ChannelPtr > channels_
Definition: reader.hpp:403
std::function< void(const Attachment &, ByteOffset)> onAttachment
Definition: reader.hpp:482
std::vector< std::byte > buffer_
Definition: reader.hpp:98
A low-level interface for parsing MCAP-style TLV records from a data source.
Definition: reader.hpp:421
std::function< void(ByteOffset)> onChunkEnd
Definition: reader.hpp:490
If the Summary section is missing or incomplete, allow falling back to reading the file sequentially ...
uint64_t uncompressedSize_
Definition: reader.hpp:199
McapReader & mcapReader_
Definition: reader.hpp:577
std::shared_ptr< Schema > SchemaPtr
Definition: types.hpp:172
#define next(ls)
Definition: llex.c:32
std::function< void(const Message &, ByteOffset)> onMessage
Definition: reader.hpp:444
uint64_t ByteOffset
Definition: types.hpp:19
static const char * reader(lua_State *L, void *ud, size_t *size)
Definition: luac.c:126
Status status_
Definition: reader.hpp:168
std::unordered_map< ChannelId, std::map< Timestamp, ByteOffset > > messageIndex_
Definition: reader.hpp:405
Summary Offset records are found in the Summary Offset section. Records in the Summary section are gr...
Definition: types.hpp:336
const ProblemCallback & onProblem_
Definition: reader.hpp:556
std::shared_ptr< Channel > ChannelPtr
Definition: types.hpp:173
Status status_
Definition: reader.hpp:195
The final record in the Data section, signaling the end of Data and beginning of Summary. Optionally contains a CRC of the entire Data section.
Definition: types.hpp:346
const ProblemCallback onProblem_
Definition: reader.hpp:582
An abstract interface for reading MCAP data.
Definition: reader.hpp:40
std::vector< ChunkIndex > chunkIndexes_
Definition: reader.hpp:398
Holds a named map of key/value strings containing arbitrary user data. Metadata records are found in ...
Definition: types.hpp:312
An abstract interface for compressed readers.
Definition: reader.hpp:106
std::unique_ptr< Impl > impl_
Definition: reader.hpp:561
const std::byte * data_
Definition: reader.hpp:146
uint64_t compressedSize_
Definition: reader.hpp:198
std::optional< MessageView > curMessageView_
Definition: reader.hpp:558
bool operator==(QwtEventPattern::MousePattern b1, QwtEventPattern::MousePattern b2)
Compare operator.
A "null" compressed reader that directly passes through uncompressed data. No internal buffers are al...
Definition: reader.hpp:132
uint64_t position_
Definition: reader.hpp:83
RecordReader reader_
Definition: reader.hpp:507
IReadable implementation wrapping a FILE* pointer created by fopen() and a read buffer.
Definition: reader.hpp:72
std::function< void(const AttachmentIndex &, ByteOffset)> onAttachmentIndex
Definition: reader.hpp:483
A single Message published to a Channel.
Definition: types.hpp:178
internal::IntervalTree< ByteOffset, ChunkIndex > chunkRanges_
Definition: reader.hpp:399
std::optional< Statistics > statistics_
Definition: reader.hpp:397
std::optional< TypedRecordReader > recordReader_
Definition: reader.hpp:553
Appears at the beginning of every MCAP file (after the magic byte sequence) and contains the recordin...
Definition: types.hpp:99
std::function< void(const Header &, ByteOffset)> onHeader
Definition: reader.hpp:474
ByteOffset offset
Definition: reader.hpp:422
std::function< void(const ChunkIndex &, ByteOffset)> onChunkIndex
Definition: reader.hpp:481
span_constexpr std::size_t size(span< T, Extent > const &spn)
Definition: span.hpp:1485
const std::string header
ByteOffset endOffset
Definition: reader.hpp:423
static const char * output
Definition: luac.c:38
ByteArray uncompressedData_
Definition: reader.hpp:197
Describes a schema used for message encoding and decoding and/or describing the shape of messages...
Definition: types.hpp:128
std::optional< Header > header_
Definition: reader.hpp:395
A list of timestamps to byte offsets for a single Channel. This record appears after each Chunk...
Definition: types.hpp:225
BufferReader uncompressedReader_
Definition: reader.hpp:464
std::vector< std::byte > ByteArray
Definition: types.hpp:21
A mid-level interface for parsing and validating MCAP records from a data source. ...
Definition: reader.hpp:473
Definition: format.h:895
ICompressedReader implementation that decompresses Zstandard (https://facebook.github.io/zstd/) data.
Definition: reader.hpp:154
std::unique_ptr< FileReader > fileInput_
Definition: reader.hpp:393
A generic Type-Length-Value record using a uint8 type and uint64 length. This is the generic form of ...
Definition: types.hpp:83
std::function< void(const MetadataIndex &, ByteOffset)> onMetadataIndex
Definition: reader.hpp:486
Provides a read interface to an MCAP file.
Definition: reader.hpp:207


plotjuggler
Author(s): Davide Faconti
autogenerated on Mon Jun 19 2023 03:01:39