Go to the documentation of this file.
   42 #include <boost/bind/bind.hpp> 
   44 #include "console_bridge/console.h" 
   47 using std::priority_queue;
 
   58 Bag::Bag() : encryptor_loader_(
"rosbag_storage", 
"rosbag::EncryptorBase")
 
   63 Bag::Bag(
string const& filename, uint32_t mode) : encryptor_loader_(
"rosbag_storage", 
"rosbag::EncryptorBase")
 
   69 #ifndef BOOST_NO_CXX11_RVALUE_REFERENCES 
   71 Bag::Bag(
Bag&& other) : encryptor_loader_(
"rosbag_storage", 
"rosbag::EncryptorBase") {
 
   81 #endif // BOOST_NO_CXX11_RVALUE_REFERENCES 
  115         throw BagException((format(
"Unknown mode: %1%") % (
int) mode).str());
 
  119     seek(0, std::ios::end);
 
  162     seek(0, std::ios::end);
 
  213             (format(
"Unknown compression type: %i")  % compression).str());
 
  221         throw BagException(
"Cannot set encryption plugin after chunks are written");
 
  230     string version = string(
"#ROSBAG V") + 
VERSION + string(
"\n");
 
  232     CONSOLE_BRIDGE_logDebug(
"Writing VERSION [%llu]: %s", (
unsigned long long) 
file_.
getOffset(), version.c_str());
 
  244     char logtypename[100];
 
  245     int version_major, version_minor;
 
  246 #if defined(_MSC_VER) 
  247     if (sscanf_s(version_line.c_str(), 
"#ROS%s V%d.%d", logtypename, 
sizeof(logtypename), &version_major, &version_minor) != 3)
 
  249     if (sscanf(version_line.c_str(), 
"#ROS%99s V%d.%d", logtypename, &version_major, &version_minor) != 3)
 
  253     version_ = version_major * 100 + version_minor;
 
  255     CONSOLE_BRIDGE_logDebug(
"Read VERSION: version=%d", 
version_);
 
  273     seek(0, std::ios::end);
 
  310         for (
unsigned int i = 0; i < chunk_info.connection_counts.size(); i++)
 
  329     seek(0, std::ios::end);
 
  341         multiset<IndexEntry> 
const& index       = i->second;
 
  342         IndexEntry const&           first_entry = *index.begin();
 
  344         CONSOLE_BRIDGE_logDebug(
"Reading message definition for connection %d at %llu", i->first, (
unsigned long long) first_entry.
chunk_pos);
 
  358     CONSOLE_BRIDGE_logDebug(
"Writing FILE_HEADER [%llu]: index_pos=%llu connection_count=%d chunk_count=%d",
 
  372     uint32_t data_len = 0;
 
  375     write((
char*) &header_len, 4);
 
  376     write((
char*) header_buffer.get(), header_len);
 
  377     write((
char*) &data_len, 4);
 
  382         padding.resize(data_len, 
' ');
 
  408         std::string encryptor_plugin_name;
 
  410         if (!encryptor_plugin_name.empty()) {
 
  416     CONSOLE_BRIDGE_logDebug(
"Read FILE_HEADER: index_pos=%llu connection_count=%d chunk_count=%d",
 
  420     seek(data_size, std::ios::cur);
 
  468     seek(end_of_chunk_pos);
 
  481     switch (compression) {
 
  490     CONSOLE_BRIDGE_logDebug(
"Writing CHUNK [%llu]: compression=%s compressed=%d uncompressed=%d",
 
  522         uint32_t                    connection_id = i->first;
 
  523         multiset<IndexEntry> 
const& index         = i->second;
 
  526         uint32_t index_size = index.size();
 
  536         CONSOLE_BRIDGE_logDebug(
"Writing INDEX_DATA: connection=%d ver=%d count=%d", connection_id, 
INDEX_VERSION, index_size);
 
  540             write((
char*) &e.time.sec,  4);
 
  541             write((
char*) &e.time.nsec, 4);
 
  542             write((
char*) &e.offset,    4);
 
  544             CONSOLE_BRIDGE_logDebug(
"  - %d.%d: %d", e.time.sec, e.time.nsec, e.offset);
 
  559     uint32_t index_version;
 
  566     CONSOLE_BRIDGE_logDebug(
"Read INDEX_DATA: ver=%d topic=%s count=%d", index_version, topic.c_str(), count);
 
  568     if (index_version != 0)
 
  569         throw BagFormatException((format(
"Unsupported INDEX_DATA version: %1%") % index_version).str());
 
  571     uint32_t connection_id;
 
  576         CONSOLE_BRIDGE_logDebug(
"Creating connection: id=%d topic=%s", connection_id, topic.c_str());
 
  578         connection_info->
id       = connection_id;
 
  579         connection_info->
topic    = topic;
 
  585         connection_id = topic_conn_id_iter->second;
 
  589     for (uint32_t i = 0; i < count; i++) {
 
  593         read((
char*) &sec,                   4);
 
  594         read((
char*) &nsec,                  4);
 
  599         CONSOLE_BRIDGE_logDebug(
"  - %d.%d: %llu", sec, nsec, (
unsigned long long) index_entry.
chunk_pos);
 
  606           connection_index.insert(connection_index.end(), index_entry);
 
  621     uint32_t index_version;
 
  622     uint32_t connection_id;
 
  628     CONSOLE_BRIDGE_logDebug(
"Read INDEX_DATA: ver=%d connection=%d count=%d", index_version, connection_id, count);
 
  630     if (index_version != 1)
 
  631         throw BagFormatException((format(
"Unsupported INDEX_DATA version: %1%") % index_version).str());
 
  637     for (uint32_t i = 0; i < count; i++) {
 
  642         read((
char*) &sec,                4);
 
  643         read((
char*) &nsec,               4);
 
  647         CONSOLE_BRIDGE_logDebug(
"  - %d.%d: %llu+%d", sec, nsec, (
unsigned long long) index_entry.
chunk_pos, index_entry.
offset);
 
  654           connection_index.insert(connection_index.end(), index_entry);
 
  669     CONSOLE_BRIDGE_logDebug(
"Writing CONNECTION [%llu:%d]: topic=%s id=%d",
 
  717     map<uint32_t, ConnectionInfo*>::iterator key = 
connections_.find(
id);
 
  720         connection_info->
id       = id;
 
  721         connection_info->
topic    = topic;
 
  722         connection_info->
header = boost::make_shared<M_string>();
 
  723         for (M_string::const_iterator i = connection_header.
getValues()->begin(); i != connection_header.
getValues()->end(); i++)
 
  724             (*connection_info->
header)[i->first] = i->second;
 
  725         connection_info->
msg_def  = (*connection_info->
header)[
"message_definition"];
 
  727         connection_info->
md5sum   = (*connection_info->
header)[
"md5sum"];
 
  730         CONSOLE_BRIDGE_logDebug(
"Read CONNECTION: topic=%s id=%d", topic.c_str(), 
id);
 
  756         CONSOLE_BRIDGE_logDebug(
"Creating connection: topic=%s md5sum=%s datatype=%s", topic.c_str(), 
md5sum.c_str(), 
datatype.c_str());
 
  758         connection_info->
id       = id;
 
  759         connection_info->
topic    = topic;
 
  765         connection_info = 
connections_[topic_conn_id_iter->second];
 
  767     connection_info->
msg_def  = message_definition;
 
  770     connection_info->
header = boost::make_shared<ros::M_string>();
 
  772     (*connection_info->
header)[
"md5sum"]             = connection_info->
md5sum;
 
  773     (*connection_info->
header)[
"message_definition"] = connection_info->
msg_def;
 
  775     CONSOLE_BRIDGE_logDebug(
"Read MSG_DEF: topic=%s md5sum=%s datatype=%s", topic.c_str(), 
md5sum.c_str(), 
datatype.c_str());
 
  810     CONSOLE_BRIDGE_logDebug(
"readMessageDataRecord: offset=%llu", (
unsigned long long) offset);
 
  862     CONSOLE_BRIDGE_logDebug(
"lz4 compressed_size: %d uncompressed_size: %d",
 
  914         uint32_t chunk_connection_count = chunk_info.connection_counts.size();
 
  922         CONSOLE_BRIDGE_logDebug(
"Writing CHUNK_INFO [%llu]: ver=%d pos=%llu start=%d.%d end=%d.%d",
 
  924                   chunk_info.start_time.sec, chunk_info.start_time.nsec,
 
  925                   chunk_info.end_time.sec, chunk_info.end_time.nsec);
 
  932         for (map<uint32_t, uint32_t>::const_iterator i = chunk_info.connection_counts.begin(); i != chunk_info.connection_counts.end(); i++) {
 
  933             uint32_t connection_id = i->first;
 
  934             uint32_t count         = i->second;
 
  936             write((
char*) &connection_id, 4);
 
  937             write((
char*) &count, 4);
 
  939             CONSOLE_BRIDGE_logDebug(
"  - %d: %d", connection_id, count);
 
  955     uint32_t chunk_info_version;
 
  965     uint32_t chunk_connection_count = 0;
 
  968     CONSOLE_BRIDGE_logDebug(
"Read CHUNK_INFO: chunk_pos=%llu connection_count=%d start=%d.%d end=%d.%d",
 
  969               (
unsigned long long) chunk_info.
pos, chunk_connection_count,
 
  974     for (uint32_t i = 0; i < chunk_connection_count; i ++) {
 
  975         uint32_t connection_id, connection_count;
 
  976         read((
char*) &connection_id,    4);
 
  977         read((
char*) &connection_count, 4);
 
  979         CONSOLE_BRIDGE_logDebug(
"  %d: %d messages", connection_id, connection_count);
 
  999     write((
char*) &header_len, 4);
 
 1000     write((
char*) header_buffer.get(), header_len);
 
 1004     write((
char*) &data_len, 4);
 
 1009     uint32_t header_len;
 
 1012     uint32_t offset = buf.
getSize();
 
 1016     memcpy(buf.
getData() + offset, &header_len, 4);
 
 1018     memcpy(buf.
getData() + offset, header_buffer.get(), header_len);
 
 1022     uint32_t offset = buf.
getSize();
 
 1026     memcpy(buf.
getData() + offset, &data_len, 4);
 
 1035     uint8_t* ptr = 
start;
 
 1038     uint32_t header_len;
 
 1039     memcpy(&header_len, ptr, 4);
 
 1044     bool parsed = 
header.parse(ptr, header_len, error_msg);
 
 1050     memcpy(&data_size, ptr, 4);
 
 1053     bytes_read = ptr - 
start;
 
 1058     total_bytes_read = 0;
 
 1061         CONSOLE_BRIDGE_logDebug(
"reading header from buffer: offset=%d", offset);
 
 1062         uint32_t bytes_read;
 
 1065         offset += bytes_read;
 
 1066         total_bytes_read += bytes_read;
 
 1078     uint32_t header_len;
 
 1079     read((
char*) &header_len, 4);
 
 1095     read((
char*) &data_size, 4);
 
 1099 M_string::const_iterator 
Bag::checkField(
M_string const& fields, 
string const& field, 
unsigned int min_len, 
unsigned int max_len, 
bool required)
 const {
 
 1100     M_string::const_iterator fitr = fields.find(field);
 
 1101     if (fitr == fields.end()) {
 
 1105     else if ((fitr->second.size() < min_len) || (fitr->second.size() > max_len))
 
 1106         throw BagFormatException((format(
"Field '%1%' is wrong size (%2% bytes)") % field % (uint32_t) fitr->second.size()).str());
 
 1111 bool Bag::readField(
M_string const& fields, 
string const& field_name, 
bool required, 
string& data)
 const {
 
 1112     return readField(fields, field_name, 1, UINT_MAX, required, data);
 
 1115 bool Bag::readField(
M_string const& fields, 
string const& field_name, 
unsigned int min_len, 
unsigned int max_len, 
bool required, 
string& data)
 const {
 
 1116     M_string::const_iterator fitr = 
checkField(fields, field_name, min_len, max_len, required);
 
 1117     if (fitr == fields.end())
 
 1120     data = fitr->second;
 
 1125     uint64_t packed_time;
 
 1126     if (!
readField(fields, field_name, required, &packed_time))
 
 1129     uint64_t bitmask = (1LL << 33) - 1;
 
 1130     data.
sec  = (uint32_t) (packed_time & bitmask);
 
 1131     data.
nsec = (uint32_t) (packed_time >> 32);
 
 1137     uint64_t packed_time = (((uint64_t) field->
nsec) << 32) + field->
sec;
 
  
std::vector< ChunkInfo > chunks_
std::string getFileName() const
return path of currently open file
BagMode getMode() const
Get the mode the bag is in.
void setWriteMode(CompressionType type)
std::map< std::string, uint32_t > topic_connection_ids_
static const std::string END_TIME_FIELD_NAME
CompressionType getCompression() const
Get the compression method to use for writing chunks.
uint32_t connection_count_
void read(char *b, std::streamsize n) const
void startWritingChunk(ros::Time time)
void writeHeader(ros::M_string const &fields)
static const std::string COMPRESSION_NONE
static const unsigned char OP_CHUNK_INFO
uint32_t readMessageDataSize(IndexEntry const &index_entry) const
uint64_t decompressed_chunk_
position of decompressed chunk
void decompressBz2Chunk(ChunkHeader const &chunk_header) const
uint32_t getMajorVersion() const
Get the major-version of the open bag file.
void seek(uint64_t offset, int origin=std::ios_base::beg)
seek to given offset from origin
void setSize(uint32_t size)
static const uint32_t FILE_HEADER_LENGTH
bool readDataLength(uint32_t &data_size) const
Buffer header_buffer_
reusable buffer in which to assemble the record header before writing to file
void openReadWrite(std::string const &filename)
open file for reading & writing
boost::shared_ptr< rosbag::EncryptorBase > encryptor_
void setEncryptorPlugin(const std::string &plugin_name, const std::string &plugin_param=std::string())
Set encryptor of the bag file.
static const unsigned char OP_MSG_DATA
ChunkInfo curr_chunk_info_
static const uint32_t INDEX_VERSION
void decompressChunk(uint64_t chunk_pos) const
boost::shared_ptr< ros::M_string > header
std::string toHeaderString(T const *field) const
bool isOp(ros::M_string &fields, uint8_t reqOp) const
static const std::string ENCRYPTOR_FIELD_NAME
static const uint32_t CHUNK_INFO_VERSION
void seek(uint64_t pos, int origin=std::ios_base::beg) const
uint32_t getChunkThreshold() const
Get the threshold for creating new chunks.
ros::Time start_time
earliest timestamp of a message in the chunk
void swap(Bag &a, Bag &b)
void close()
Close the bag file.
void writeConnectionRecord(ConnectionInfo const *connection_info, const bool encrypt)
void writeChunkInfoRecords()
std::map< uint32_t, ConnectionInfo * > connections_
void readChunkInfoRecord()
void decompressLz4Chunk(ChunkHeader const &chunk_header) const
uint32_t offset
relative byte offset of the message record (either definition or data) in the chunk
BagMode
The possible modes to open a bag in.
uint64_t getOffset() const
return current offset from the beginning of the file
Base class for rosbag exceptions.
Buffer chunk_buffer_
reusable buffer to read chunk into
static const unsigned char OP_FILE_HEADER
static const unsigned char OP_CHUNK
ros::M_string::const_iterator checkField(ros::M_string const &fields, std::string const &field, unsigned int min_len, unsigned int max_len, bool required) const
void writeConnectionRecords()
void startReadingVersion200()
uint32_t getMinorVersion() const
Get the minor-version of the open bag file.
uint64_t chunk_pos
absolute byte offset of the chunk record containing the message
std::map< uint32_t, std::multiset< IndexEntry > > connection_indexes_
bool isOpen() const
return true if file is open for reading or writing
void appendConnectionRecordToBuffer(Buffer &buf, ConnectionInfo const *connection_info)
Exception thrown when on IO problems.
std::map< uint32_t, uint32_t > connection_counts
number of messages in each connection stored in the chunk
uint64_t getSize() const
Get the current size of the bag file (a lower bound)
static const std::string CONNECTION_FIELD_NAME
void appendDataLengthToBuffer(Buffer &buf, uint32_t data_len)
uint32_t getChunkOffset() const
static const std::string MD5_FIELD_NAME
static const std::string INDEX_POS_FIELD_NAME
void setCompression(CompressionType compression)
Set the compression method to use for writing chunks.
bool readField(ros::M_string const &fields, std::string const &field_name, bool required, T *data) const
uint64_t file_header_pos_
Bag & operator=(Bag &&other)
static const std::string START_TIME_FIELD_NAME
const Time TIME_MIN(0, 1)
std::string getFileName() const
Get the filename of the bag.
uint32_t getCompressedBytesIn() const
return the number of bytes written to current compressed stream
bool readHeader(ros::Header &header) const
static const std::string TOPIC_FIELD_NAME
void appendHeaderToBuffer(Buffer &buf, ros::M_string const &fields)
void close()
close the file
void openWrite(std::string const &filename)
open file for writing
std::map< ros::M_string, uint32_t > header_connection_ids_
void decompress(CompressionType compression, uint8_t *dest, unsigned int dest_len, uint8_t *source, unsigned int source_len)
static const std::string DEF_FIELD_NAME
void writeDataLength(uint32_t data_len)
void readMessageDefinitionRecord102()
void writeChunkHeader(CompressionType compression, uint32_t compressed_size, uint32_t uncompressed_size)
static const std::string CONNECTION_COUNT_FIELD_NAME
void readMessageDataRecord102(uint64_t offset, ros::Header &header) const
void write(std::string const &topic, ros::MessageEvent< T > const &event)
Write a message into the bag file.
std::map< uint32_t, std::multiset< IndexEntry > > curr_chunk_connection_indexes_
const ROSTIME_DECL Time TIME_MAX
void readTopicIndexRecord102()
boost::shared_ptr< T > createInstance(const std::string &lookup_name)
static const std::string VERSION
void startReadingVersion102()
void openAppend(std::string const &filename)
void openRead(std::string const &filename)
static const std::string CHUNK_POS_FIELD_NAME
static const std::string CHUNK_COUNT_FIELD_NAME
void readFileHeaderRecord()
void decompressRawChunk(ChunkHeader const &chunk_header) const
ros::Header readMessageDataHeader(IndexEntry const &index_entry)
void openRead(std::string const &filename)
open file for reading
static const std::string TYPE_FIELD_NAME
static const std::string VER_FIELD_NAME
static const std::string COMPRESSION_LZ4
Buffer outgoing_chunk_buffer_
reusable buffer to read chunk into
void readChunkHeader(ChunkHeader &chunk_header) const
ros::Time time
timestamp of the message
static const std::string COUNT_FIELD_NAME
void open(std::string const &filename, uint32_t mode=bagmode::Read)
Open a bag file.
static const unsigned char OP_MSG_DEF
void read(void *ptr, size_t size)
read size bytes from the file into ptr
void readHeaderFromBuffer(Buffer &buffer, uint32_t offset, ros::Header &header, uint32_t &data_size, uint32_t &bytes_read) const
CompressionType compression_
void setChunkThreshold(uint32_t chunk_threshold)
Set the threshold for creating new chunks.
static const std::string OP_FIELD_NAME
void write(std::string const &s)
uint64_t curr_chunk_data_pos_
Exception thrown on problems reading the bag index.
static const std::string COMPRESSION_FIELD_NAME
static const std::string SIZE_FIELD_NAME
void readConnectionIndexRecord200()
uint64_t pos
absolute byte offset of chunk record in bag file
bool truncate(uint64_t length)
void readMessageDataHeaderFromBuffer(Buffer &buffer, uint32_t offset, ros::Header &header, uint32_t &data_size, uint32_t &bytes_read) const
Buffer decompress_buffer_
reusable buffer to decompress chunks into
Buffer record_buffer_
reusable buffer in which to assemble the record data before writing to file
static const unsigned char OP_CONNECTION
static const std::string COMPRESSION_BZ2
ros::Time end_time
latest timestamp of a message in the chunk
uint32_t chunk_threshold_
std::map< std::string, std::string > M_string
static const unsigned char OP_INDEX_DATA
void writeFileHeaderRecord()
pluginlib::ClassLoader< rosbag::EncryptorBase > encryptor_loader_
void openWrite(std::string const &filename)
void readConnectionRecord()
rosbag_storage
Author(s): Dirk Thomas 
, Jacob Perron 
autogenerated on Tue May 20 2025 03:00:26