60 #include <boost/config.hpp> 61 #include <boost/format.hpp> 62 #include <boost/iterator/iterator_facade.hpp> 64 #include "console_bridge/console.h" 115 #ifndef BOOST_NO_CXX11_RVALUE_REFERENCES 118 Bag& operator=(
Bag&& other);
119 #endif // BOOST_NO_CXX11_RVALUE_REFERENCES 128 void open(std::string
const& filename, uint32_t mode =
bagmode::Read);
133 std::string getFileName()
const;
134 BagMode getMode()
const;
135 uint32_t getMajorVersion()
const;
136 uint32_t getMinorVersion()
const;
137 uint64_t getSize()
const;
141 void setChunkThreshold(uint32_t chunk_threshold);
142 uint32_t getChunkThreshold()
const;
164 void write(std::string
const& topic,
ros::Time const& time, T
const& msg,
200 Bag& operator=(
const Bag&);
208 void openRead (std::string
const& filename);
209 void openWrite (std::string
const& filename);
210 void openAppend(std::string
const& filename);
220 void startReadingVersion102();
221 void startReadingVersion200();
226 void writeFileHeaderRecord();
227 void writeConnectionRecord(
ConnectionInfo const* connection_info);
230 void writeMessageDataRecord(uint32_t conn_id,
ros::Time const& time, T
const& msg);
231 void writeIndexRecords();
232 void writeConnectionRecords();
233 void writeChunkInfoRecords();
235 void writeChunkHeader(
CompressionType compression, uint32_t compressed_size, uint32_t uncompressed_size);
236 void stopWritingChunk();
241 void readFileHeaderRecord();
242 void readConnectionRecord();
243 void readChunkHeader(
ChunkHeader& chunk_header)
const;
244 void readChunkInfoRecord();
245 void readConnectionIndexRecord200();
247 void readTopicIndexRecord102();
248 void readMessageDefinitionRecord102();
249 void readMessageDataRecord102(uint64_t offset,
ros::Header& header)
const;
252 uint32_t readMessageDataSize(
IndexEntry const& index_entry)
const;
254 template<
typename Stream>
255 void readMessageDataIntoStream(
IndexEntry const& index_entry,
Stream& stream)
const;
257 void decompressChunk(uint64_t chunk_pos)
const;
258 void decompressRawChunk(
ChunkHeader const& chunk_header)
const;
259 void decompressBz2Chunk(
ChunkHeader const& chunk_header)
const;
260 void decompressLz4Chunk(
ChunkHeader const& chunk_header)
const;
261 uint32_t getChunkOffset()
const;
266 void writeDataLength(uint32_t data_len);
268 void appendDataLengthToBuffer(
Buffer& buf, uint32_t data_len);
270 void readHeaderFromBuffer(
Buffer& buffer, uint32_t offset,
ros::Header& header, uint32_t& data_size, uint32_t& bytes_read)
const;
271 void readMessageDataHeaderFromBuffer(
Buffer& buffer, uint32_t offset,
ros::Header& header, uint32_t& data_size, uint32_t& bytes_read)
const;
273 bool readDataLength(uint32_t& data_size)
const;
279 std::string toHeaderString(T
const* field)
const;
281 std::string toHeaderString(
ros::Time const* field)
const;
284 bool readField(
ros::M_string const& fields, std::string
const& field_name,
bool required, T* data)
const;
286 bool readField(
ros::M_string const& fields, std::string
const& field_name,
unsigned int min_len,
unsigned int max_len,
bool required, std::string& data)
const;
287 bool readField(
ros::M_string const& fields, std::string
const& field_name,
bool required, std::string& data)
const;
289 bool readField(
ros::M_string const& fields, std::string
const& field_name,
bool required,
ros::Time& data)
const;
291 ros::M_string::const_iterator checkField(
ros::M_string const& fields, std::string
const& field,
292 unsigned int min_len,
unsigned int max_len,
bool required)
const;
296 void write(
char const* s, std::streamsize n);
297 void write(std::string
const& s);
298 void read(
char* b, std::streamsize n)
const;
299 void seek(uint64_t pos,
int origin = std::ios_base::beg)
const;
352 doWrite(topic, event.
getReceiptTime(), *
event.getMessage(),
event.getConnectionHeaderPtr());
357 doWrite(topic, time, msg, connection_header);
362 doWrite(topic, time, *msg, connection_header);
367 doWrite(topic, time, *msg, connection_header);
372 return std::string((
char*) field,
sizeof(T));
377 ros::M_string::const_iterator i = checkField(fields, field_name,
sizeof(T),
sizeof(T), required);
378 if (i == fields.end())
380 memcpy(data, i->second.data(),
sizeof(T));
384 template<
typename Stream>
394 readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.
offset, header, data_size, bytes_read);
396 memcpy(stream.advance(data_size), current_buffer_->getData() + index_entry.
offset + bytes_read, data_size);
401 readMessageDataRecord102(index_entry.
chunk_pos, header);
402 data_size = record_buffer_.getSize();
404 memcpy(stream.advance(data_size), record_buffer_.getData(), data_size);
424 readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.
offset, header, data_size, bytes_read);
427 uint32_t connection_id;
430 std::map<uint32_t, ConnectionInfo*>::const_iterator connection_iter = connections_.find(connection_id);
431 if (connection_iter == connections_.end())
432 throw BagFormatException((boost::format(
"Unknown connection ID: %1%") % connection_id).str());
439 predes_params.connection_header = connection_info->
header;
452 readMessageDataRecord102(index_entry.
chunk_pos, header);
457 std::string topic, latching(
"0"), callerid;
462 std::map<std::string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
463 if (topic_conn_id_iter == topic_connection_ids_.end())
465 uint32_t connection_id = topic_conn_id_iter->second;
467 std::map<uint32_t, ConnectionInfo*>::const_iterator connection_iter = connections_.find(connection_id);
468 if (connection_iter == connections_.end())
469 throw BagFormatException((boost::format(
"Unknown connection ID: %1%") % connection_id).str());
476 for (ros::M_string::const_iterator i = connection_info->
header->begin(); i != connection_info->
header->end(); i++)
477 (*message_header)[i->first] = i->second;
478 (*message_header)[
"latching"] = latching;
479 (*message_header)[
"callerid"] = callerid;
502 throw BagException(
"Tried to insert a message with time less than ros::TIME_MIN");
510 uint32_t conn_id = 0;
511 if (!connection_header) {
514 std::map<std::string, uint32_t>::iterator topic_connection_ids_iter = topic_connection_ids_.find(topic);
515 if (topic_connection_ids_iter == topic_connection_ids_.end()) {
516 conn_id = connections_.size();
517 topic_connection_ids_[topic] = conn_id;
520 conn_id = topic_connection_ids_iter->second;
521 connection_info = connections_[conn_id];
532 connection_header_copy[
"topic"] = topic;
534 std::map<ros::M_string, uint32_t>::iterator header_connection_ids_iter = header_connection_ids_.find(connection_header_copy);
535 if (header_connection_ids_iter == header_connection_ids_.end()) {
536 conn_id = connections_.size();
537 header_connection_ids_[connection_header_copy] = conn_id;
540 conn_id = header_connection_ids_iter->second;
541 connection_info = connections_[conn_id];
547 seek(0, std::ios::end);
548 file_size_ = file_.getOffset();
552 startWritingChunk(time);
555 if (connection_info == NULL) {
557 connection_info->
id = conn_id;
558 connection_info->
topic = topic;
562 if (connection_header != NULL) {
563 connection_info->
header = connection_header;
566 connection_info->
header = boost::make_shared<ros::M_string>();
568 (*connection_info->
header)[
"md5sum"] = connection_info->
md5sum;
569 (*connection_info->
header)[
"message_definition"] = connection_info->
msg_def;
571 connections_[conn_id] = connection_info;
573 writeConnectionRecord(connection_info);
574 appendConnectionRecordToBuffer(outgoing_chunk_buffer_, connection_info);
579 index_entry.
time = time;
580 index_entry.
chunk_pos = curr_chunk_info_.pos;
581 index_entry.
offset = getChunkOffset();
583 std::multiset<IndexEntry>& chunk_connection_index = curr_chunk_connection_indexes_[connection_info->
id];
584 chunk_connection_index.insert(chunk_connection_index.end(), index_entry);
585 std::multiset<IndexEntry>& connection_index = connection_indexes_[connection_info->
id];
586 connection_index.insert(connection_index.end(), index_entry);
589 curr_chunk_info_.connection_counts[connection_info->
id]++;
592 writeMessageDataRecord(conn_id, time, msg);
595 uint32_t chunk_size = getChunkOffset();
596 CONSOLE_BRIDGE_logDebug(
" curr_chunk_size=%d (threshold=%d)", chunk_size, chunk_threshold_);
597 if (chunk_size > chunk_threshold_) {
600 outgoing_chunk_buffer_.setSize(0);
603 curr_chunk_info_.pos = -1;
618 record_buffer_.setSize(msg_ser_len);
628 seek(0, std::ios::end);
629 file_size_ = file_.getOffset();
631 CONSOLE_BRIDGE_logDebug(
"Writing MSG_DATA [%llu:%d]: conn=%d sec=%d nsec=%d data_len=%d",
632 (
unsigned long long) file_.getOffset(), getChunkOffset(), conn_id, time.
sec, time.
nsec, msg_ser_len);
635 writeDataLength(msg_ser_len);
636 write((
char*) record_buffer_.getData(), msg_ser_len);
639 appendHeaderToBuffer(outgoing_chunk_buffer_, header);
640 appendDataLengthToBuffer(outgoing_chunk_buffer_, msg_ser_len);
642 uint32_t offset = outgoing_chunk_buffer_.getSize();
643 outgoing_chunk_buffer_.setSize(outgoing_chunk_buffer_.getSize() + msg_ser_len);
644 memcpy(outgoing_chunk_buffer_.getData() + offset, record_buffer_.getData(), msg_ser_len);
647 if (time > curr_chunk_info_.end_time)
648 curr_chunk_info_.end_time = time;
649 else if (time < curr_chunk_info_.start_time)
650 curr_chunk_info_.start_time = time;
static void notify(const PreDeserializeParams< M > &)
std::map< std::string, uint32_t > topic_connection_ids_
static const std::string LATCHING_FIELD_NAME
ros::Time getReceiptTime() const
ChunkedFile reads and writes files which contain interleaved chunks of compressed and uncompressed da...
std::map< uint32_t, ConnectionInfo * > connections_
static const std::string CONNECTION_FIELD_NAME
std::map< ros::M_string, uint32_t > header_connection_ids_
int writeHeader(roslz4_stream *str)
Buffer header_buffer_
reusable buffer in which to assemble the record header before writing to file
Base class for rosbag exceptions.
std_msgs::Header * header(M &m)
void swap(Bag &a, Bag &b)
BagMode
The possible modes to open a bag in.
static const unsigned char OP_MSG_DATA
boost::shared_ptr< T > instantiateBuffer(IndexEntry const &index_entry) const
deserializes the message held in record_buffer_
bool readField(ros::M_string const &fields, std::string const &field_name, bool required, T *data) const
Buffer decompress_buffer_
reusable buffer to decompress chunks into
std::map< std::string, std::string > M_string
std::string toHeaderString(T const *field) const
uint64_t decompressed_chunk_
position of decompressed chunk
A class pointing into a bag file.
void serialize(Stream &stream, const T &t)
void readMessageDataIntoStream(IndexEntry const &index_entry, Stream &stream) const
void writeMessageDataRecord(uint32_t conn_id, ros::Time const &time, T const &msg)
boost::shared_ptr< M > message
static const std::string OP_FIELD_NAME
uint64_t file_header_pos_
Buffer record_buffer_
reusable buffer in which to assemble the record data before writing to file
Buffer chunk_buffer_
reusable buffer to read chunk into
ChunkInfo curr_chunk_info_
std::map< uint32_t, std::multiset< IndexEntry > > connection_indexes_
uint64_t chunk_pos
timestamp of the message
uint32_t connection_count_
uint32_t offset
absolute byte offset of the chunk record containing the message
uint32_t chunk_threshold_
std::vector< ChunkInfo > chunks_
ROSTIME_DECL const Time TIME_MIN
boost::shared_ptr< ros::M_string > header
uint32_t serializationLength(const T &t)
static const std::string TIME_FIELD_NAME
CompressionType compression_
std::map< uint32_t, std::multiset< IndexEntry > > curr_chunk_connection_indexes_
void doWrite(std::string const &topic, ros::Time const &time, T const &msg, boost::shared_ptr< ros::M_string > const &connection_header)
uint64_t curr_chunk_data_pos_
Buffer outgoing_chunk_buffer_
reusable buffer to read chunk into
boost::shared_ptr< std::map< std::string, std::string > > connection_header
static const std::string TOPIC_FIELD_NAME
const char * definition()
void deserialize(Stream &stream, T &t)
static const std::string CALLERID_FIELD_NAME
void write(std::string const &topic, ros::MessageEvent< T > const &event)
Write a message into the bag file.