7 #include <fmt/ostream.h> 19 #include <std_msgs/Header.h> 20 #include <std_msgs/UInt8.h> 44 std::map<std::string, Span> headers;
49 T integralHeader(
const std::string& name)
51 auto it = headers.find(name);
52 if(it == headers.end())
55 if(it->second.size !=
sizeof(T))
56 throw Exception{
fmt::format(
"Header '{}' has wrong size {} (expected {})\n", name, it->second.size,
sizeof(T))};
59 std::memcpy(&ret, it->second.start,
sizeof(T));
63 std::string stringHeader(
const std::string& name,
const std::optional<std::string>& defaultValue = {})
65 auto it = headers.find(name);
66 if(it == headers.end())
74 return {
reinterpret_cast<char*
>(it->second.start), it->second.size};
88 std::vector<rosbag_fancy::BagReader::ConnectionInfo> connectionInfos;
91 std::map<std::string, Span> readHeader(uint8_t* base,
std::size_t remainingSize)
93 std::map<std::string, Span> ret;
95 while(remainingSize > 0)
101 std::memcpy(&entrySize, base, 4);
107 auto* equal =
reinterpret_cast<uint8_t*
>(std::memchr(base,
'=', entrySize));
109 throw Exception{
"Invalid header map entry"};
111 ret[std::string(reinterpret_cast<char*>(base), equal - base)]
112 = Span{equal+1,
static_cast<std::size_t>(entrySize - 1 - (equal - base))};
115 remainingSize -= entrySize;
121 Record readRecord(uint8_t* base,
std::size_t remainingSize)
125 if(remainingSize < 4)
128 std::memcpy(&record.headerSize, base, 4);
132 if(remainingSize < record.headerSize)
135 record.headerBegin = base;
137 base += record.headerSize;
138 remainingSize -= record.headerSize;
140 if(remainingSize < 4)
143 std::memcpy(&record.dataSize, base, 4);
147 if(remainingSize < record.dataSize)
150 record.dataBegin = base;
152 record.end = base + record.dataSize;
155 record.headers = readHeader(record.headerBegin, record.headerSize);
157 auto it = record.headers.find(
"op");
158 if(it == record.headers.end())
159 throw Exception{
"Record without op header"};
161 if(it->second.size != 1)
164 record.op = it->second.start[0];
173 explicit MappedFile(
const std::string& file)
175 m_fd = open(file.c_str(), O_RDONLY);
180 m_size = lseek(m_fd, 0, SEEK_END);
187 lseek(m_fd, 0, SEEK_SET);
189 m_data =
reinterpret_cast<uint8_t*
>(mmap(
nullptr, m_size, PROT_READ, MAP_PRIVATE, m_fd, 0));
190 if(m_data == MAP_FAILED)
199 munmap(m_data, m_size);
216 template <>
struct fmt::formatter<Span>: formatter<string_view>
218 template <
typename FormatContext>
221 if(std::all_of(c.start, c.start + c.size, [](uint8_t c){return std::isprint(c);}))
231 fmt::print(s,
"{:02X}", c.start[i]);
280 switch(m_compression)
284 case Compression::BZ2:
285 BZ2_bzDecompressEnd(&m_bzStream);
287 case Compression::LZ4:
294 switch(m_compression)
297 case Private::Compression::LZ4:
299 case Private::Compression::BZ2:
302 m_uncompressedBuffer.resize(offset + amount);
304 m_bzStream.next_out =
reinterpret_cast<char*
>(m_uncompressedBuffer.data() + offset);
305 m_bzStream.avail_out = amount;
306 m_bzStream.total_out_lo32 = 0;
308 int ret = BZ2_bzDecompress(&m_bzStream);
309 if(m_bzStream.total_out_lo32 != amount)
310 throw Exception{
"Compressed chunk data ends prematurely! (not enough output)"};
312 if(ret != BZ_STREAM_END && ret != BZ_OK)
319 throw std::logic_error{
"non-implemented compression mode"};
324 switch(m_compression)
327 case Compression::LZ4:
329 auto record = ::readRecord(m_dataPtr, m_remaining);
330 m_remaining -= (record.end - m_dataPtr);
331 m_dataPtr = record.end;
336 case Compression::BZ2:
340 m_uncompressedBuffer.clear();
343 std::memcpy(&record.headerSize, m_uncompressedBuffer.data(), 4);
344 readData(record.headerSize + 4);
346 std::memcpy(&record.dataSize, m_uncompressedBuffer.data() + m_uncompressedBuffer.size() - 4, 4);
347 readData(record.dataSize);
349 record.headerBegin = m_uncompressedBuffer.data() + 4;
350 record.dataBegin = m_uncompressedBuffer.data() + 4 + record.headerSize + 4;
352 record.headers = readHeader(record.headerBegin, record.headerSize);
354 auto it = record.headers.find(
"op");
355 if(it == record.headers.end())
356 throw Exception{
"Record without op header"};
358 if(it->second.size != 1)
361 record.op = it->second.start[0];
363 m_remaining -= m_uncompressedBuffer.size();
369 throw std::logic_error{
"non-implemented compression mode"};
376 bz_stream m_bzStream{};
378 uint8_t* m_dataPtr = {};
384 BagReader::ChunkIterator::ChunkIterator(
const BagReader* reader,
int chunk)
385 : m_d{std::make_unique<Private>()}
387 m_d->m_reader = reader;
388 m_d->m_chunk = chunk;
390 auto& chunkData = reader->m_d->chunks[chunk];
391 auto rec = readRecord(chunkData.chunkStart, chunkData.chunkCompressedSize);
393 std::string compression = rec.stringHeader(
"compression");
394 if(compression ==
"none")
396 else if(compression ==
"bz2")
398 else if(compression ==
"lz4")
403 switch(
m_d->m_compression)
406 m_d->m_dataPtr = rec.dataBegin;
407 m_d->m_remaining = rec.dataSize;
412 int ret = BZ2_bzDecompressInit(&
m_d->m_bzStream, 0, 0);
416 m_d->m_bzStream.next_in =
reinterpret_cast<char*
>(rec.dataBegin);
417 m_d->m_bzStream.avail_in = rec.dataSize;
419 m_d->m_remaining = rec.integralHeader<std::uint32_t>(
"size");
429 m_d->m_remaining = rec.integralHeader<std::uint32_t>(
"size");
430 m_d->m_uncompressedBuffer.resize(
m_d->m_remaining);
435 reinterpret_cast<char*>(rec.dataBegin), rec.dataSize,
436 reinterpret_cast<char*>(
m_d->m_uncompressedBuffer.data()), &length
442 if(
m_d->m_remaining != length)
445 m_d->m_dataPtr =
m_d->m_uncompressedBuffer.data();
461 if(
m_d->m_remaining == 0)
468 auto rec =
m_d->readRecord();
475 uint32_t connID = rec.integralHeader<uint32_t>(
"conn");
476 if(connID >
m_d->m_reader->connections().size())
481 uint64_t time = rec.integralHeader<uint64_t>(
"time");
539 auto currentChunkIsInteresting = [&](){
541 return std::any_of(chunkData.connectionInfos.begin(), chunkData.connectionInfos.end(), connPredicate);
555 while(!currentChunkIsInteresting())
573 if(messagePredicate(*
m_it))
591 switch(
m_it.
m_d->m_compression)
601 throw std::logic_error{
"unknown compression"};
606 :
m_d{std::make_unique<Private>(filename)}
608 m_d->data =
reinterpret_cast<uint8_t*
>(
m_d->file.data());
609 m_d->size =
m_d->file.size();
612 std::string versionLine;
613 uint8_t* bagHeaderBegin{};
615 constexpr
int MAX_VERSION_LENGTH = 20;
616 auto* newline =
reinterpret_cast<uint8_t*
>(std::memchr(
617 m_d->data,
'\n', std::min<off_t>(
m_d->size, MAX_VERSION_LENGTH))
622 versionLine = std::string(reinterpret_cast<char*>(
m_d->data), newline -
m_d->data);
623 bagHeaderBegin = newline + 1;
626 if(versionLine !=
"#ROSBAG V2.0")
627 throw Exception{
"I can't read this file (I can only read ROSBAG V2.0"};
629 Record bagHeader = readRecord(bagHeaderBegin,
m_d->size - (bagHeaderBegin -
m_d->data));
631 if(bagHeader.op != 0x03)
632 throw Exception{
"First record is not a bag header\n"};
634 std::uint64_t indexPos = bagHeader.integralHeader<std::uint64_t>(
"index_pos");
635 std::uint32_t connectionCount = bagHeader.integralHeader<std::uint32_t>(
"conn_count");
636 std::uint32_t chunkCount = bagHeader.integralHeader<std::uint32_t>(
"chunk_count");
638 if(indexPos >= static_cast<std::uint64_t>(
m_d->size))
642 uint8_t* rptr =
m_d->data + indexPos;
648 Record rec = readRecord(rptr, remaining);
650 throw Exception{
"Expected connection record"};
653 recData.headers = readHeader(rec.dataBegin, rec.dataSize);
656 con.
id = rec.integralHeader<uint32_t>(
"conn");
659 con.
type = recData.stringHeader(
"type");
660 con.
md5sum = recData.stringHeader(
"md5sum");
661 con.
msgDef = recData.stringHeader(
"message_definition");
664 auto it = recData.headers.find(
"callerid");
665 if(it != recData.headers.end())
666 con.
callerID = recData.stringHeader(
"callerid");
669 auto it = recData.headers.find(
"latching");
670 if(it != recData.headers.end())
672 if(it->second.size != 1)
673 throw Exception{
"Invalid latching header"};
675 if(it->second.start[0] ==
'1')
680 m_d->connections[con.
id] = con;
684 remaining =
m_d->size - (rptr -
m_d->data);
688 m_d->chunks.reserve(chunkCount);
689 Chunk* lastChunk = {};
692 Record rec = readRecord(rptr, remaining);
694 throw Exception{
"Expected chunk info record"};
696 auto& chunk =
m_d->chunks.emplace_back();
698 auto version = rec.integralHeader<std::uint32_t>(
"ver");
702 auto chunkPos = rec.integralHeader<std::uint64_t>(
"chunk_pos");
703 if(chunkPos >= static_cast<std::uint64_t>(
m_d->size))
704 throw Exception{
"chunk_pos points outside of valid range"};
706 chunk.chunkStart =
m_d->data + chunkPos;
708 lastChunk->chunkCompressedSize = chunk.chunkStart - lastChunk->chunkStart;
710 std::uint64_t
startTime = rec.integralHeader<std::uint64_t>(
"start_time");
711 std::uint64_t
endTime = rec.integralHeader<std::uint64_t>(
"end_time");
713 chunk.startTime =
ros::Time(startTime & 0xFFFFFFFF, startTime >> 32);
714 chunk.endTime =
ros::Time(endTime & 0xFFFFFFFF, endTime >> 32);
716 auto numConnections = rec.integralHeader<std::uint32_t>(
"count");
719 throw Exception{
"Chunk info is too small"};
721 chunk.connectionInfos.resize(numConnections);
722 std::memcpy(chunk.connectionInfos.data(), rec.dataBegin, numConnections *
sizeof(
ConnectionInfo));
724 for(
auto& connInfo : chunk.connectionInfos)
726 auto it =
m_d->connections.find(connInfo.id);
727 if(it ==
m_d->connections.end())
730 it->second.totalCount += connInfo.msgCount;
734 remaining =
m_d->size - (rptr -
m_d->data);
738 lastChunk->chunkCompressedSize = (
m_d->data +
m_d->size) - lastChunk->chunkStart;
740 if(!
m_d->chunks.empty())
741 m_d->startTime =
m_d->chunks.front().startTime;
743 for(
auto& c :
m_d->chunks)
745 m_d->startTime = std::min(c.startTime,
m_d->startTime);
746 m_d->endTime = std::max(c.endTime,
m_d->endTime);
751 :
m_d{std::move(other.m_d)}
761 return m_d->connections;
766 return m_d->startTime;
781 if(
m_d->chunks.empty())
795 auto chunkIt = std::lower_bound(
m_d->chunks.begin(),
m_d->chunks.end(), time, [&](Chunk& chunk,
const ros::Time& time){
796 return chunk.endTime < time;
799 if(chunkIt ==
m_d->chunks.end())
802 int chunkIdx = chunkIt -
m_d->chunks.begin();
806 for(; it !=
end(); ++it)
817 if(chunk < 0 || chunk >= static_cast<int>(
m_d->chunks.size()))
825 return m_d->chunks.size();
834 char bagfileName[] =
"/tmp/rosbag_fancy_test_XXXXXX";
836 int fd = mkstemp(bagfileName);
840 rosbag::Bag bag{bagfileName, rosbag::BagMode::Write};
843 { bag.setCompression(rosbag::CompressionType::Uncompressed); }
845 { bag.setCompression(rosbag::CompressionType::BZ2); }
847 { bag.setCompression(rosbag::CompressionType::LZ4); }
850 std_msgs::Header msg;
852 bag.write(
"/topicA",
ros::Time(1000, 0), msg);
855 std_msgs::Header msg;
857 bag.write(
"/topicB",
ros::Time(1001, 0), msg);
862 bag.write(
"/topicC",
ros::Time(1002, 0), msg);
871 auto it = reader.
begin();
874 CHECK(it->connection->topicInBag ==
"/topicA");
876 ++it;
REQUIRE(it != reader.end());
877 CHECK(it->connection->topicInBag ==
"/topicB");
879 ++it;
REQUIRE(it != reader.end());
880 CHECK(it->connection->topicInBag ==
"/topicC");
882 ++it;
CHECK(it == reader.end());
decltype(sizeof(void *)) typede size_t)
const BagReader * m_reader
std::map< std::uint32_t, Connection > connections
const ConnectionMap & connections() const
std::size_t numChunks() const
Iterator findTime(const ros::Time &time) const
ros::Time endTime() const
Private(const std::string &filename)
std::unique_ptr< Private > m_d
std::vector< ConnectionInfo > & currentChunkConnections() const
BagReader(const std::string &filename)
const Connection * connection
TF2SIMD_FORCE_INLINE tf2Scalar length(const Quaternion &q)
int record(const std::vector< std::string > &options)
rosbag::CompressionType currentChunkCompression() const
void advanceWithPredicates(const std::function< bool(const ConnectionInfo &)> &connPredicate, const std::function< bool(const Message &)> &messagePredicate)
Compression m_compression
Iterator chunkBegin(int chunk) const
std::vector< Chunk > chunks
std::map< std::uint32_t, Connection > ConnectionMap
std::shared_ptr< Private > m_d
ros::Time startTime() const
void findNextWithPredicates(const std::function< bool(const ConnectionInfo &)> &connPredicate, const std::function< bool(const Message &)> &messagePredicate)
ChunkIterator & operator++()
std::string topicAsPublished
void readData(std::size_t amount)
TEST_CASE("BagView: One file")
std::vector< uint8_t > m_uncompressedBuffer
std::map< std::string, std::vector< std::uint32_t > > connectionsForTopic
int roslz4_buffToBuffDecompress(char *input, unsigned int input_size, char *output, unsigned int *output_size)