64 #include "../../../console_bridge/include/console_bridge/console.h" 115 std::string getFileName()
const;
116 BagMode getMode()
const;
123 std::tuple<std::string, uint64_t, uint64_t> getCompressionInfo()
const;
124 void setChunkThreshold(
uint32_t chunk_threshold);
148 std::shared_ptr<rs2rosinternal::M_string> connection_header = std::shared_ptr<rs2rosinternal::M_string>());
160 void write(std::string
const& topic,
rs2rosinternal::Time const& time, std::shared_ptr<T const>
const& msg,
161 std::shared_ptr<rs2rosinternal::M_string> connection_header = std::shared_ptr<rs2rosinternal::M_string>());
173 void write(std::string
const& topic,
rs2rosinternal::Time const& time, std::shared_ptr<T>
const& msg,
174 std::shared_ptr<rs2rosinternal::M_string> connection_header = std::shared_ptr<rs2rosinternal::M_string>());
179 void doWrite(std::string
const& topic,
rs2rosinternal::Time const& time,
T const& msg, std::shared_ptr<rs2rosinternal::M_string>
const& connection_header);
181 void openRead (std::string
const&
filename);
182 void openWrite (std::string
const&
filename);
183 void openAppend(std::string
const&
filename);
188 std::shared_ptr<T> instantiateBuffer(
IndexEntry const& index_entry)
const;
193 void startReadingVersion102();
194 void startReadingVersion200();
199 void writeFileHeaderRecord();
200 void writeConnectionRecord(
ConnectionInfo const* connection_info);
204 void writeIndexRecords();
205 void writeConnectionRecords();
206 void writeChunkInfoRecords();
209 void stopWritingChunk();
214 void readFileHeaderRecord();
215 void readConnectionRecord();
216 void readChunkHeader(
ChunkHeader& chunk_header)
const;
217 void readChunkInfoRecord();
218 void readConnectionIndexRecord200();
220 void readTopicIndexRecord102();
221 void readMessageDefinitionRecord102();
227 template<
typename Stream>
228 void readMessageDataIntoStream(
IndexEntry const& index_entry,
Stream& stream)
const;
230 void decompressChunk(
uint64_t chunk_pos)
const;
231 void decompressRawChunk(
ChunkHeader const& chunk_header)
const;
232 void decompressBz2Chunk(
ChunkHeader const& chunk_header)
const;
233 void decompressLz4Chunk(
ChunkHeader const& chunk_header)
const;
239 void writeDataLength(
uint32_t data_len);
246 bool readDataLength(
uint32_t& data_size)
const;
252 std::string toHeaderString(
T const* field)
const;
259 bool readField(
rs2rosinternal::M_string const& fields, std::string
const& field_name,
unsigned int min_len,
unsigned int max_len,
bool required, std::string&
data)
const;
264 rs2rosinternal::M_string::const_iterator checkField(
rs2rosinternal::M_string const& fields, std::string
const& field,
265 unsigned int min_len,
unsigned int max_len,
bool required)
const;
269 void write(
char const*
s, std::streamsize
n);
270 void write(std::string
const&
s);
271 void read(
char*
b, std::streamsize
n)
const;
272 void seek(
uint64_t pos,
int origin = std::ios_base::beg)
const;
325 doWrite(topic, event.
getReceiptTime(), *
event.getMessage(),
event.getConnectionHeaderPtr());
330 doWrite(topic, time, msg, connection_header);
334 void Bag::write(std::string
const& topic,
rs2rosinternal::Time const& time, std::shared_ptr<T const>
const& msg, std::shared_ptr<rs2rosinternal::M_string> connection_header) {
335 doWrite(topic, time, *msg, connection_header);
339 void Bag::write(std::string
const& topic,
rs2rosinternal::Time const& time, std::shared_ptr<T>
const& msg, std::shared_ptr<rs2rosinternal::M_string> connection_header) {
340 doWrite(topic, time, *msg, connection_header);
345 return std::string((
char*) field,
sizeof(
T));
350 rs2rosinternal::M_string::const_iterator
i = checkField(fields, field_name,
sizeof(
T),
sizeof(
T), required);
351 if (i == fields.end())
353 memcpy(data, i->second.data(),
sizeof(
T));
357 template<
typename Stream>
367 readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.
offset, header, data_size, bytes_read);
369 memcpy(stream.advance(data_size), current_buffer_->getData() + index_entry.
offset + bytes_read, data_size);
374 readMessageDataRecord102(index_entry.
chunk_pos, header);
375 data_size = record_buffer_.getSize();
377 memcpy(stream.advance(data_size), record_buffer_.getData(), data_size);
397 readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.
offset, header, data_size, bytes_read);
403 std::map<uint32_t, ConnectionInfo*>::const_iterator connection_iter = connections_.find(connection_id);
404 if (connection_iter == connections_.end())
408 std::shared_ptr<T>
p = std::make_shared<T>();
412 predes_params.connection_header = connection_info->
header;
425 readMessageDataRecord102(index_entry.
chunk_pos, header);
430 std::string topic, latching(
"0"), callerid;
435 std::map<std::string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
436 if (topic_conn_id_iter == topic_connection_ids_.end())
438 uint32_t connection_id = topic_conn_id_iter->second;
440 std::map<uint32_t, ConnectionInfo*>::const_iterator connection_iter = connections_.find(connection_id);
441 if (connection_iter == connections_.end())
445 std::shared_ptr<T>
p = std::make_shared<T>();
448 std::shared_ptr<rs2rosinternal::M_string> message_header(std::make_shared<rs2rosinternal::M_string>());
449 for (rs2rosinternal::M_string::const_iterator
i = connection_info->
header->begin();
i != connection_info->
header->end();
i++)
450 (*message_header)[
i->first] =
i->second;
451 (*message_header)[
"latching"] = latching;
452 (*message_header)[
"callerid"] = callerid;
475 throw BagException(
"Tried to insert a message with time less than rs2rosinternal::TIME_MIN");
484 if (!connection_header) {
487 std::map<std::string, uint32_t>::iterator topic_connection_ids_iter = topic_connection_ids_.find(topic);
488 if (topic_connection_ids_iter == topic_connection_ids_.end()) {
489 conn_id =
static_cast<uint32_t>(connections_.size());
490 topic_connection_ids_[topic] = conn_id;
493 conn_id = topic_connection_ids_iter->second;
494 connection_info = connections_[conn_id];
505 connection_header_copy[
"topic"] = topic;
507 std::map<rs2rosinternal::M_string, uint32_t>::iterator header_connection_ids_iter = header_connection_ids_.find(connection_header_copy);
508 if (header_connection_ids_iter == header_connection_ids_.end()) {
509 conn_id =
static_cast<uint32_t>(connections_.size());
510 header_connection_ids_[connection_header_copy] = conn_id;
513 conn_id = header_connection_ids_iter->second;
514 connection_info = connections_[conn_id];
521 file_size_ = file_.getOffset();
525 startWritingChunk(time);
528 if (connection_info ==
NULL) {
530 connection_info->
id = conn_id;
531 connection_info->
topic = topic;
535 if (connection_header !=
NULL) {
536 connection_info->
header = connection_header;
539 connection_info->
header = std::make_shared<rs2rosinternal::M_string>();
541 (*connection_info->
header)[
"md5sum"] = connection_info->
md5sum;
542 (*connection_info->
header)[
"message_definition"] = connection_info->
msg_def;
544 connections_[conn_id] = connection_info;
546 writeConnectionRecord(connection_info);
547 appendConnectionRecordToBuffer(outgoing_chunk_buffer_, connection_info);
552 index_entry.
time = time;
553 index_entry.
chunk_pos = curr_chunk_info_.pos;
554 index_entry.
offset = getChunkOffset();
556 std::multiset<IndexEntry>& chunk_connection_index = curr_chunk_connection_indexes_[connection_info->
id];
557 chunk_connection_index.insert(chunk_connection_index.end(), index_entry);
558 std::multiset<IndexEntry>& connection_index = connection_indexes_[connection_info->
id];
559 connection_index.insert(connection_index.end(), index_entry);
562 curr_chunk_info_.connection_counts[connection_info->
id]++;
565 writeMessageDataRecord(conn_id, time, msg);
570 if (chunk_size > chunk_threshold_) {
573 outgoing_chunk_buffer_.setSize(0);
576 curr_chunk_info_.pos = -1;
591 record_buffer_.setSize(msg_ser_len);
602 file_size_ = file_.getOffset();
605 (
unsigned long long) file_.getOffset(), getChunkOffset(), conn_id, time.
sec, time.
nsec, msg_ser_len);
608 writeDataLength(msg_ser_len);
609 write((
char*) record_buffer_.getData(), msg_ser_len);
612 appendHeaderToBuffer(outgoing_chunk_buffer_, header);
613 appendDataLengthToBuffer(outgoing_chunk_buffer_, msg_ser_len);
616 outgoing_chunk_buffer_.setSize(outgoing_chunk_buffer_.getSize() + msg_ser_len);
617 memcpy(outgoing_chunk_buffer_.getData() +
offset, record_buffer_.getData(), msg_ser_len);
620 if (time > curr_chunk_info_.end_time)
621 curr_chunk_info_.end_time = time;
622 else if (time < curr_chunk_info_.start_time)
623 curr_chunk_info_.start_time = time;
const char * md5sum()
returns MD5Sum<M>::value();
void serialize(Stream &stream, const T &t)
Serialize an object. Stream here should normally be a rs2rosinternal::serialization::OStream.
std::map< std::string, uint32_t > topic_connection_ids_
static const std::string LATCHING_FIELD_NAME
std::shared_ptr< std::map< std::string, std::string > > connection_header
void writeMessageDataRecord(uint32_t conn_id, rs2rosinternal::Time const &time, T const &msg)
ChunkedFile reads and writes files which contain interleaved chunks of compressed and uncompressed da...
std::map< uint32_t, ConnectionInfo * > connections_
std::string toHeaderString(T const *field) const
static const std::string CONNECTION_FIELD_NAME
const char * definition()
returns Definition<M>::value();
int writeHeader(roslz4_stream *str)
Buffer header_buffer_
reusable buffer in which to assemble the record header before writing to file
std::map< rs2rosinternal::M_string, uint32_t > header_connection_ids_
Base class for rosbag exceptions.
std_msgs::Header * header(M &m)
returns Header<M>::pointer(m);
Event type for subscriptions, const rs2rosinternal::MessageEvent<M const>& can be used in your callba...
static void notify(const PreDeserializeParams< M > &)
BagMode
The possible modes to open a bag in.
static const unsigned char OP_MSG_DATA
const char * datatype()
returns DataType<M>::value();
GLenum GLuint GLenum GLsizei const GLchar * buf
Buffer decompress_buffer_
reusable buffer to decompress chunks into
void deserialize(Stream &stream, T &t)
Deserialize an object. Stream here should normally be a rs2rosinternal::serialization::IStream.
void doWrite(std::string const &topic, rs2rosinternal::Time const &time, T const &msg, std::shared_ptr< rs2rosinternal::M_string > const &connection_header)
uint64_t decompressed_chunk_
position of decompressed chunk
std::shared_ptr< rs2rosinternal::M_string > header
A class pointing into a bag file.
void write(std::string const &topic, rs2rosinternal::MessageEvent< T > const &event)
Write a message into the bag file.
std::map< std::string, std::string > M_string
Time representation. May either represent wall clock time or ROS clock time.
unsigned __int64 uint64_t
static const std::string OP_FIELD_NAME
uint64_t file_header_pos_
Buffer record_buffer_
reusable buffer in which to assemble the record data before writing to file
uint32_t serializationLength(const T &t)
Determine the serialized length of an object.
Buffer chunk_buffer_
reusable buffer to read chunk into
ChunkInfo curr_chunk_info_
std::map< uint32_t, std::multiset< IndexEntry > > connection_indexes_
#define CONSOLE_BRIDGE_logDebug(fmt,...)
rs2rosinternal::Time getReceiptTime() const
Returns the time at which this message was received.
uint64_t chunk_pos
timestamp of the message
uint32_t connection_count_
GLboolean GLboolean GLboolean b
uint32_t offset
absolute byte offset of the chunk record containing the message
uint32_t chunk_threshold_
std::vector< ChunkInfo > chunks_
std::shared_ptr< T > instantiateBuffer(IndexEntry const &index_entry) const
deserializes the message held in record_buffer_
ROSTIME_DECL const Time TIME_MIN
static const std::string TIME_FIELD_NAME
CompressionType compression_
std::map< uint32_t, std::multiset< IndexEntry > > curr_chunk_connection_indexes_
std::shared_ptr< M > message
uint64_t curr_chunk_data_pos_
Buffer outgoing_chunk_buffer_
reusable buffer to read chunk into
rs2rosinternal::Time time
static const std::string TOPIC_FIELD_NAME
static const std::string CALLERID_FIELD_NAME
bool readField(rs2rosinternal::M_string const &fields, std::string const &field_name, bool required, T *data) const
#define ROSBAG_DECL
#include <ros/macros.h> // for the DECL's
void readMessageDataIntoStream(IndexEntry const &index_entry, Stream &stream) const