61 #include <boost/config.hpp>
62 #include <boost/format.hpp>
63 #include <boost/iterator/iterator_facade.hpp>
67 #include "console_bridge/console.h"
95 class MessageInstance;
101 friend class MessageInstance;
114 explicit Bag(std::string
const& filename, uint32_t mode =
bagmode::Read);
118 #ifndef BOOST_NO_CXX11_RVALUE_REFERENCES
121 Bag& operator=(
Bag&& other);
122 #endif // BOOST_NO_CXX11_RVALUE_REFERENCES
136 std::string getFileName()
const;
138 uint32_t getMajorVersion()
const;
139 uint32_t getMinorVersion()
const;
140 uint64_t getSize()
const;
144 void setChunkThreshold(uint32_t chunk_threshold);
145 uint32_t getChunkThreshold()
const;
155 void setEncryptorPlugin(
const std::string& plugin_name,
const std::string& plugin_param = std::string());
177 void write(std::string
const& topic,
ros::Time const& time, T
const& msg,
213 Bag& operator=(
const Bag&);
221 void openRead (std::string
const& filename);
222 void openWrite (std::string
const& filename);
223 void openAppend(std::string
const& filename);
233 void startReadingVersion102();
234 void startReadingVersion200();
239 void writeFileHeaderRecord();
240 void writeConnectionRecord(
ConnectionInfo const* connection_info,
const bool encrypt);
243 void writeMessageDataRecord(uint32_t conn_id,
ros::Time const& time, T
const& msg);
244 void writeIndexRecords();
245 void writeConnectionRecords();
246 void writeChunkInfoRecords();
248 void writeChunkHeader(
CompressionType compression, uint32_t compressed_size, uint32_t uncompressed_size);
249 void stopWritingChunk();
254 void readFileHeaderRecord();
255 void readConnectionRecord();
256 void readChunkHeader(
ChunkHeader& chunk_header)
const;
257 void readChunkInfoRecord();
258 void readConnectionIndexRecord200();
260 void readTopicIndexRecord102();
261 void readMessageDefinitionRecord102();
262 void readMessageDataRecord102(uint64_t offset,
ros::Header& header)
const;
265 uint32_t readMessageDataSize(
IndexEntry const& index_entry)
const;
267 template<
typename Stream>
268 void readMessageDataIntoStream(
IndexEntry const& index_entry,
Stream& stream)
const;
270 void decompressChunk(uint64_t chunk_pos)
const;
271 void decompressRawChunk(
ChunkHeader const& chunk_header)
const;
272 void decompressBz2Chunk(
ChunkHeader const& chunk_header)
const;
273 void decompressLz4Chunk(
ChunkHeader const& chunk_header)
const;
274 uint32_t getChunkOffset()
const;
279 void writeDataLength(uint32_t data_len);
281 void appendDataLengthToBuffer(
Buffer& buf, uint32_t data_len);
283 void readHeaderFromBuffer(
Buffer& buffer, uint32_t offset,
ros::Header& header, uint32_t& data_size, uint32_t& bytes_read)
const;
284 void readMessageDataHeaderFromBuffer(
Buffer& buffer, uint32_t offset,
ros::Header& header, uint32_t& data_size, uint32_t& bytes_read)
const;
286 bool readDataLength(uint32_t& data_size)
const;
292 std::string toHeaderString(T
const* field)
const;
294 std::string toHeaderString(
ros::Time const* field)
const;
297 bool readField(
ros::M_string const& fields, std::string
const& field_name,
bool required, T* data)
const;
299 bool readField(
ros::M_string const& fields, std::string
const& field_name,
unsigned int min_len,
unsigned int max_len,
bool required, std::string& data)
const;
300 bool readField(
ros::M_string const& fields, std::string
const& field_name,
bool required, std::string& data)
const;
302 bool readField(
ros::M_string const& fields, std::string
const& field_name,
bool required,
ros::Time& data)
const;
304 ros::M_string::const_iterator checkField(
ros::M_string const& fields, std::string
const& field,
305 unsigned int min_len,
unsigned int max_len,
bool required)
const;
309 void write(
char const* s, std::streamsize n);
310 void write(std::string
const& s);
311 void read(
char* b, std::streamsize n)
const;
312 void seek(uint64_t pos,
int origin = std::ios_base::beg)
const;
319 uint32_t chunk_threshold_;
320 uint32_t bag_revision_;
323 uint64_t file_header_pos_;
324 uint64_t index_data_pos_;
325 uint32_t connection_count_;
326 uint32_t chunk_count_;
331 uint64_t curr_chunk_data_pos_;
333 std::map<std::string, uint32_t> topic_connection_ids_;
334 std::map<ros::M_string, uint32_t> header_connection_ids_;
335 std::map<uint32_t, ConnectionInfo*> connections_;
337 std::vector<ChunkInfo> chunks_;
339 std::map<uint32_t, std::multiset<IndexEntry> > connection_indexes_;
340 std::map<uint32_t, std::multiset<IndexEntry> > curr_chunk_connection_indexes_;
342 mutable Buffer header_buffer_;
343 mutable Buffer record_buffer_;
345 mutable Buffer chunk_buffer_;
346 mutable Buffer decompress_buffer_;
352 mutable uint64_t decompressed_chunk_;
380 doWrite(topic, time, *msg, connection_header);
385 doWrite(topic, time, *msg, connection_header);
390 return std::string((
char*) field,
sizeof(T));
395 ros::M_string::const_iterator i =
checkField(fields, field_name,
sizeof(T),
sizeof(T), required);
396 if (i == fields.end())
398 memcpy(data, i->second.data(),
sizeof(T));
402 template<
typename Stream>
445 uint32_t connection_id;
448 std::map<uint32_t, ConnectionInfo*>::const_iterator connection_iter =
connections_.find(connection_id);
450 throw BagFormatException((boost::format(
"Unknown connection ID: %1%") % connection_id).str());
475 std::string topic, latching(
"0"), callerid;
480 std::map<std::string, uint32_t>::const_iterator topic_conn_id_iter =
topic_connection_ids_.find(topic);
483 uint32_t connection_id = topic_conn_id_iter->second;
485 std::map<uint32_t, ConnectionInfo*>::const_iterator connection_iter =
connections_.find(connection_id);
487 throw BagFormatException((boost::format(
"Unknown connection ID: %1%") % connection_id).str());
494 for (ros::M_string::const_iterator i = connection_info->
header->begin(); i != connection_info->
header->end(); i++)
495 (*message_header)[i->first] = i->second;
496 (*message_header)[
"latching"] = latching;
497 (*message_header)[
"callerid"] = callerid;
520 throw BagException(
"Tried to insert a message with time less than ros::TIME_MIN");
528 uint32_t conn_id = 0;
529 if (!connection_header) {
532 std::map<std::string, uint32_t>::iterator topic_connection_ids_iter =
topic_connection_ids_.find(topic);
538 conn_id = topic_connection_ids_iter->second;
550 connection_header_copy[
"topic"] = topic;
552 std::map<ros::M_string, uint32_t>::iterator header_connection_ids_iter =
header_connection_ids_.find(connection_header_copy);
558 conn_id = header_connection_ids_iter->second;
565 seek(0, std::ios::end);
573 if (connection_info == NULL) {
575 connection_info->
id = conn_id;
576 connection_info->
topic = topic;
580 if (connection_header != NULL) {
581 connection_info->
header = connection_header;
584 connection_info->
header = boost::make_shared<ros::M_string>();
586 (*connection_info->
header)[
"md5sum"] = connection_info->
md5sum;
587 (*connection_info->
header)[
"message_definition"] = connection_info->
msg_def;
597 index_entry.
time = time;
602 chunk_connection_index.insert(chunk_connection_index.end(), index_entry);
606 connection_index.insert(connection_index.end(), index_entry);
617 CONSOLE_BRIDGE_logDebug(
" curr_chunk_size=%d (threshold=%d)", chunk_size,
chunk_threshold_);
649 seek(0, std::ios::end);
652 CONSOLE_BRIDGE_logDebug(
"Writing MSG_DATA [%llu:%d]: conn=%d sec=%d nsec=%d data_len=%d",