42 #include <boost/foreach.hpp> 44 #include "console_bridge/console.h" 49 #define foreach BOOST_FOREACH 52 using std::priority_queue;
64 mode_(bagmode::
Write),
67 chunk_threshold_(768 * 1024),
75 curr_chunk_data_pos_(0),
77 decompressed_chunk_(0)
81 Bag::Bag(
string const& filename, uint32_t mode) :
112 throw BagException((format(
"Unknown mode: %1%") % (
int) mode).str());
116 seek(0, std::ios::end);
159 seek(0, std::ios::end);
208 (format(
"Unknown compression type: %i") % compression).str());
217 string version = string(
"#ROSBAG V") +
VERSION + string(
"\n");
231 char logtypename[100];
232 int version_major, version_minor;
233 #if defined(_MSC_VER) 234 if (sscanf_s(version_line.c_str(),
"#ROS%s V%d.%d", logtypename,
sizeof(logtypename), &version_major, &version_minor) != 3)
236 if (sscanf(version_line.c_str(),
"#ROS%99s V%d.%d", logtypename, &version_major, &version_minor) != 3)
240 version_ = version_major * 100 + version_minor;
260 seek(0, std::ios::end);
316 seek(0, std::ios::end);
328 multiset<IndexEntry>
const& index = i->second;
329 IndexEntry const& first_entry = *index.begin();
355 boost::shared_array<uint8_t> header_buffer;
358 uint32_t data_len = 0;
361 write((
char*) &header_len, 4);
362 write((
char*) header_buffer.get(), header_len);
363 write((
char*) &data_len, 4);
368 padding.resize(data_len,
' ');
400 seek(data_size, std::ios::cur);
444 seek(end_of_chunk_pos);
457 switch (compression) {
498 uint32_t connection_id = i->first;
499 multiset<IndexEntry>
const& index = i->second;
502 uint32_t index_size = index.size();
535 uint32_t index_version;
544 if (index_version != 0)
545 throw BagFormatException((format(
"Unsupported INDEX_DATA version: %1%") % index_version).str());
547 uint32_t connection_id;
554 connection_info->id = connection_id;
555 connection_info->topic = topic;
561 connection_id = topic_conn_id_iter->second;
565 for (uint32_t i = 0; i < count; i++) {
569 read((
char*) &sec, 4);
570 read((
char*) &nsec, 4);
582 connection_index.insert(connection_index.end(), index_entry);
597 uint32_t index_version;
598 uint32_t connection_id;
604 CONSOLE_BRIDGE_logDebug(
"Read INDEX_DATA: ver=%d connection=%d count=%d", index_version, connection_id, count);
606 if (index_version != 1)
607 throw BagFormatException((format(
"Unsupported INDEX_DATA version: %1%") % index_version).str());
613 for (uint32_t i = 0; i < count; i++) {
618 read((
char*) &sec, 4);
619 read((
char*) &nsec, 4);
630 connection_index.insert(connection_index.end(), index_entry);
686 map<uint32_t, ConnectionInfo*>::iterator key =
connections_.find(
id);
689 connection_info->
id = id;
690 connection_info->
topic = topic;
691 connection_info->
header = boost::make_shared<M_string>();
692 for (M_string::const_iterator i = connection_header.
getValues()->begin(); i != connection_header.
getValues()->end(); i++)
693 (*connection_info->
header)[i->first] = i->second;
694 connection_info->
msg_def = (*connection_info->
header)[
"message_definition"];
696 connection_info->
md5sum = (*connection_info->
header)[
"md5sum"];
725 CONSOLE_BRIDGE_logDebug(
"Creating connection: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str());
727 connection_info->id = id;
728 connection_info->topic = topic;
734 connection_info =
connections_[topic_conn_id_iter->second];
736 connection_info->
msg_def = message_definition;
737 connection_info->
datatype = datatype;
738 connection_info->
md5sum = md5sum;
739 connection_info->
header = boost::make_shared<ros::M_string>();
741 (*connection_info->
header)[
"md5sum"] = connection_info->
md5sum;
742 (*connection_info->
header)[
"message_definition"] = connection_info->
msg_def;
744 CONSOLE_BRIDGE_logDebug(
"Read MSG_DEF: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str());
906 uint32_t connection_id = i->first;
907 uint32_t count = i->second;
909 write((
char*) &connection_id, 4);
910 write((
char*) &count, 4);
928 uint32_t chunk_info_version;
938 uint32_t chunk_connection_count = 0;
942 (
unsigned long long) chunk_info.
pos, chunk_connection_count,
947 for (uint32_t i = 0; i < chunk_connection_count; i ++) {
948 uint32_t connection_id, connection_count;
949 read((
char*) &connection_id, 4);
950 read((
char*) &connection_count, 4);
969 boost::shared_array<uint8_t> header_buffer;
972 write((
char*) &header_len, 4);
973 write((
char*) header_buffer.get(), header_len);
977 write((
char*) &data_len, 4);
981 boost::shared_array<uint8_t> header_buffer;
985 uint32_t offset = buf.
getSize();
989 memcpy(buf.
getData() + offset, &header_len, 4);
991 memcpy(buf.
getData() + offset, header_buffer.get(), header_len);
995 uint32_t offset = buf.
getSize();
999 memcpy(buf.
getData() + offset, &data_len, 4);
1006 uint8_t* start = (uint8_t*) buffer.
getData() + offset;
1008 uint8_t* ptr = start;
1011 uint32_t header_len;
1012 memcpy(&header_len, ptr, 4);
1017 bool parsed = header.
parse(ptr, header_len, error_msg);
1023 memcpy(&data_size, ptr, 4);
1026 bytes_read = ptr - start;
1031 total_bytes_read = 0;
1035 uint32_t bytes_read;
1038 offset += bytes_read;
1039 total_bytes_read += bytes_read;
1051 uint32_t header_len;
1052 read((
char*) &header_len, 4);
1068 read((
char*) &data_size, 4);
1072 M_string::const_iterator
Bag::checkField(
M_string const& fields,
string const& field,
unsigned int min_len,
unsigned int max_len,
bool required)
const {
1073 M_string::const_iterator fitr = fields.find(field);
1074 if (fitr == fields.end()) {
1078 else if ((fitr->second.size() < min_len) || (fitr->second.size() > max_len))
1079 throw BagFormatException((format(
"Field '%1%' is wrong size (%2% bytes)") % field % (uint32_t) fitr->second.size()).str());
1084 bool Bag::readField(
M_string const& fields,
string const& field_name,
bool required,
string& data)
const {
1085 return readField(fields, field_name, 1, UINT_MAX, required, data);
1088 bool Bag::readField(
M_string const& fields,
string const& field_name,
unsigned int min_len,
unsigned int max_len,
bool required,
string& data)
const {
1089 M_string::const_iterator fitr =
checkField(fields, field_name, min_len, max_len, required);
1090 if (fitr == fields.end())
1093 data = fitr->second;
1098 uint64_t packed_time;
1099 if (!
readField(fields, field_name, required, &packed_time))
1102 uint64_t bitmask = (1LL << 33) - 1;
1103 data.
sec = (uint32_t) (packed_time & bitmask);
1104 data.
nsec = (uint32_t) (packed_time >> 32);
1110 uint64_t packed_time = (((uint64_t) field->
nsec) << 32) + field->
sec;
BagMode getMode() const
Get the mode the bag is in.
uint32_t readMessageDataSize(IndexEntry const &index_entry) const
static const std::string COMPRESSION_BZ2
void openRead(std::string const &filename)
open file for reading
static const std::string END_TIME_FIELD_NAME
void startWritingChunk(ros::Time time)
#define CONSOLE_BRIDGE_logDebug(fmt,...)
static const std::string CHUNK_COUNT_FIELD_NAME
void read(void *ptr, size_t size)
read size bytes from the file into ptr
static const std::string COMPRESSION_FIELD_NAME
std::map< std::string, uint32_t > topic_connection_ids_
void decompressChunk(uint64_t chunk_pos) const
uint64_t pos
absolute byte offset of chunk record in bag file
void openRead(std::string const &filename)
static const std::string VER_FIELD_NAME
ros::M_string::const_iterator checkField(ros::M_string const &fields, std::string const &field, unsigned int min_len, unsigned int max_len, bool required) const
static const std::string COMPRESSION_LZ4
void open(std::string const &filename, uint32_t mode=bagmode::Read)
Open a bag file.
uint32_t getChunkThreshold() const
Get the threshold for creating new chunks.
std::string getFileName() const
Get the filename of the bag.
std::map< uint32_t, ConnectionInfo * > connections_
static const std::string CONNECTION_FIELD_NAME
void startReadingVersion102()
#define CONSOLE_BRIDGE_logError(fmt,...)
void write(std::string const &s)
std::map< ros::M_string, uint32_t > header_connection_ids_
static const std::string INDEX_POS_FIELD_NAME
void setChunkThreshold(uint32_t chunk_threshold)
Set the threshold for creating new chunks.
ros::Time time
timestamp of the message
void readTopicIndexRecord102()
uint64_t getOffset() const
return current offset from the beginning of the file
std::map< uint32_t, uint32_t > connection_counts
number of messages in each connection stored in the chunk
void readMessageDataHeaderFromBuffer(Buffer &buffer, uint32_t offset, ros::Header &header, uint32_t &data_size, uint32_t &bytes_read) const
CompressionType getCompression() const
Get the compression method to use for writing chunks.
Buffer header_buffer_
reusable buffer in which to assemble the record header before writing to file
static const unsigned char OP_CONNECTION
void openWrite(std::string const &filename)
void seek(uint64_t pos, int origin=std::ios_base::beg) const
Base class for rosbag exceptions.
bool truncate(uint64_t length)
void seek(uint64_t offset, int origin=std::ios_base::beg)
seek to given offset from origin
void writeConnectionRecords()
uint32_t getChunkOffset() const
static const std::string DEF_FIELD_NAME
uint32_t getMajorVersion() const
Get the major-version of the open bag file.
void readChunkHeader(ChunkHeader &chunk_header) const
void writeConnectionRecord(ConnectionInfo const *connection_info)
std_msgs::Header * header(M &m)
void openAppend(std::string const &filename)
void close()
close the file
void swap(Bag &a, Bag &b)
void appendDataLengthToBuffer(Buffer &buf, uint32_t data_len)
uint32_t getCompressedBytesIn() const
return the number of bytes written to current compressed stream
BagMode
The possible modes to open a bag in.
void setWriteMode(CompressionType type)
static const unsigned char OP_MSG_DATA
void close()
Close the bag file.
static const unsigned char OP_CHUNK
bool readField(ros::M_string const &fields, std::string const &field_name, bool required, T *data) const
static const std::string VERSION
Buffer decompress_buffer_
reusable buffer to decompress chunks into
void appendConnectionRecordToBuffer(Buffer &buf, ConnectionInfo const *connection_info)
void decompressRawChunk(ChunkHeader const &chunk_header) const
std::map< std::string, std::string > M_string
void readMessageDefinitionRecord102()
void writeFileHeaderRecord()
std::string toHeaderString(T const *field) const
static const std::string CHUNK_POS_FIELD_NAME
uint64_t decompressed_chunk_
position of decompressed chunk
bool readHeader(ros::Header &header) const
void readHeaderFromBuffer(Buffer &buffer, uint32_t offset, ros::Header &header, uint32_t &data_size, uint32_t &bytes_read) const
void writeHeader(ros::M_string const &fields)
static const std::string START_TIME_FIELD_NAME
Exception thrown on problems reading the bag index.
static const uint32_t CHUNK_INFO_VERSION
void decompressBz2Chunk(ChunkHeader const &chunk_header) const
Exception thrown when on IO problems.
void appendHeaderToBuffer(Buffer &buf, ros::M_string const &fields)
bool isOp(ros::M_string &fields, uint8_t reqOp) const
void setCompression(CompressionType compression)
Set the compression method to use for writing chunks.
void decompressLz4Chunk(ChunkHeader const &chunk_header) const
void writeChunkHeader(CompressionType compression, uint32_t compressed_size, uint32_t uncompressed_size)
static const std::string OP_FIELD_NAME
uint64_t file_header_pos_
bool isOpen() const
return true if file is open for reading or writing
Buffer record_buffer_
reusable buffer in which to assemble the record data before writing to file
static const std::string TYPE_FIELD_NAME
ROSTIME_DECL const Time TIME_MAX
Buffer chunk_buffer_
reusable buffer to read chunk into
ChunkInfo curr_chunk_info_
uint32_t getMinorVersion() const
Get the minor-version of the open bag file.
static const std::string COUNT_FIELD_NAME
std::map< uint32_t, std::multiset< IndexEntry > > connection_indexes_
uint64_t chunk_pos
absolute byte offset of the chunk record containing the message
static const unsigned char OP_FILE_HEADER
static const unsigned char OP_INDEX_DATA
uint32_t connection_count_
void readConnectionIndexRecord200()
void readChunkInfoRecord()
void openWrite(std::string const &filename)
open file for writing
uint32_t offset
relative byte offset of the message record (either definition or data) in the chunk ...
bool readDataLength(uint32_t &data_size) const
uint32_t chunk_threshold_
std::vector< ChunkInfo > chunks_
ROSTIME_DECL const Time TIME_MIN
boost::shared_ptr< ros::M_string > header
void startReadingVersion200()
void writeDataLength(uint32_t data_len)
CompressionType compression_
void decompress(CompressionType compression, uint8_t *dest, unsigned int dest_len, uint8_t *source, unsigned int source_len)
std::map< uint32_t, std::multiset< IndexEntry > > curr_chunk_connection_indexes_
void read(char *b, std::streamsize n) const
static const std::string CONNECTION_COUNT_FIELD_NAME
static const uint32_t FILE_HEADER_LENGTH
uint64_t curr_chunk_data_pos_
static const std::string MD5_FIELD_NAME
Buffer outgoing_chunk_buffer_
reusable buffer to read chunk into
static const unsigned char OP_CHUNK_INFO
void readMessageDataRecord102(uint64_t offset, ros::Header &header) const
static const std::string SIZE_FIELD_NAME
void readConnectionRecord()
static const std::string COMPRESSION_NONE
ros::Time end_time
latest timestamp of a message in the chunk
uint64_t getSize() const
Get the current size of the bag file (a lower bound)
void writeChunkInfoRecords()
std::string getFileName() const
return path of currently open file
static const std::string TOPIC_FIELD_NAME
void openReadWrite(std::string const &filename)
open file for reading & writing
static const unsigned char OP_MSG_DEF
void setSize(uint32_t size)
void write(std::string const &topic, ros::MessageEvent< T > const &event)
Write a message into the bag file.
void readFileHeaderRecord()
static const uint32_t INDEX_VERSION
ros::Header readMessageDataHeader(IndexEntry const &index_entry)
ros::Time start_time
earliest timestamp of a message in the chunk