bag_reader.cpp
Go to the documentation of this file.
1 // Fast native bag reader
2 // Author: Max Schwarz <max.schwarz@ais.uni-bonn.de>
3 
4 #include "bag_reader.h"
5 
6 #include <rosfmt/rosfmt.h>
7 #include <fmt/ostream.h>
8 #include <roslz4/lz4s.h>
9 
10 #include <sys/mman.h>
11 
12 #include <bzlib.h>
13 
14 #include <fcntl.h>
15 
16 #include <set>
17 
18 #include <rosbag/bag.h>
19 #include <std_msgs/Header.h>
20 #include <std_msgs/UInt8.h>
21 #include "doctest.h"
22 #include "rosbag/stream.h"
23 
24 namespace
25 {
27 
28  struct Span
29  {
30  uint8_t* start;
31  std::size_t size;
32  };
33 
34  struct Record
35  {
36  uint8_t* headerBegin;
37  uint32_t headerSize;
38 
39  uint8_t* dataBegin;
40  uint32_t dataSize;
41 
42  uint8_t* end;
43 
44  std::map<std::string, Span> headers;
45 
46  uint8_t op;
47 
48  template<class T>
49  T integralHeader(const std::string& name)
50  {
51  auto it = headers.find(name);
52  if(it == headers.end())
53  throw Exception{fmt::format("Could not find header '{}' in record of type {}\n", name, op)};
54 
55  if(it->second.size != sizeof(T))
56  throw Exception{fmt::format("Header '{}' has wrong size {} (expected {})\n", name, it->second.size, sizeof(T))};
57 
58  T ret;
59  std::memcpy(&ret, it->second.start, sizeof(T));
60  return ret;
61  }
62 
63  std::string stringHeader(const std::string& name, const std::optional<std::string>& defaultValue = {})
64  {
65  auto it = headers.find(name);
66  if(it == headers.end())
67  {
68  if(defaultValue)
69  return *defaultValue;
70  else
71  throw Exception{fmt::format("Could not find header '{}' in record of type {}\n", name, op)};
72  }
73 
74  return {reinterpret_cast<char*>(it->second.start), it->second.size};
75  }
76  };
77 
78  struct Chunk
79  {
80  static_assert(sizeof(rosbag_fancy::BagReader::ConnectionInfo) == 8, "ConnectionInfo should have size 8");
81 
82  uint8_t* chunkStart;
83  std::size_t chunkCompressedSize = 0;
84 
85  ros::Time startTime;
86  ros::Time endTime;
87 
88  std::vector<rosbag_fancy::BagReader::ConnectionInfo> connectionInfos;
89  };
90 
91  std::map<std::string, Span> readHeader(uint8_t* base, std::size_t remainingSize)
92  {
93  std::map<std::string, Span> ret;
94 
95  while(remainingSize > 0)
96  {
97  if(remainingSize < 4)
98  throw Exception{"Record too small"};
99 
100  uint32_t entrySize;
101  std::memcpy(&entrySize, base, 4);
102 
103  base += 4;
104  remainingSize -= 4;
105 
106  // Find '=' character
107  auto* equal = reinterpret_cast<uint8_t*>(std::memchr(base, '=', entrySize));
108  if(!equal)
109  throw Exception{"Invalid header map entry"};
110 
111  ret[std::string(reinterpret_cast<char*>(base), equal - base)]
112  = Span{equal+1, static_cast<std::size_t>(entrySize - 1 - (equal - base))};
113 
114  base += entrySize;
115  remainingSize -= entrySize;
116  }
117 
118  return ret;
119  }
120 
121  Record readRecord(uint8_t* base, std::size_t remainingSize)
122  {
123  Record record;
124 
125  if(remainingSize < 4)
126  throw Exception{"Record too small"};
127 
128  std::memcpy(&record.headerSize, base, 4);
129  remainingSize -= 4;
130  base += 4;
131 
132  if(remainingSize < record.headerSize)
133  throw Exception{"Record too small"};
134 
135  record.headerBegin = base;
136 
137  base += record.headerSize;
138  remainingSize -= record.headerSize;
139 
140  if(remainingSize < 4)
141  throw Exception{"Record too small"};
142 
143  std::memcpy(&record.dataSize, base, 4);
144  remainingSize -= 4;
145  base += 4;
146 
147  if(remainingSize < record.dataSize)
148  throw Exception{"Record too small"};
149 
150  record.dataBegin = base;
151 
152  record.end = base + record.dataSize;
153 
154  // Parse header
155  record.headers = readHeader(record.headerBegin, record.headerSize);
156 
157  auto it = record.headers.find("op");
158  if(it == record.headers.end())
159  throw Exception{"Record without op header"};
160 
161  if(it->second.size != 1)
162  throw Exception{fmt::format("op header has invalid size {}", it->second.size)};
163 
164  record.op = it->second.start[0];
165 
166  return record;
167  }
168 
169 
170  class MappedFile
171  {
172  public:
173  explicit MappedFile(const std::string& file)
174  {
175  m_fd = open(file.c_str(), O_RDONLY);
176  if(m_fd < 0)
177  throw Exception{fmt::format("Could not open file: {}", strerror(errno))};
178 
179  // Measure size
180  m_size = lseek(m_fd, 0, SEEK_END);
181  if(m_size < 0)
182  {
183  close(m_fd);
184  throw Exception{fmt::format("Could not seek: {}", strerror(errno))};
185  }
186 
187  lseek(m_fd, 0, SEEK_SET);
188 
189  m_data = reinterpret_cast<uint8_t*>(mmap(nullptr, m_size, PROT_READ, MAP_PRIVATE, m_fd, 0));
190  if(m_data == MAP_FAILED)
191  {
192  close(m_fd);
193  throw Exception{fmt::format("Could not mmap() bagfile: {}", strerror(errno))};
194  }
195  }
196 
197  ~MappedFile()
198  {
199  munmap(m_data, m_size);
200  close(m_fd);
201  }
202 
203  off_t size() const
204  { return m_size; }
205 
206  void* data()
207  { return m_data; }
208 
209  private:
210  int m_fd;
211  void* m_data;
212  off_t m_size;
213  };
214 }
215 
216 template <> struct fmt::formatter<Span>: formatter<string_view>
217 {
218  template <typename FormatContext>
219  auto format(Span c, FormatContext& ctx)
220  {
221  if(std::all_of(c.start, c.start + c.size, [](uint8_t c){return std::isprint(c);}))
222  return formatter<string_view>::format(string_view(reinterpret_cast<char*>(c.start), c.size), ctx);
223  else
224  {
225  std::stringstream s;
226  for(std::size_t i = 0; i < c.size; ++i)
227  {
228  if(i != 0)
229  fmt::print(s, " ");
230 
231  fmt::print(s, "{:02X}", c.start[i]);
232  }
233 
234  return formatter<string_view>::format(string_view(s.str()), ctx);
235  }
236  }
237 };
238 
239 
240 namespace rosbag_fancy
241 {
242 
244 {
245 public:
246  explicit Private(const std::string& filename)
247  : file{filename}
248  {}
249 
250  MappedFile file;
251  off_t size;
252  uint8_t* data;
253 
254  std::vector<Chunk> chunks;
255  std::map<std::uint32_t, Connection> connections;
256  std::map<std::string, std::vector<std::uint32_t>> connectionsForTopic;
257 
260 };
261 
263 {
264 public:
265  enum class Compression
266  {
267  None,
268  BZ2,
269  LZ4
270  };
271 
272  enum class ReadResult
273  {
274  OK,
275  EndOfFile
276  };
277 
279  {
280  switch(m_compression)
281  {
282  case Compression::None:
283  break;
284  case Compression::BZ2:
285  BZ2_bzDecompressEnd(&m_bzStream);
286  break;
287  case Compression::LZ4:
288  break;
289  }
290  }
291 
292  void readData(std::size_t amount)
293  {
294  switch(m_compression)
295  {
298  return;
300  {
301  std::size_t offset = m_uncompressedBuffer.size();
302  m_uncompressedBuffer.resize(offset + amount);
303 
304  m_bzStream.next_out = reinterpret_cast<char*>(m_uncompressedBuffer.data() + offset);
305  m_bzStream.avail_out = amount;
306  m_bzStream.total_out_lo32 = 0;
307 
308  int ret = BZ2_bzDecompress(&m_bzStream);
309  if(m_bzStream.total_out_lo32 != amount)
310  throw Exception{"Compressed chunk data ends prematurely! (not enough output)"};
311 
312  if(ret != BZ_STREAM_END && ret != BZ_OK)
313  throw Exception{fmt::format("BZ2 decoding error ({})", ret)};
314 
315  return;
316  }
317  }
318 
319  throw std::logic_error{"non-implemented compression mode"};
320  }
321 
322  Record readRecord()
323  {
324  switch(m_compression)
325  {
326  case Compression::None:
327  case Compression::LZ4:
328  {
330  m_remaining -= (record.end - m_dataPtr);
331  m_dataPtr = record.end;
332 
333  return record;
334  }
335 
336  case Compression::BZ2:
337  {
338  Record record;
339 
340  m_uncompressedBuffer.clear();
341  readData(4);
342 
343  std::memcpy(&record.headerSize, m_uncompressedBuffer.data(), 4);
344  readData(record.headerSize + 4);
345 
346  std::memcpy(&record.dataSize, m_uncompressedBuffer.data() + m_uncompressedBuffer.size() - 4, 4);
347  readData(record.dataSize);
348 
349  record.headerBegin = m_uncompressedBuffer.data() + 4;
350  record.dataBegin = m_uncompressedBuffer.data() + 4 + record.headerSize + 4;
351 
352  record.headers = readHeader(record.headerBegin, record.headerSize);
353 
354  auto it = record.headers.find("op");
355  if(it == record.headers.end())
356  throw Exception{"Record without op header"};
357 
358  if(it->second.size != 1)
359  throw Exception{fmt::format("op header has invalid size {}", it->second.size)};
360 
361  record.op = it->second.start[0];
362 
364 
365  return record;
366  }
367  }
368 
369  throw std::logic_error{"non-implemented compression mode"};
370  }
371 
372  const BagReader* m_reader{};
374 
376  bz_stream m_bzStream{};
377 
378  uint8_t* m_dataPtr = {};
380 
381  std::vector<uint8_t> m_uncompressedBuffer;
382 };
383 
385  : m_d{std::make_unique<Private>()}
386 {
387  m_d->m_reader = reader;
388  m_d->m_chunk = chunk;
389 
390  auto& chunkData = reader->m_d->chunks[chunk];
391  auto rec = readRecord(chunkData.chunkStart, chunkData.chunkCompressedSize);
392 
393  std::string compression = rec.stringHeader("compression");
394  if(compression == "none")
395  m_d->m_compression = Private::Compression::None;
396  else if(compression == "bz2")
397  m_d->m_compression = Private::Compression::BZ2;
398  else if(compression == "lz4")
399  m_d->m_compression = Private::Compression::LZ4;
400  else
401  throw Exception{fmt::format("Unknown compression type '{}'", compression)};
402 
403  switch(m_d->m_compression)
404  {
406  m_d->m_dataPtr = rec.dataBegin;
407  m_d->m_remaining = rec.dataSize;
408  break;
409 
410  case Private::Compression::BZ2:
411  {
412  int ret = BZ2_bzDecompressInit(&m_d->m_bzStream, 0, 0);
413  if(ret != BZ_OK)
414  throw Exception{fmt::format("Could not initialize BZ2 decompression ({})", ret)};
415 
416  m_d->m_bzStream.next_in = reinterpret_cast<char*>(rec.dataBegin);
417  m_d->m_bzStream.avail_in = rec.dataSize;
418 
419  m_d->m_remaining = rec.integralHeader<std::uint32_t>("size");
420 
421  break;
422  }
423 
424  case Private::Compression::LZ4:
425  {
426  // roslz4's streaming capabilities are not flexible enough for the code path above.
427  // So for now, we just decode the entire chunk on the fly.
428 
429  m_d->m_remaining = rec.integralHeader<std::uint32_t>("size");
430  m_d->m_uncompressedBuffer.resize(m_d->m_remaining);
431 
432  unsigned int length = m_d->m_remaining;
433 
434  int ret = roslz4_buffToBuffDecompress(
435  reinterpret_cast<char*>(rec.dataBegin), rec.dataSize,
436  reinterpret_cast<char*>(m_d->m_uncompressedBuffer.data()), &length
437  );
438 
439  if(ret != ROSLZ4_OK)
440  throw Exception{fmt::format("Could not decode LZ4 chunk: {}", ret)};
441 
442  if(m_d->m_remaining != length)
443  throw Exception{fmt::format("LZ4 size mismatch: got {}, expected {} bytes", length, m_d->m_remaining)};
444 
445  m_d->m_dataPtr = m_d->m_uncompressedBuffer.data();
446 
447  break;
448  }
449  }
450 
451  (*this)++;
452 }
453 
455 {
456  if(!m_d)
457  return *this;
458 
459  while(true)
460  {
461  if(m_d->m_remaining == 0)
462  {
463  m_d.reset();
464  m_idx = 0;
465  return *this;
466  }
467 
468  auto rec = m_d->readRecord();
469 
470  if(rec.op == 0x02)
471  {
472  m_msg.m_data = rec.dataBegin;
473  m_msg.m_size = rec.dataSize;
474 
475  uint32_t connID = rec.integralHeader<uint32_t>("conn");
476  if(connID > m_d->m_reader->connections().size())
477  throw Exception{fmt::format("Invalid connection ID {}", connID)};
478 
479  m_msg.connection = &m_d->m_reader->connections().at(connID);
480 
481  uint64_t time = rec.integralHeader<uint64_t>("time");
482  m_msg.stamp = ros::Time(time & 0xFFFFFFFF, time >> 32);
483 
484  break;
485  }
486  }
487 
488  return *this;
489 }
490 
491 BagReader::Iterator::Iterator(const BagReader* reader, int chunk)
492  : m_reader{reader}
493  , m_chunk{chunk}
494  , m_it{reader, chunk}
495 {
496 }
497 
499 {
500  m_it++;
501  if(m_it == ChunkIterator{})
502  {
503  m_chunk++;
504  if(m_chunk < static_cast<int>(m_reader->m_d->chunks.size()))
505  {
506  m_it = ChunkIterator{m_reader, m_chunk};
507  }
508  else
509  {
510  m_chunk = -1;
511  m_it = {};
512  }
513  }
514 
515  return *this;
516 }
517 
518 void BagReader::Iterator::advanceWithPredicates(const std::function<bool(const ConnectionInfo&)>& connPredicate, const std::function<bool(const Message&)>& messagePredicate)
519 {
520  m_it++;
521 
522  if(m_it == ChunkIterator{})
523  {
524  m_chunk++;
525 
526  if(m_chunk == static_cast<int>(m_reader->m_d->chunks.size()))
527  {
528  m_chunk = -1;
529  m_it = {};
530  return;
531  }
532  }
533 
534  findNextWithPredicates(connPredicate, messagePredicate);
535 }
536 
537 void BagReader::Iterator::findNextWithPredicates(const std::function<bool(const ConnectionInfo&)>& connPredicate, const std::function<bool(const Message&)>& messagePredicate)
538 {
539  auto currentChunkIsInteresting = [&](){
540  auto& chunkData = m_reader->m_d->chunks[m_chunk];
541  return std::any_of(chunkData.connectionInfos.begin(), chunkData.connectionInfos.end(), connPredicate);
542  };
543 
544  while(true)
545  {
546  if(m_it == ChunkIterator{})
547  {
548  if(m_chunk == -1 || m_chunk == static_cast<int>(m_reader->m_d->chunks.size()))
549  {
550  m_chunk = -1;
551  return;
552  }
553 
554  // Find next matching chunk
555  while(!currentChunkIsInteresting())
556  {
557  m_chunk++;
558  if(m_chunk >= static_cast<int>(m_reader->m_d->chunks.size()))
559  {
560  m_chunk = -1;
561  return;
562  }
563  }
564 
565  m_it = ChunkIterator{m_reader, m_chunk};
566  }
567 
568  // We are now in a chunk which contains interesting connections.
569 
570  // Iterate through the chunk
571  while(m_it != ChunkIterator{})
572  {
573  if(messagePredicate(*m_it))
574  return; // Found something! *this is pointing at the message.
575 
576  ++m_it;
577  }
578 
579  // Next chunk
580  m_chunk++;
581  }
582 }
583 
584 std::vector<BagReader::ConnectionInfo>& BagReader::Iterator::currentChunkConnections() const
585 {
586  return m_reader->m_d->chunks[m_chunk].connectionInfos;
587 }
588 
590 {
591  switch(m_it.m_d->m_compression)
592  {
599  }
600 
601  throw std::logic_error{"unknown compression"};
602 }
603 
604 
605 BagReader::BagReader(const std::string& filename)
606  : m_d{std::make_unique<Private>(filename)}
607 {
608  m_d->data = reinterpret_cast<uint8_t*>(m_d->file.data());
609  m_d->size = m_d->file.size();
610 
611  // Read version line
612  std::string versionLine;
613  uint8_t* bagHeaderBegin{};
614  {
615  constexpr int MAX_VERSION_LENGTH = 20;
616  auto* newline = reinterpret_cast<uint8_t*>(std::memchr(
617  m_d->data, '\n', std::min<off_t>(m_d->size, MAX_VERSION_LENGTH))
618  );
619  if(!newline)
620  throw Exception{fmt::format("Could not read version line\n")};
621 
622  versionLine = std::string(reinterpret_cast<char*>(m_d->data), newline - m_d->data);
623  bagHeaderBegin = newline + 1;
624  }
625 
626  if(versionLine != "#ROSBAG V2.0")
627  throw Exception{"I can't read this file (I can only read ROSBAG V2.0"};
628 
629  Record bagHeader = readRecord(bagHeaderBegin, m_d->size - (bagHeaderBegin - m_d->data));
630 
631  if(bagHeader.op != 0x03)
632  throw Exception{"First record is not a bag header\n"};
633 
634  std::uint64_t indexPos = bagHeader.integralHeader<std::uint64_t>("index_pos");
635  std::uint32_t connectionCount = bagHeader.integralHeader<std::uint32_t>("conn_count");
636  std::uint32_t chunkCount = bagHeader.integralHeader<std::uint32_t>("chunk_count");
637 
638  if(indexPos >= static_cast<std::uint64_t>(m_d->size))
639  throw Exception{fmt::format("Index position is too large: {} vs data size {}", indexPos, m_d->size)};
640 
641  // Read all index records
642  uint8_t* rptr = m_d->data + indexPos;
643  std::size_t remaining = m_d->size - indexPos;
644 
645  // Connection records
646  for(std::size_t i = 0; i < connectionCount; ++i)
647  {
648  Record rec = readRecord(rptr, remaining);
649  if(rec.op != 7)
650  throw Exception{"Expected connection record"};
651 
652  Record recData;
653  recData.headers = readHeader(rec.dataBegin, rec.dataSize);
654 
655  Connection con;
656  con.id = rec.integralHeader<uint32_t>("conn");
657  con.topicInBag = rec.stringHeader("topic");
658  con.topicAsPublished = recData.stringHeader("topic", std::string{});
659  con.type = recData.stringHeader("type");
660  con.md5sum = recData.stringHeader("md5sum");
661  con.msgDef = recData.stringHeader("message_definition");
662 
663  {
664  auto it = recData.headers.find("callerid");
665  if(it != recData.headers.end())
666  con.callerID = recData.stringHeader("callerid");
667  }
668  {
669  auto it = recData.headers.find("latching");
670  if(it != recData.headers.end())
671  {
672  if(it->second.size != 1)
673  throw Exception{"Invalid latching header"};
674 
675  if(it->second.start[0] == '1')
676  con.latching = true;
677  }
678  }
679 
680  m_d->connections[con.id] = con;
681  m_d->connectionsForTopic[con.topicInBag].push_back(con.id);
682 
683  rptr = rec.end;
684  remaining = m_d->size - (rptr - m_d->data);
685  }
686 
687  // Chunk infos
688  m_d->chunks.reserve(chunkCount);
689  Chunk* lastChunk = {};
690  for(std::size_t i = 0; i < chunkCount; ++i)
691  {
692  Record rec = readRecord(rptr, remaining);
693  if(rec.op != 6)
694  throw Exception{"Expected chunk info record"};
695 
696  auto& chunk = m_d->chunks.emplace_back();
697 
698  auto version = rec.integralHeader<std::uint32_t>("ver");
699  if(version != 1)
700  throw Exception{fmt::format("Unsupported chunk info version {}", version)};
701 
702  auto chunkPos = rec.integralHeader<std::uint64_t>("chunk_pos");
703  if(chunkPos >= static_cast<std::uint64_t>(m_d->size))
704  throw Exception{"chunk_pos points outside of valid range"};
705 
706  chunk.chunkStart = m_d->data + chunkPos;
707  if(lastChunk)
708  lastChunk->chunkCompressedSize = chunk.chunkStart - lastChunk->chunkStart;
709 
710  std::uint64_t startTime = rec.integralHeader<std::uint64_t>("start_time");
711  std::uint64_t endTime = rec.integralHeader<std::uint64_t>("end_time");
712 
713  chunk.startTime = ros::Time(startTime & 0xFFFFFFFF, startTime >> 32);
714  chunk.endTime = ros::Time(endTime & 0xFFFFFFFF, endTime >> 32);
715 
716  auto numConnections = rec.integralHeader<std::uint32_t>("count");
717 
718  if(rec.dataSize < numConnections * sizeof(ConnectionInfo))
719  throw Exception{"Chunk info is too small"};
720 
721  chunk.connectionInfos.resize(numConnections);
722  std::memcpy(chunk.connectionInfos.data(), rec.dataBegin, numConnections * sizeof(ConnectionInfo));
723 
724  for(auto& connInfo : chunk.connectionInfos)
725  {
726  auto it = m_d->connections.find(connInfo.id);
727  if(it == m_d->connections.end())
728  throw Exception{fmt::format("Could not find connection with id {}", connInfo.id)};
729 
730  it->second.totalCount += connInfo.msgCount;
731  }
732 
733  rptr = rec.end;
734  remaining = m_d->size - (rptr - m_d->data);
735  lastChunk = &chunk;
736  }
737  if(lastChunk)
738  lastChunk->chunkCompressedSize = (m_d->data + m_d->size) - lastChunk->chunkStart;
739 
740  if(!m_d->chunks.empty())
741  m_d->startTime = m_d->chunks.front().startTime;
742 
743  for(auto& c : m_d->chunks)
744  {
745  m_d->startTime = std::min(c.startTime, m_d->startTime);
746  m_d->endTime = std::max(c.endTime, m_d->endTime);
747  }
748 }
749 
751  : m_d{std::move(other.m_d)}
752 {
753 }
754 
756 {
757 }
758 
760 {
761  return m_d->connections;
762 }
763 
765 {
766  return m_d->startTime;
767 }
768 
770 {
771  return m_d->endTime;
772 }
773 
775 {
776  return m_d->size;
777 }
778 
780 {
781  if(m_d->chunks.empty())
782  return {};
783 
784  return Iterator{this, 0};
785 }
786 
788 {
789  return Iterator();
790 }
791 
793 {
794  // Find chunk
795  auto chunkIt = std::lower_bound(m_d->chunks.begin(), m_d->chunks.end(), time, [&](Chunk& chunk, const ros::Time& time){
796  return chunk.endTime < time;
797  });
798 
799  if(chunkIt == m_d->chunks.end())
800  return {};
801 
802  int chunkIdx = chunkIt - m_d->chunks.begin();
803 
804  // Find message in chunk
805  auto it = Iterator{this, chunkIdx};
806  for(; it != end(); ++it)
807  {
808  if(it->stamp > time)
809  return it;
810  }
811 
812  return it;
813 }
814 
816 {
817  if(chunk < 0 || chunk >= static_cast<int>(m_d->chunks.size()))
818  return {};
819 
820  return Iterator{this, chunk};
821 }
822 
824 {
825  return m_d->chunks.size();
826 }
827 
828 
829 // TESTS
830 
831 TEST_CASE("BagView: One file")
832 {
833  // Generate a bag file
834  char bagfileName[] = "/tmp/rosbag_fancy_test_XXXXXX";
835  {
836  int fd = mkstemp(bagfileName);
837  REQUIRE(fd >= 0);
838  close(fd);
839 
840  rosbag::Bag bag{bagfileName, rosbag::BagMode::Write};
841 
842  SUBCASE("plain")
843  { bag.setCompression(rosbag::CompressionType::Uncompressed); }
844  SUBCASE("BZ2")
845  { bag.setCompression(rosbag::CompressionType::BZ2); }
846  SUBCASE("LZ4")
847  { bag.setCompression(rosbag::CompressionType::LZ4); }
848 
849  {
850  std_msgs::Header msg;
851  msg.frame_id = "a";
852  bag.write("/topicA", ros::Time(1000, 0), msg);
853  }
854  {
855  std_msgs::Header msg;
856  msg.frame_id = "b";
857  bag.write("/topicB", ros::Time(1001, 0), msg);
858  }
859  {
860  std_msgs::UInt8 msg;
861  msg.data = 123;
862  bag.write("/topicC", ros::Time(1002, 0), msg);
863  }
864 
865  bag.close();
866  }
867 
868  // Open bagfile
869  BagReader reader{bagfileName};
870 
871  auto it = reader.begin();
872 
873  REQUIRE(it != reader.end());
874  CHECK(it->connection->topicInBag == "/topicA");
875 
876  ++it; REQUIRE(it != reader.end());
877  CHECK(it->connection->topicInBag == "/topicB");
878 
879  ++it; REQUIRE(it != reader.end());
880  CHECK(it->connection->topicInBag == "/topicC");
881 
882  ++it; CHECK(it == reader.end());
883 }
884 
885 
886 }
rosfmt.h
rosbag_fancy::BagReader::Private::connectionsForTopic
std::map< std::string, std::vector< std::uint32_t > > connectionsForTopic
Definition: bag_reader.cpp:256
rosbag_fancy::BagReader::ChunkIterator::Private::readData
void readData(std::size_t amount)
Definition: bag_reader.cpp:292
rosbag_fancy::BagReader::ChunkIterator::Private::m_reader
const BagReader * m_reader
Definition: bag_reader.cpp:372
rosbag_fancy::BagReader::Private::file
MappedFile file
Definition: bag_reader.cpp:250
rosbag::Bag
SUBCASE
#define SUBCASE(name)
Definition: doctest.h:2903
rosbag_fancy::BagReader::ChunkIterator::Private::m_dataPtr
uint8_t * m_dataPtr
Definition: bag_reader.cpp:378
rosbag_fancy::BagReader::ChunkIterator::Private::ReadResult::OK
@ OK
rosbag_fancy::BagReader::ChunkIterator::Private::Compression::None
@ None
rosbag_fancy::BagReader::Iterator::Iterator
Iterator()
Definition: bag_reader.h:133
s
XmlRpcServer s
rosbag_fancy::BagReader::Private::size
off_t size
Definition: bag_reader.cpp:251
rosbag_fancy::BagReader::chunkBegin
Iterator chunkBegin(int chunk) const
Definition: bag_reader.cpp:815
rosbag_fancy::BagReader::ChunkIterator::Private::m_bzStream
bz_stream m_bzStream
Definition: bag_reader.cpp:376
rosbag_fancy::BagReader::Iterator::currentChunkConnections
std::vector< ConnectionInfo > & currentChunkConnections() const
Definition: bag_reader.cpp:584
rosbag_fancy
Definition: bag_reader.cpp:240
rosbag_fancy::BagReader::endTime
ros::Time endTime() const
Definition: bag_reader.cpp:769
doctest::TestCaseFailureReason::Exception
@ Exception
Definition: doctest.h:1963
rosbag_fancy::BagReader::Private::chunks
std::vector< Chunk > chunks
Definition: bag_reader.cpp:254
rosbag_fancy::BagReader::ChunkIterator::Private::Compression
Compression
Definition: bag_reader.cpp:265
roslz4_buffToBuffDecompress
int roslz4_buffToBuffDecompress(char *input, unsigned int input_size, char *output, unsigned int *output_size)
rosbag_fancy::BagReader::Private::Private
Private(const std::string &filename)
Definition: bag_reader.cpp:246
rosbag::compression::LZ4
LZ4
rosbag_fancy::BagReader::~BagReader
~BagReader()
Definition: bag_reader.cpp:755
record
int record(const std::vector< std::string > &options)
Definition: cmd_record.cpp:27
rosbag_fancy::BagReader::Message
Definition: bag_reader.h:58
bag.h
rosbag::compression::BZ2
BZ2
std::size_t
decltype(sizeof(void *)) typede size_t)
Definition: doctest.h:501
rosbag_fancy::BagReader::ConnectionInfo
Definition: bag_reader.h:49
rosbag_fancy::BagReader::end
Iterator end() const
Definition: bag_reader.cpp:787
rosbag_fancy::BagReader::size
std::size_t size() const
Definition: bag_reader.cpp:774
rosbag_fancy::BagReader::ChunkIterator::Private::m_remaining
std::size_t m_remaining
Definition: bag_reader.cpp:379
REQUIRE
#define REQUIRE(...)
Definition: doctest.h:2934
rosbag_fancy::BagReader::ChunkIterator::operator++
ChunkIterator & operator++()
Definition: bag_reader.cpp:454
rosbag_fancy::BagReader::ChunkIterator::Private::readRecord
Record readRecord()
Definition: bag_reader.cpp:322
rosbag_fancy::BagReader::Iterator::currentChunkCompression
rosbag::CompressionType currentChunkCompression() const
Definition: bag_reader.cpp:589
rosbag_fancy::BagReader::Private
Definition: bag_reader.cpp:243
rosbag_fancy::BagReader::Iterator::advanceWithPredicates
void advanceWithPredicates(const std::function< bool(const ConnectionInfo &)> &connPredicate, const std::function< bool(const Message &)> &messagePredicate)
Definition: bag_reader.cpp:518
rosbag_fancy::BagReader::ChunkIterator::Private::m_compression
Compression m_compression
Definition: bag_reader.cpp:375
rosbag_fancy::BagReader::ConnectionMap
std::map< std::uint32_t, Connection > ConnectionMap
Definition: bag_reader.h:47
rosbag_fancy::BagReader::connections
const ConnectionMap & connections() const
Definition: bag_reader.cpp:759
rosbag_fancy::BagReader::ChunkIterator::Private::Compression::LZ4
@ LZ4
rosbag_fancy::BagReader::ChunkIterator::Private::Compression::BZ2
@ BZ2
CHECK
#define CHECK(...)
Definition: doctest.h:2927
rosbag_fancy::BagReader::ChunkIterator
Definition: bag_reader.h:93
rosbag_fancy::BagReader::numChunks
std::size_t numChunks() const
Definition: bag_reader.cpp:823
rosbag_fancy::BagReader::Iterator::findNextWithPredicates
void findNextWithPredicates(const std::function< bool(const ConnectionInfo &)> &connPredicate, const std::function< bool(const Message &)> &messagePredicate)
Definition: bag_reader.cpp:537
rosbag_fancy::BagReader::Private::data
uint8_t * data
Definition: bag_reader.cpp:252
rosbag_fancy::BagReader::ChunkIterator::Private::ReadResult::EndOfFile
@ EndOfFile
rosbag_fancy::BagReader::m_d
std::unique_ptr< Private > m_d
Definition: bag_reader.h:190
doctest::Color::None
@ None
Definition: doctest.h:670
rosbag_fancy::BagReader
Definition: bag_reader.h:21
format
std::string format
Definition: ui.cpp:76
rosbag::compression::CompressionType
CompressionType
rosbag_fancy::BagReader::Iterator
Definition: bag_reader.h:125
start
ROSCPP_DECL void start()
rosbag_fancy::TEST_CASE
TEST_CASE("BagView: One file")
Definition: bag_reader.cpp:831
rosbag_fancy::BagReader::ChunkIterator::Private::ReadResult
ReadResult
Definition: bag_reader.cpp:272
ros::Time
fmt::formatter< Span >::format
auto format(Span c, FormatContext &ctx)
Definition: bag_reader.cpp:219
rosbag_fancy::BagReader::begin
Iterator begin() const
Definition: bag_reader.cpp:779
rosbag_fancy::BagReader::Iterator::operator++
Iterator & operator++()
Definition: bag_reader.cpp:498
lz4s.h
rosbag_fancy::BagReader::ChunkIterator::Private::~Private
~Private()
Definition: bag_reader.cpp:278
bag_reader.h
rosbag_fancy::BagReader::Exception
Definition: bag_reader.h:24
rosbag::compression::Uncompressed
Uncompressed
doctest.h
rosbag_fancy::BagReader::Private::startTime
ros::Time startTime
Definition: bag_reader.cpp:258
rosbag_fancy::BagReader::Private::connections
std::map< std::uint32_t, Connection > connections
Definition: bag_reader.cpp:255
length
TF2SIMD_FORCE_INLINE tf2Scalar length(const Quaternion &q)
rosbag_fancy::BagReader::BagReader
BagReader(const std::string &filename)
Definition: bag_reader.cpp:605
rosbag_fancy::BagReader::ChunkIterator::Private::m_uncompressedBuffer
std::vector< uint8_t > m_uncompressedBuffer
Definition: bag_reader.cpp:381
rosbag_fancy::BagReader::findTime
Iterator findTime(const ros::Time &time) const
Definition: bag_reader.cpp:792
rosbag_fancy::BagReader::Private::endTime
ros::Time endTime
Definition: bag_reader.cpp:259
rosbag_fancy::BagReader::ChunkIterator::Private::m_chunk
std::size_t m_chunk
Definition: bag_reader.cpp:373
rosbag_fancy::BagReader::startTime
ros::Time startTime() const
Definition: bag_reader.cpp:764
rosbag_fancy::BagReader::ChunkIterator::Private
Definition: bag_reader.cpp:262
rosbag_fancy::BagReader::ChunkIterator::ChunkIterator
ChunkIterator()
Definition: bag_reader.h:116
stream.h


rosbag_fancy
Author(s):
autogenerated on Tue Feb 20 2024 03:20:59