60 #include <boost/format.hpp> 61 #include <boost/iterator/iterator_facade.hpp> 63 #include "console_bridge/console.h" 112 void open(std::string
const& filename, uint32_t mode =
bagmode::Read);
117 std::string getFileName()
const;
118 BagMode getMode()
const;
119 uint32_t getMajorVersion()
const;
120 uint32_t getMinorVersion()
const;
121 uint64_t getSize()
const;
125 void setChunkThreshold(uint32_t chunk_threshold);
126 uint32_t getChunkThreshold()
const;
148 void write(std::string
const& topic,
ros::Time const& time, T
const& msg,
183 Bag& operator=(
const Bag&);
189 void openRead (std::string
const& filename);
190 void openWrite (std::string
const& filename);
191 void openAppend(std::string
const& filename);
201 void startReadingVersion102();
202 void startReadingVersion200();
207 void writeFileHeaderRecord();
208 void writeConnectionRecord(
ConnectionInfo const* connection_info);
211 void writeMessageDataRecord(uint32_t conn_id,
ros::Time const& time, T
const& msg);
212 void writeIndexRecords();
213 void writeConnectionRecords();
214 void writeChunkInfoRecords();
216 void writeChunkHeader(
CompressionType compression, uint32_t compressed_size, uint32_t uncompressed_size);
217 void stopWritingChunk();
222 void readFileHeaderRecord();
223 void readConnectionRecord();
224 void readChunkHeader(
ChunkHeader& chunk_header)
const;
225 void readChunkInfoRecord();
226 void readConnectionIndexRecord200();
228 void readTopicIndexRecord102();
229 void readMessageDefinitionRecord102();
230 void readMessageDataRecord102(uint64_t offset,
ros::Header& header)
const;
233 uint32_t readMessageDataSize(
IndexEntry const& index_entry)
const;
235 template<
typename Stream>
236 void readMessageDataIntoStream(
IndexEntry const& index_entry,
Stream& stream)
const;
238 void decompressChunk(uint64_t chunk_pos)
const;
239 void decompressRawChunk(
ChunkHeader const& chunk_header)
const;
240 void decompressBz2Chunk(
ChunkHeader const& chunk_header)
const;
241 void decompressLz4Chunk(
ChunkHeader const& chunk_header)
const;
242 uint32_t getChunkOffset()
const;
247 void writeDataLength(uint32_t data_len);
249 void appendDataLengthToBuffer(
Buffer& buf, uint32_t data_len);
251 void readHeaderFromBuffer(
Buffer& buffer, uint32_t offset,
ros::Header& header, uint32_t& data_size, uint32_t& bytes_read)
const;
252 void readMessageDataHeaderFromBuffer(
Buffer& buffer, uint32_t offset,
ros::Header& header, uint32_t& data_size, uint32_t& bytes_read)
const;
254 bool readDataLength(uint32_t& data_size)
const;
260 std::string toHeaderString(T
const* field)
const;
262 std::string toHeaderString(
ros::Time const* field)
const;
265 bool readField(
ros::M_string const& fields, std::string
const& field_name,
bool required, T* data)
const;
267 bool readField(
ros::M_string const& fields, std::string
const& field_name,
unsigned int min_len,
unsigned int max_len,
bool required, std::string& data)
const;
268 bool readField(
ros::M_string const& fields, std::string
const& field_name,
bool required, std::string& data)
const;
270 bool readField(
ros::M_string const& fields, std::string
const& field_name,
bool required,
ros::Time& data)
const;
272 ros::M_string::const_iterator checkField(
ros::M_string const& fields, std::string
const& field,
273 unsigned int min_len,
unsigned int max_len,
bool required)
const;
277 void write(
char const* s, std::streamsize n);
278 void write(std::string
const& s);
279 void read(
char* b, std::streamsize n)
const;
280 void seek(uint64_t pos,
int origin = std::ios_base::beg)
const;
333 doWrite(topic, event.
getReceiptTime(), *
event.getMessage(),
event.getConnectionHeaderPtr());
338 doWrite(topic, time, msg, connection_header);
343 doWrite(topic, time, *msg, connection_header);
348 doWrite(topic, time, *msg, connection_header);
353 return std::string((
char*) field,
sizeof(T));
358 ros::M_string::const_iterator i = checkField(fields, field_name,
sizeof(T),
sizeof(T), required);
359 if (i == fields.end())
361 memcpy(data, i->second.data(),
sizeof(T));
365 template<
typename Stream>
375 readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.
offset, header, data_size, bytes_read);
377 memcpy(stream.advance(data_size), current_buffer_->getData() + index_entry.
offset + bytes_read, data_size);
382 readMessageDataRecord102(index_entry.
chunk_pos, header);
383 data_size = record_buffer_.getSize();
385 memcpy(stream.advance(data_size), record_buffer_.getData(), data_size);
405 readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.
offset, header, data_size, bytes_read);
408 uint32_t connection_id;
411 std::map<uint32_t, ConnectionInfo*>::const_iterator connection_iter = connections_.find(connection_id);
412 if (connection_iter == connections_.end())
413 throw BagFormatException((boost::format(
"Unknown connection ID: %1%") % connection_id).str());
420 predes_params.connection_header = connection_info->
header;
433 readMessageDataRecord102(index_entry.
chunk_pos, header);
438 std::string topic, latching(
"0"), callerid;
443 std::map<std::string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
444 if (topic_conn_id_iter == topic_connection_ids_.end())
446 uint32_t connection_id = topic_conn_id_iter->second;
448 std::map<uint32_t, ConnectionInfo*>::const_iterator connection_iter = connections_.find(connection_id);
449 if (connection_iter == connections_.end())
450 throw BagFormatException((boost::format(
"Unknown connection ID: %1%") % connection_id).str());
457 for (ros::M_string::const_iterator i = connection_info->
header->begin(); i != connection_info->
header->end(); i++)
458 (*message_header)[i->first] = i->second;
459 (*message_header)[
"latching"] = latching;
460 (*message_header)[
"callerid"] = callerid;
483 throw BagException(
"Tried to insert a message with time less than ros::TIME_MIN");
491 uint32_t conn_id = 0;
492 if (!connection_header) {
495 std::map<std::string, uint32_t>::iterator topic_connection_ids_iter = topic_connection_ids_.find(topic);
496 if (topic_connection_ids_iter == topic_connection_ids_.end()) {
497 conn_id = connections_.size();
498 topic_connection_ids_[topic] = conn_id;
501 conn_id = topic_connection_ids_iter->second;
502 connection_info = connections_[conn_id];
513 connection_header_copy[
"topic"] = topic;
515 std::map<ros::M_string, uint32_t>::iterator header_connection_ids_iter = header_connection_ids_.find(connection_header_copy);
516 if (header_connection_ids_iter == header_connection_ids_.end()) {
517 conn_id = connections_.size();
518 header_connection_ids_[connection_header_copy] = conn_id;
521 conn_id = header_connection_ids_iter->second;
522 connection_info = connections_[conn_id];
528 seek(0, std::ios::end);
529 file_size_ = file_.getOffset();
533 startWritingChunk(time);
536 if (connection_info == NULL) {
538 connection_info->
id = conn_id;
539 connection_info->
topic = topic;
543 if (connection_header != NULL) {
544 connection_info->
header = connection_header;
547 connection_info->
header = boost::make_shared<ros::M_string>();
549 (*connection_info->
header)[
"md5sum"] = connection_info->
md5sum;
550 (*connection_info->
header)[
"message_definition"] = connection_info->
msg_def;
552 connections_[conn_id] = connection_info;
554 writeConnectionRecord(connection_info);
555 appendConnectionRecordToBuffer(outgoing_chunk_buffer_, connection_info);
560 index_entry.
time = time;
561 index_entry.
chunk_pos = curr_chunk_info_.pos;
562 index_entry.
offset = getChunkOffset();
564 std::multiset<IndexEntry>& chunk_connection_index = curr_chunk_connection_indexes_[connection_info->
id];
565 chunk_connection_index.insert(chunk_connection_index.end(), index_entry);
566 std::multiset<IndexEntry>& connection_index = connection_indexes_[connection_info->
id];
567 connection_index.insert(connection_index.end(), index_entry);
570 curr_chunk_info_.connection_counts[connection_info->
id]++;
573 writeMessageDataRecord(conn_id, time, msg);
576 uint32_t chunk_size = getChunkOffset();
578 if (chunk_size > chunk_threshold_) {
581 outgoing_chunk_buffer_.setSize(0);
584 curr_chunk_info_.pos = -1;
599 record_buffer_.setSize(msg_ser_len);
609 seek(0, std::ios::end);
610 file_size_ = file_.getOffset();
613 (
unsigned long long) file_.getOffset(), getChunkOffset(), conn_id, time.
sec, time.
nsec, msg_ser_len);
616 writeDataLength(msg_ser_len);
617 write((
char*) record_buffer_.getData(), msg_ser_len);
620 appendHeaderToBuffer(outgoing_chunk_buffer_, header);
621 appendDataLengthToBuffer(outgoing_chunk_buffer_, msg_ser_len);
623 uint32_t offset = outgoing_chunk_buffer_.getSize();
624 outgoing_chunk_buffer_.setSize(outgoing_chunk_buffer_.getSize() + msg_ser_len);
625 memcpy(outgoing_chunk_buffer_.getData() + offset, record_buffer_.getData(), msg_ser_len);
628 if (time > curr_chunk_info_.end_time)
629 curr_chunk_info_.end_time = time;
630 else if (time < curr_chunk_info_.start_time)
631 curr_chunk_info_.start_time = time;
static void notify(const PreDeserializeParams< M > &)
#define CONSOLE_BRIDGE_logDebug(fmt,...)
std::map< std::string, uint32_t > topic_connection_ids_
static const std::string LATCHING_FIELD_NAME
ros::Time getReceiptTime() const
ChunkedFile reads and writes files which contain interleaved chunks of compressed and uncompressed da...
std::map< uint32_t, ConnectionInfo * > connections_
static const std::string CONNECTION_FIELD_NAME
std::map< ros::M_string, uint32_t > header_connection_ids_
ros::Time time
timestamp of the message
int writeHeader(roslz4_stream *str)
Buffer header_buffer_
reusable buffer in which to assemble the record header before writing to file
Base class for rosbag exceptions.
std_msgs::Header * header(M &m)
void swap(Bag &a, Bag &b)
BagMode
The possible modes to open a bag in.
static const unsigned char OP_MSG_DATA
boost::shared_ptr< T > instantiateBuffer(IndexEntry const &index_entry) const
deserializes the message held in record_buffer_
bool readField(ros::M_string const &fields, std::string const &field_name, bool required, T *data) const
Buffer decompress_buffer_
reusable buffer to decompress chunks into
std::map< std::string, std::string > M_string
std::string toHeaderString(T const *field) const
uint64_t decompressed_chunk_
position of decompressed chunk
A class pointing into a bag file.
void serialize(Stream &stream, const T &t)
void readMessageDataIntoStream(IndexEntry const &index_entry, Stream &stream) const
void writeMessageDataRecord(uint32_t conn_id, ros::Time const &time, T const &msg)
boost::shared_ptr< M > message
static const std::string OP_FIELD_NAME
uint64_t file_header_pos_
Buffer record_buffer_
reusable buffer in which to assemble the record data before writing to file
Buffer chunk_buffer_
reusable buffer to read chunk into
ChunkInfo curr_chunk_info_
std::map< uint32_t, std::multiset< IndexEntry > > connection_indexes_
uint64_t chunk_pos
absolute byte offset of the chunk record containing the message
uint32_t connection_count_
uint32_t offset
relative byte offset of the message record (either definition or data) in the chunk ...
uint32_t chunk_threshold_
std::vector< ChunkInfo > chunks_
ROSTIME_DECL const Time TIME_MIN
boost::shared_ptr< ros::M_string > header
uint32_t serializationLength(const T &t)
static const std::string TIME_FIELD_NAME
CompressionType compression_
std::map< uint32_t, std::multiset< IndexEntry > > curr_chunk_connection_indexes_
void doWrite(std::string const &topic, ros::Time const &time, T const &msg, boost::shared_ptr< ros::M_string > const &connection_header)
uint64_t curr_chunk_data_pos_
Buffer outgoing_chunk_buffer_
reusable buffer to read chunk into
boost::shared_ptr< std::map< std::string, std::string > > connection_header
static const std::string TOPIC_FIELD_NAME
const char * definition()
void deserialize(Stream &stream, T &t)
static const std::string CALLERID_FIELD_NAME
void write(std::string const &topic, ros::MessageEvent< T > const &event)
Write a message into the bag file.