7 #include <fmt/ostream.h>
19 #include <std_msgs/Header.h>
20 #include <std_msgs/UInt8.h>
44 std::map<std::string, Span> headers;
49 T integralHeader(
const std::string& name)
51 auto it = headers.find(name);
52 if(it == headers.end())
55 if(it->second.size !=
sizeof(T))
56 throw Exception{
fmt::format(
"Header '{}' has wrong size {} (expected {})\n", name, it->second.size,
sizeof(T))};
59 std::memcpy(&ret, it->second.start,
sizeof(T));
63 std::string stringHeader(
const std::string& name,
const std::optional<std::string>& defaultValue = {})
65 auto it = headers.find(name);
66 if(it == headers.end())
74 return {
reinterpret_cast<char*
>(it->second.start), it->second.size};
88 std::vector<rosbag_fancy::BagReader::ConnectionInfo> connectionInfos;
91 std::map<std::string, Span> readHeader(uint8_t* base,
std::size_t remainingSize)
93 std::map<std::string, Span> ret;
95 while(remainingSize > 0)
101 std::memcpy(&entrySize, base, 4);
107 auto* equal =
reinterpret_cast<uint8_t*
>(std::memchr(base,
'=', entrySize));
109 throw Exception{
"Invalid header map entry"};
111 ret[std::string(
reinterpret_cast<char*
>(base), equal - base)]
112 = Span{equal+1,
static_cast<std::size_t>(entrySize - 1 - (equal - base))};
115 remainingSize -= entrySize;
121 Record readRecord(uint8_t* base,
std::size_t remainingSize)
125 if(remainingSize < 4)
128 std::memcpy(&
record.headerSize, base, 4);
132 if(remainingSize <
record.headerSize)
135 record.headerBegin = base;
137 base +=
record.headerSize;
138 remainingSize -=
record.headerSize;
140 if(remainingSize < 4)
143 std::memcpy(&
record.dataSize, base, 4);
147 if(remainingSize <
record.dataSize)
157 auto it =
record.headers.find(
"op");
158 if(it ==
record.headers.end())
159 throw Exception{
"Record without op header"};
161 if(it->second.size != 1)
164 record.op = it->second.start[0];
173 explicit MappedFile(
const std::string& file)
175 m_fd = open(file.c_str(), O_RDONLY);
180 m_size = lseek(m_fd, 0, SEEK_END);
187 lseek(m_fd, 0, SEEK_SET);
189 m_data =
reinterpret_cast<uint8_t*
>(mmap(
nullptr, m_size, PROT_READ, MAP_PRIVATE, m_fd, 0));
190 if(m_data == MAP_FAILED)
199 munmap(m_data, m_size);
216 template <>
struct fmt::formatter<Span>: formatter<string_view>
218 template <
typename FormatContext>
221 if(std::all_of(c.start, c.start + c.size, [](uint8_t c){return std::isprint(c);}))
231 fmt::print(
s,
"{:02X}", c.start[i]);
310 throw Exception{
"Compressed chunk data ends prematurely! (not enough output)"};
312 if(ret != BZ_STREAM_END && ret != BZ_OK)
319 throw std::logic_error{
"non-implemented compression mode"};
354 auto it =
record.headers.find(
"op");
355 if(it ==
record.headers.end())
356 throw Exception{
"Record without op header"};
358 if(it->second.size != 1)
361 record.op = it->second.start[0];
369 throw std::logic_error{
"non-implemented compression mode"};
385 : m_d{std::make_unique<Private>()}
387 m_d->m_reader = reader;
388 m_d->m_chunk = chunk;
390 auto& chunkData = reader->m_d->chunks[chunk];
391 auto rec = readRecord(chunkData.chunkStart, chunkData.chunkCompressedSize);
393 std::string compression = rec.stringHeader(
"compression");
394 if(compression ==
"none")
396 else if(compression ==
"bz2")
397 m_d->m_compression = Private::Compression::BZ2;
398 else if(compression ==
"lz4")
399 m_d->m_compression = Private::Compression::LZ4;
403 switch(
m_d->m_compression)
406 m_d->m_dataPtr = rec.dataBegin;
407 m_d->m_remaining = rec.dataSize;
410 case Private::Compression::BZ2:
412 int ret = BZ2_bzDecompressInit(&
m_d->m_bzStream, 0, 0);
416 m_d->m_bzStream.next_in =
reinterpret_cast<char*
>(rec.dataBegin);
417 m_d->m_bzStream.avail_in = rec.dataSize;
419 m_d->m_remaining = rec.integralHeader<std::uint32_t>(
"size");
424 case Private::Compression::LZ4:
429 m_d->m_remaining = rec.integralHeader<std::uint32_t>(
"size");
430 m_d->m_uncompressedBuffer.resize(
m_d->m_remaining);
435 reinterpret_cast<char*
>(rec.dataBegin), rec.dataSize,
436 reinterpret_cast<char*
>(
m_d->m_uncompressedBuffer.data()), &length
442 if(
m_d->m_remaining != length)
445 m_d->m_dataPtr =
m_d->m_uncompressedBuffer.data();
461 if(
m_d->m_remaining == 0)
468 auto rec =
m_d->readRecord();
472 m_msg.m_data = rec.dataBegin;
473 m_msg.m_size = rec.dataSize;
475 uint32_t connID = rec.integralHeader<uint32_t>(
"conn");
476 if(connID >
m_d->m_reader->connections().size())
479 m_msg.connection = &
m_d->m_reader->connections().at(connID);
481 uint64_t time = rec.integralHeader<uint64_t>(
"time");
482 m_msg.stamp =
ros::Time(time & 0xFFFFFFFF, time >> 32);
494 , m_it{reader, chunk}
504 if(m_chunk <
static_cast<int>(m_reader->m_d->chunks.size()))
526 if(m_chunk ==
static_cast<int>(m_reader->m_d->chunks.size()))
534 findNextWithPredicates(connPredicate, messagePredicate);
539 auto currentChunkIsInteresting = [&](){
540 auto& chunkData = m_reader->m_d->chunks[m_chunk];
541 return std::any_of(chunkData.connectionInfos.begin(), chunkData.connectionInfos.end(), connPredicate);
548 if(m_chunk == -1 || m_chunk ==
static_cast<int>(m_reader->m_d->chunks.size()))
555 while(!currentChunkIsInteresting())
558 if(m_chunk >=
static_cast<int>(m_reader->m_d->chunks.size()))
573 if(messagePredicate(*m_it))
586 return m_reader->m_d->chunks[m_chunk].connectionInfos;
591 switch(m_it.m_d->m_compression)
601 throw std::logic_error{
"unknown compression"};
606 :
m_d{std::make_unique<Private>(filename)}
608 m_d->data =
reinterpret_cast<uint8_t*
>(m_d->file.data());
609 m_d->size = m_d->file.size();
612 std::string versionLine;
613 uint8_t* bagHeaderBegin{};
615 constexpr
int MAX_VERSION_LENGTH = 20;
616 auto* newline =
reinterpret_cast<uint8_t*
>(std::memchr(
617 m_d->data,
'\n', std::min<off_t>(m_d->size, MAX_VERSION_LENGTH))
622 versionLine = std::string(
reinterpret_cast<char*
>(m_d->data), newline - m_d->data);
623 bagHeaderBegin = newline + 1;
626 if(versionLine !=
"#ROSBAG V2.0")
627 throw Exception{
"I can't read this file (I can only read ROSBAG V2.0"};
629 Record bagHeader = readRecord(bagHeaderBegin, m_d->size - (bagHeaderBegin - m_d->data));
631 if(bagHeader.op != 0x03)
632 throw Exception{
"First record is not a bag header\n"};
634 std::uint64_t indexPos = bagHeader.integralHeader<std::uint64_t>(
"index_pos");
635 std::uint32_t connectionCount = bagHeader.integralHeader<std::uint32_t>(
"conn_count");
636 std::uint32_t chunkCount = bagHeader.integralHeader<std::uint32_t>(
"chunk_count");
638 if(indexPos >=
static_cast<std::uint64_t
>(m_d->size))
639 throw Exception{
fmt::format(
"Index position is too large: {} vs data size {}", indexPos, m_d->size)};
642 uint8_t* rptr = m_d->data + indexPos;
648 Record rec = readRecord(rptr, remaining);
650 throw Exception{
"Expected connection record"};
653 recData.headers = readHeader(rec.dataBegin, rec.dataSize);
656 con.id = rec.integralHeader<uint32_t>(
"conn");
657 con.topicInBag = rec.stringHeader(
"topic");
658 con.topicAsPublished = recData.stringHeader(
"topic", std::string{});
659 con.type = recData.stringHeader(
"type");
660 con.md5sum = recData.stringHeader(
"md5sum");
661 con.msgDef = recData.stringHeader(
"message_definition");
664 auto it = recData.headers.find(
"callerid");
665 if(it != recData.headers.end())
666 con.callerID = recData.stringHeader(
"callerid");
669 auto it = recData.headers.find(
"latching");
670 if(it != recData.headers.end())
672 if(it->second.size != 1)
673 throw Exception{
"Invalid latching header"};
675 if(it->second.start[0] ==
'1')
680 m_d->connections[con.id] = con;
681 m_d->connectionsForTopic[con.topicInBag].push_back(con.id);
684 remaining = m_d->size - (rptr - m_d->data);
688 m_d->chunks.reserve(chunkCount);
689 Chunk* lastChunk = {};
692 Record rec = readRecord(rptr, remaining);
694 throw Exception{
"Expected chunk info record"};
696 auto& chunk = m_d->chunks.emplace_back();
698 auto version = rec.integralHeader<std::uint32_t>(
"ver");
702 auto chunkPos = rec.integralHeader<std::uint64_t>(
"chunk_pos");
703 if(chunkPos >=
static_cast<std::uint64_t
>(m_d->size))
704 throw Exception{
"chunk_pos points outside of valid range"};
706 chunk.chunkStart = m_d->data + chunkPos;
708 lastChunk->chunkCompressedSize = chunk.chunkStart - lastChunk->chunkStart;
710 std::uint64_t startTime = rec.integralHeader<std::uint64_t>(
"start_time");
711 std::uint64_t endTime = rec.integralHeader<std::uint64_t>(
"end_time");
713 chunk.startTime =
ros::Time(startTime & 0xFFFFFFFF, startTime >> 32);
714 chunk.endTime =
ros::Time(endTime & 0xFFFFFFFF, endTime >> 32);
716 auto numConnections = rec.integralHeader<std::uint32_t>(
"count");
718 if(rec.dataSize < numConnections *
sizeof(ConnectionInfo))
719 throw Exception{
"Chunk info is too small"};
721 chunk.connectionInfos.resize(numConnections);
722 std::memcpy(chunk.connectionInfos.data(), rec.dataBegin, numConnections *
sizeof(ConnectionInfo));
724 for(
auto& connInfo : chunk.connectionInfos)
726 auto it = m_d->connections.find(connInfo.id);
727 if(it == m_d->connections.end())
730 it->second.totalCount += connInfo.msgCount;
734 remaining = m_d->size - (rptr - m_d->data);
738 lastChunk->chunkCompressedSize = (m_d->data + m_d->size) - lastChunk->chunkStart;
740 if(!m_d->chunks.empty())
741 m_d->startTime = m_d->chunks.front().startTime;
743 for(
auto& c : m_d->chunks)
745 m_d->startTime = std::min(c.startTime, m_d->startTime);
746 m_d->endTime = std::max(c.endTime, m_d->endTime);
751 : m_d{std::move(other.m_d)}
761 return m_d->connections;
766 return m_d->startTime;
781 if(
m_d->chunks.empty())
795 auto chunkIt = std::lower_bound(
m_d->chunks.begin(),
m_d->chunks.end(), time, [&](Chunk& chunk,
const ros::Time& time){
796 return chunk.endTime < time;
799 if(chunkIt ==
m_d->chunks.end())
802 int chunkIdx = chunkIt -
m_d->chunks.begin();
806 for(; it !=
end(); ++it)
817 if(chunk < 0 || chunk >=
static_cast<int>(
m_d->chunks.size()))
825 return m_d->chunks.size();
834 char bagfileName[] =
"/tmp/rosbag_fancy_test_XXXXXX";
836 int fd = mkstemp(bagfileName);
840 rosbag::Bag bag{bagfileName, rosbag::BagMode::Write};
843 { bag.setCompression(rosbag::CompressionType::Uncompressed); }
845 { bag.setCompression(rosbag::CompressionType::BZ2); }
847 { bag.setCompression(rosbag::CompressionType::LZ4); }
850 std_msgs::Header msg;
852 bag.write(
"/topicA",
ros::Time(1000, 0), msg);
855 std_msgs::Header msg;
857 bag.write(
"/topicB",
ros::Time(1001, 0), msg);
862 bag.write(
"/topicC",
ros::Time(1002, 0), msg);
871 auto it = reader.
begin();
874 CHECK(it->connection->topicInBag ==
"/topicA");
876 ++it;
REQUIRE(it != reader.end());
877 CHECK(it->connection->topicInBag ==
"/topicB");
879 ++it;
REQUIRE(it != reader.end());
880 CHECK(it->connection->topicInBag ==
"/topicC");
882 ++it;
CHECK(it == reader.end());