61 #include <boost/config.hpp> 62 #include <boost/format.hpp> 63 #include <boost/iterator/iterator_facade.hpp> 67 #include "console_bridge/console.h" 118 #ifndef BOOST_NO_CXX11_RVALUE_REFERENCES 121 Bag& operator=(
Bag&& other);
122 #endif // BOOST_NO_CXX11_RVALUE_REFERENCES 131 void open(std::string
const& filename, uint32_t mode =
bagmode::Read);
136 std::string getFileName()
const;
137 BagMode getMode()
const;
138 uint32_t getMajorVersion()
const;
139 uint32_t getMinorVersion()
const;
140 uint64_t getSize()
const;
144 void setChunkThreshold(uint32_t chunk_threshold);
145 uint32_t getChunkThreshold()
const;
155 void setEncryptorPlugin(
const std::string& plugin_name,
const std::string& plugin_param = std::string());
177 void write(std::string
const& topic,
ros::Time const& time, T
const& msg,
213 Bag& operator=(
const Bag&);
221 void openRead (std::string
const& filename);
222 void openWrite (std::string
const& filename);
223 void openAppend(std::string
const& filename);
233 void startReadingVersion102();
234 void startReadingVersion200();
239 void writeFileHeaderRecord();
240 void writeConnectionRecord(
ConnectionInfo const* connection_info,
const bool encrypt);
243 void writeMessageDataRecord(uint32_t conn_id,
ros::Time const& time, T
const& msg);
244 void writeIndexRecords();
245 void writeConnectionRecords();
246 void writeChunkInfoRecords();
248 void writeChunkHeader(
CompressionType compression, uint32_t compressed_size, uint32_t uncompressed_size);
249 void stopWritingChunk();
254 void readFileHeaderRecord();
255 void readConnectionRecord();
256 void readChunkHeader(
ChunkHeader& chunk_header)
const;
257 void readChunkInfoRecord();
258 void readConnectionIndexRecord200();
260 void readTopicIndexRecord102();
261 void readMessageDefinitionRecord102();
262 void readMessageDataRecord102(uint64_t offset,
ros::Header& header)
const;
265 uint32_t readMessageDataSize(
IndexEntry const& index_entry)
const;
267 template<
typename Stream>
268 void readMessageDataIntoStream(
IndexEntry const& index_entry,
Stream& stream)
const;
270 void decompressChunk(uint64_t chunk_pos)
const;
271 void decompressRawChunk(
ChunkHeader const& chunk_header)
const;
272 void decompressBz2Chunk(
ChunkHeader const& chunk_header)
const;
273 void decompressLz4Chunk(
ChunkHeader const& chunk_header)
const;
274 uint32_t getChunkOffset()
const;
279 void writeDataLength(uint32_t data_len);
281 void appendDataLengthToBuffer(
Buffer& buf, uint32_t data_len);
283 void readHeaderFromBuffer(
Buffer& buffer, uint32_t offset,
ros::Header& header, uint32_t& data_size, uint32_t& bytes_read)
const;
284 void readMessageDataHeaderFromBuffer(
Buffer& buffer, uint32_t offset,
ros::Header& header, uint32_t& data_size, uint32_t& bytes_read)
const;
286 bool readDataLength(uint32_t& data_size)
const;
292 std::string toHeaderString(T
const* field)
const;
294 std::string toHeaderString(
ros::Time const* field)
const;
297 bool readField(
ros::M_string const& fields, std::string
const& field_name,
bool required, T* data)
const;
299 bool readField(
ros::M_string const& fields, std::string
const& field_name,
unsigned int min_len,
unsigned int max_len,
bool required, std::string& data)
const;
300 bool readField(
ros::M_string const& fields, std::string
const& field_name,
bool required, std::string& data)
const;
302 bool readField(
ros::M_string const& fields, std::string
const& field_name,
bool required,
ros::Time& data)
const;
304 ros::M_string::const_iterator checkField(
ros::M_string const& fields, std::string
const& field,
305 unsigned int min_len,
unsigned int max_len,
bool required)
const;
309 void write(
char const* s, std::streamsize n);
310 void write(std::string
const& s);
311 void read(
char* b, std::streamsize n)
const;
312 void seek(uint64_t pos,
int origin = std::ios_base::beg)
const;
370 doWrite(topic, event.
getReceiptTime(), *
event.getMessage(),
event.getConnectionHeaderPtr());
375 doWrite(topic, time, msg, connection_header);
380 doWrite(topic, time, *msg, connection_header);
385 doWrite(topic, time, *msg, connection_header);
390 return std::string((
char*) field,
sizeof(T));
395 ros::M_string::const_iterator i = checkField(fields, field_name,
sizeof(T),
sizeof(T), required);
396 if (i == fields.end())
398 memcpy(data, i->second.data(),
sizeof(T));
402 template<
typename Stream>
412 readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.
offset, header, data_size, bytes_read);
414 memcpy(stream.advance(data_size), current_buffer_->getData() + index_entry.
offset + bytes_read, data_size);
419 readMessageDataRecord102(index_entry.
chunk_pos, header);
420 data_size = record_buffer_.getSize();
422 memcpy(stream.advance(data_size), record_buffer_.getData(), data_size);
442 readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.
offset, header, data_size, bytes_read);
445 uint32_t connection_id;
448 std::map<uint32_t, ConnectionInfo*>::const_iterator connection_iter = connections_.find(connection_id);
449 if (connection_iter == connections_.end())
450 throw BagFormatException((boost::format(
"Unknown connection ID: %1%") % connection_id).str());
457 predes_params.connection_header = connection_info->
header;
470 readMessageDataRecord102(index_entry.
chunk_pos, header);
475 std::string topic, latching(
"0"), callerid;
480 std::map<std::string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
481 if (topic_conn_id_iter == topic_connection_ids_.end())
483 uint32_t connection_id = topic_conn_id_iter->second;
485 std::map<uint32_t, ConnectionInfo*>::const_iterator connection_iter = connections_.find(connection_id);
486 if (connection_iter == connections_.end())
487 throw BagFormatException((boost::format(
"Unknown connection ID: %1%") % connection_id).str());
494 for (ros::M_string::const_iterator i = connection_info->
header->begin(); i != connection_info->
header->end(); i++)
495 (*message_header)[i->first] = i->second;
496 (*message_header)[
"latching"] = latching;
497 (*message_header)[
"callerid"] = callerid;
520 throw BagException(
"Tried to insert a message with time less than ros::TIME_MIN");
528 uint32_t conn_id = 0;
529 if (!connection_header) {
532 std::map<std::string, uint32_t>::iterator topic_connection_ids_iter = topic_connection_ids_.find(topic);
533 if (topic_connection_ids_iter == topic_connection_ids_.end()) {
534 conn_id = connections_.size();
535 topic_connection_ids_[topic] = conn_id;
538 conn_id = topic_connection_ids_iter->second;
539 connection_info = connections_[conn_id];
550 connection_header_copy[
"topic"] = topic;
552 std::map<ros::M_string, uint32_t>::iterator header_connection_ids_iter = header_connection_ids_.find(connection_header_copy);
553 if (header_connection_ids_iter == header_connection_ids_.end()) {
554 conn_id = connections_.size();
555 header_connection_ids_[connection_header_copy] = conn_id;
558 conn_id = header_connection_ids_iter->second;
559 connection_info = connections_[conn_id];
565 seek(0, std::ios::end);
566 file_size_ = file_.getOffset();
570 startWritingChunk(time);
573 if (connection_info == NULL) {
575 connection_info->
id = conn_id;
576 connection_info->
topic = topic;
580 if (connection_header != NULL) {
581 connection_info->
header = connection_header;
584 connection_info->
header = boost::make_shared<ros::M_string>();
586 (*connection_info->
header)[
"md5sum"] = connection_info->
md5sum;
587 (*connection_info->
header)[
"message_definition"] = connection_info->
msg_def;
589 connections_[conn_id] = connection_info;
591 writeConnectionRecord(connection_info,
false);
592 appendConnectionRecordToBuffer(outgoing_chunk_buffer_, connection_info);
597 index_entry.
time = time;
598 index_entry.
chunk_pos = curr_chunk_info_.pos;
599 index_entry.
offset = getChunkOffset();
601 std::multiset<IndexEntry>& chunk_connection_index = curr_chunk_connection_indexes_[connection_info->
id];
602 chunk_connection_index.insert(chunk_connection_index.end(), index_entry);
605 std::multiset<IndexEntry>& connection_index = connection_indexes_[connection_info->
id];
606 connection_index.insert(connection_index.end(), index_entry);
610 curr_chunk_info_.connection_counts[connection_info->
id]++;
613 writeMessageDataRecord(conn_id, time, msg);
616 uint32_t chunk_size = getChunkOffset();
617 CONSOLE_BRIDGE_logDebug(
" curr_chunk_size=%d (threshold=%d)", chunk_size, chunk_threshold_);
618 if (chunk_size > chunk_threshold_) {
621 outgoing_chunk_buffer_.setSize(0);
624 curr_chunk_info_.pos = -1;
639 record_buffer_.setSize(msg_ser_len);
649 seek(0, std::ios::end);
650 file_size_ = file_.getOffset();
652 CONSOLE_BRIDGE_logDebug(
"Writing MSG_DATA [%llu:%d]: conn=%d sec=%d nsec=%d data_len=%d",
653 (
unsigned long long) file_.getOffset(), getChunkOffset(), conn_id, time.
sec, time.
nsec, msg_ser_len);
656 writeDataLength(msg_ser_len);
657 write((
char*) record_buffer_.getData(), msg_ser_len);
660 appendHeaderToBuffer(outgoing_chunk_buffer_, header);
661 appendDataLengthToBuffer(outgoing_chunk_buffer_, msg_ser_len);
663 uint32_t offset = outgoing_chunk_buffer_.getSize();
664 outgoing_chunk_buffer_.setSize(outgoing_chunk_buffer_.getSize() + msg_ser_len);
665 memcpy(outgoing_chunk_buffer_.getData() + offset, record_buffer_.getData(), msg_ser_len);
668 if (time > curr_chunk_info_.end_time)
669 curr_chunk_info_.end_time = time;
670 else if (time < curr_chunk_info_.start_time)
671 curr_chunk_info_.start_time = time;
static void notify(const PreDeserializeParams< M > &)
#define ROSBAG_STORAGE_DECL
std::map< std::string, uint32_t > topic_connection_ids_
static const std::string LATCHING_FIELD_NAME
ChunkedFile reads and writes files which contain interleaved chunks of compressed and uncompressed da...
std::map< uint32_t, ConnectionInfo * > connections_
bool readField(ros::M_string const &fields, std::string const &field_name, bool required, T *data) const
std::string toHeaderString(T const *field) const
void init(const M_string &remappings)
static const std::string CONNECTION_FIELD_NAME
const Time TIME_MIN(0, 1)
std::map< ros::M_string, uint32_t > header_connection_ids_
ros::Time time
timestamp of the message
int writeHeader(roslz4_stream *str)
Buffer header_buffer_
reusable buffer in which to assemble the record header before writing to file
boost::shared_ptr< T > instantiateBuffer(IndexEntry const &index_entry) const
deserializes the message held in record_buffer_
Base class for rosbag exceptions.
void swap(Bag &a, Bag &b)
BagMode
The possible modes to open a bag in.
static const unsigned char OP_MSG_DATA
Buffer decompress_buffer_
reusable buffer to decompress chunks into
std::map< std::string, std::string > M_string
uint64_t decompressed_chunk_
position of decompressed chunk
A class pointing into a bag file.
void serialize(Stream &stream, const T &t)
void writeMessageDataRecord(uint32_t conn_id, ros::Time const &time, T const &msg)
boost::shared_ptr< rosbag::EncryptorBase > encryptor_
boost::shared_ptr< M > message
static const std::string OP_FIELD_NAME
uint64_t file_header_pos_
Buffer record_buffer_
reusable buffer in which to assemble the record data before writing to file
Buffer chunk_buffer_
reusable buffer to read chunk into
ChunkInfo curr_chunk_info_
std::map< uint32_t, std::multiset< IndexEntry > > connection_indexes_
uint64_t chunk_pos
absolute byte offset of the chunk record containing the message
uint32_t connection_count_
uint32_t offset
relative byte offset of the message record (either definition or data) in the chunk ...
uint32_t chunk_threshold_
std::vector< ChunkInfo > chunks_
ros::Time getReceiptTime() const
boost::shared_ptr< ros::M_string > header
uint32_t serializationLength(const T &t)
static const std::string TIME_FIELD_NAME
CompressionType compression_
std::map< uint32_t, std::multiset< IndexEntry > > curr_chunk_connection_indexes_
void doWrite(std::string const &topic, ros::Time const &time, T const &msg, boost::shared_ptr< ros::M_string > const &connection_header)
uint64_t curr_chunk_data_pos_
Buffer outgoing_chunk_buffer_
reusable buffer to read chunk into
pluginlib::ClassLoader< rosbag::EncryptorBase > encryptor_loader_
boost::shared_ptr< std::map< std::string, std::string > > connection_header
static const std::string TOPIC_FIELD_NAME
const char * definition()
void deserialize(Stream &stream, T &t)
static const std::string CALLERID_FIELD_NAME
void write(std::string const &topic, ros::MessageEvent< T > const &event)
Write a message into the bag file.
void readMessageDataIntoStream(IndexEntry const &index_entry, Stream &stream) const