read_job_queue.hpp
Go to the documentation of this file.
1 #pragma once
2 
3 #include "types.hpp"
4 #include <algorithm>
5 #include <variant>
6 
7 namespace mcap::internal {
8 
// Type-dependent constant that is false for every type argument. It is used as
// the condition of a static_assert in the trailing `else` of an `if constexpr`
// chain, so the assertion is only evaluated when that branch is instantiated.
template <typename>
inline constexpr bool always_false_v{false};
12 
21 };
22 
33 };
34 
// A union of the job kinds that an indexed MCAP reader executes: either a
// ReadMessageJob or a DecompressChunkJob.
38 using ReadJob = std::variant<ReadMessageJob, DecompressChunkJob>;
39 
43 struct ReadJobQueue {
44 private:
45  bool reverse_ = false;
46  std::vector<ReadJob> heap_;
47 
51  static Timestamp TimeComparisonKey(const ReadJob& job, bool reverse) {
52  Timestamp result = 0;
53  std::visit(
54  [&](auto&& arg) {
55  using T = std::decay_t<decltype(arg)>;
56  if constexpr (std::is_same_v<T, ReadMessageJob>) {
57  result = arg.timestamp;
58  } else if constexpr (std::is_same_v<T, DecompressChunkJob>) {
59  if (reverse) {
60  result = arg.messageEndTime;
61  } else {
62  result = arg.messageStartTime;
63  }
64  } else {
65  static_assert(always_false_v<T>, "non-exhaustive visitor!");
66  }
67  },
68  job);
69  return result;
70  }
71  static RecordOffset PositionComparisonKey(const ReadJob& job, bool reverse) {
72  RecordOffset result;
73  std::visit(
74  [&](auto&& arg) {
75  using T = std::decay_t<decltype(arg)>;
76  if constexpr (std::is_same_v<T, ReadMessageJob>) {
77  result = arg.offset;
78  } else if constexpr (std::is_same_v<T, DecompressChunkJob>) {
79  if (reverse) {
80  result.offset = arg.messageIndexEndOffset;
81  } else {
82  result.offset = arg.chunkStartOffset;
83  }
84  } else {
85  static_assert(always_false_v<T>, "non-exhaustive visitor!");
86  }
87  },
88  job);
89  return result;
90  }
91 
92  static bool CompareForward(const ReadJob& a, const ReadJob& b) {
93  auto aTimestamp = TimeComparisonKey(a, false);
94  auto bTimestamp = TimeComparisonKey(b, false);
95  if (aTimestamp == bTimestamp) {
96  return PositionComparisonKey(a, false) > PositionComparisonKey(b, false);
97  }
98  return aTimestamp > bTimestamp;
99  }
100 
101  static bool CompareReverse(const ReadJob& a, const ReadJob& b) {
102  auto aTimestamp = TimeComparisonKey(a, true);
103  auto bTimestamp = TimeComparisonKey(b, true);
104  if (aTimestamp == bTimestamp) {
105  return PositionComparisonKey(a, true) < PositionComparisonKey(b, true);
106  }
107  return aTimestamp < bTimestamp;
108  }
109 
110 public:
111  explicit ReadJobQueue(bool reverse)
112  : reverse_(reverse) {}
113  void push(DecompressChunkJob&& decompressChunkJob) {
114  heap_.emplace_back(std::move(decompressChunkJob));
115  if (!reverse_) {
116  std::push_heap(heap_.begin(), heap_.end(), CompareForward);
117  } else {
118  std::push_heap(heap_.begin(), heap_.end(), CompareReverse);
119  }
120  }
121 
122  void push(ReadMessageJob&& readMessageJob) {
123  heap_.emplace_back(std::move(readMessageJob));
124  if (!reverse_) {
125  std::push_heap(heap_.begin(), heap_.end(), CompareForward);
126  } else {
127  std::push_heap(heap_.begin(), heap_.end(), CompareReverse);
128  }
129  }
130 
132  if (!reverse_) {
133  std::pop_heap(heap_.begin(), heap_.end(), CompareForward);
134  } else {
135  std::pop_heap(heap_.begin(), heap_.end(), CompareReverse);
136  }
137  auto popped = heap_.back();
138  heap_.pop_back();
139  return popped;
140  }
141 
142  size_t len() const {
143  return heap_.size();
144  }
145 };
146 
147 } // namespace mcap::internal
mcap::internal::ReadMessageJob
A job to read a specific message at offset offset from the decompressed chunk stored in chunkReaderIn...
Definition: read_job_queue.hpp:17
mcap::internal::ReadJobQueue::ReadJobQueue
ReadJobQueue(bool reverse)
Definition: read_job_queue.hpp:111
mcap::internal::ReadJob
std::variant< ReadMessageJob, DecompressChunkJob > ReadJob
A union of jobs that an indexed MCAP reader executes.
Definition: read_job_queue.hpp:38
mcap::Timestamp
uint64_t Timestamp
Definition: types.hpp:20
types.hpp
mcap::internal::DecompressChunkJob::chunkStartOffset
ByteOffset chunkStartOffset
Definition: read_job_queue.hpp:31
arg
auto arg(const Char *name, const T &arg) -> detail::named_arg< Char, T >
Definition: core.h:1875
mcap::internal::ReadJobQueue
A priority queue of jobs for an indexed MCAP reader to execute.
Definition: read_job_queue.hpp:43
mcap::internal::ReadJobQueue::push
void push(DecompressChunkJob &&decompressChunkJob)
Definition: read_job_queue.hpp:113
mcap::internal::ReadJobQueue::reverse_
bool reverse_
Definition: read_job_queue.hpp:45
mcap::RecordOffset
Definition: types.hpp:353
mcap::internal
Definition: crc32.hpp:5
mcap::RecordOffset::offset
ByteOffset offset
Definition: types.hpp:354
mcap::internal::ReadMessageJob::chunkReaderIndex
size_t chunkReaderIndex
Definition: read_job_queue.hpp:20
mcap::internal::ReadJobQueue::len
size_t len() const
Definition: read_job_queue.hpp:142
backward::details::move
const T & move(const T &v)
Definition: backward.hpp:394
mcap::internal::ReadMessageJob::offset
RecordOffset offset
Definition: read_job_queue.hpp:19
sol::detail::decay_t
typename std::decay< T >::type decay_t
Definition: sol.hpp:4722
mcap::ByteOffset
uint64_t ByteOffset
Definition: types.hpp:21
emphasis::reverse
@ reverse
mcap::internal::DecompressChunkJob::messageEndTime
Timestamp messageEndTime
Definition: read_job_queue.hpp:30
mcap::internal::always_false_v
constexpr bool always_false_v
Definition: read_job_queue.hpp:11
mcap::internal::ReadJobQueue::PositionComparisonKey
static RecordOffset PositionComparisonKey(const ReadJob &job, bool reverse)
Definition: read_job_queue.hpp:71
mcap::internal::ReadJobQueue::heap_
std::vector< ReadJob > heap_
Definition: read_job_queue.hpp:46
mcap::internal::DecompressChunkJob::messageStartTime
Timestamp messageStartTime
Definition: read_job_queue.hpp:29
mcap::internal::DecompressChunkJob
A job to decompress the chunk starting at chunkStartOffset. The message indices starting directly aft...
Definition: read_job_queue.hpp:28
mcap::internal::ReadJobQueue::CompareReverse
static bool CompareReverse(const ReadJob &a, const ReadJob &b)
Definition: read_job_queue.hpp:101
mcap::internal::ReadJobQueue::CompareForward
static bool CompareForward(const ReadJob &a, const ReadJob &b)
Definition: read_job_queue.hpp:92
mcap::internal::ReadJobQueue::pop
ReadJob pop()
Definition: read_job_queue.hpp:131
mcap::internal::DecompressChunkJob::messageIndexEndOffset
ByteOffset messageIndexEndOffset
Definition: read_job_queue.hpp:32
mcap::internal::ReadJobQueue::push
void push(ReadMessageJob &&readMessageJob)
Definition: read_job_queue.hpp:122
mcap::internal::ReadMessageJob::timestamp
Timestamp timestamp
Definition: read_job_queue.hpp:18
mcap::internal::ReadJobQueue::TimeComparisonKey
static Timestamp TimeComparisonKey(const ReadJob &job, bool reverse)
Returns the timestamp key that should be used to compare jobs.
Definition: read_job_queue.hpp:51


plotjuggler
Author(s): Davide Faconti
autogenerated on Tue Nov 26 2024 03:24:09