bag.cpp
Go to the documentation of this file.
1 // Copyright (c) 2009, Willow Garage, Inc.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are met:
6 //
7 // * Redistributions of source code must retain the above copyright
8 // notice, this list of conditions and the following disclaimer.
9 // * Redistributions in binary form must reproduce the above copyright
10 // notice, this list of conditions and the following disclaimer in the
11 // documentation and/or other materials provided with the distribution.
12 // * Neither the name of Willow Garage, Inc. nor the names of its
13 // contributors may be used to endorse or promote products derived from
14 // this software without specific prior written permission.
15 //
16 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 // POSSIBILITY OF SUCH DAMAGE.
27 
28 #include "rosbag/bag.h"
30 #include "rosbag/query.h"
31 #include "rosbag/view.h"
32 
33 #if defined(_MSC_VER)
34  #include <stdint.h> // only on v2010 and later -> is this enough for msvc and linux?
35 #else
36  #include <inttypes.h>
37 #endif
38 #include <signal.h>
39 #include <assert.h>
40 #include <iomanip>
41 
42 #include <boost/bind/bind.hpp>
43 
44 #include "console_bridge/console.h"
45 
46 using std::map;
47 using std::priority_queue;
48 using std::string;
49 using std::vector;
50 using std::multiset;
51 using boost::format;
52 using boost::shared_ptr;
53 using ros::M_string;
54 using ros::Time;
55 
56 namespace rosbag {
57 
58 Bag::Bag() : encryptor_loader_("rosbag_storage", "rosbag::EncryptorBase")
59 {
60  init();
61 }
62 
63 Bag::Bag(string const& filename, uint32_t mode) : encryptor_loader_("rosbag_storage", "rosbag::EncryptorBase")
64 {
65  init();
66  open(filename, mode);
67 }
68 
69 #ifndef BOOST_NO_CXX11_RVALUE_REFERENCES
70 
71 Bag::Bag(Bag&& other) : encryptor_loader_("rosbag_storage", "rosbag::EncryptorBase") {
72  init();
73  swap(other);
74 }
75 
76 Bag& Bag::operator=(Bag&& other) {
77  swap(other);
78  return *this;
79 }
80 
81 #endif // BOOST_NO_CXX11_RVALUE_REFERENCES
82 
84  close();
85 }
86 
87 void Bag::init() {
89  version_ = 0;
91  chunk_threshold_ = 768 * 1024; // 768KB chunks
92  bag_revision_ = 0;
93  file_size_ = 0;
94  file_header_pos_ = 0;
95  index_data_pos_ = 0;
97  chunk_count_ = 0;
98  chunk_open_ = false;
100  current_buffer_ = 0;
102  setEncryptorPlugin(std::string("rosbag/NoEncryptor"));
103 }
104 
105 void Bag::open(string const& filename, uint32_t mode) {
106  mode_ = (BagMode) mode;
107 
108  if (mode_ & bagmode::Append)
109  openAppend(filename);
110  else if (mode_ & bagmode::Write)
111  openWrite(filename);
112  else if (mode_ & bagmode::Read)
113  openRead(filename);
114  else
115  throw BagException((format("Unknown mode: %1%") % (int) mode).str());
116 
117  // Determine file size
118  uint64_t offset = file_.getOffset();
119  seek(0, std::ios::end);
121  seek(offset);
122 }
123 
124 void Bag::openRead(string const& filename) {
125  file_.openRead(filename);
126 
127  readVersion();
128 
129  switch (version_) {
130  case 102: startReadingVersion102(); break;
131  case 200: startReadingVersion200(); break;
132  default:
133  throw BagException((format("Unsupported bag file version: %1%.%2%") % getMajorVersion() % getMinorVersion()).str());
134  }
135 }
136 
137 void Bag::openWrite(string const& filename) {
138  file_.openWrite(filename);
139 
140  startWriting();
141 }
142 
143 void Bag::openAppend(string const& filename) {
144  file_.openReadWrite(filename);
145 
146  readVersion();
147 
148  if (version_ != 200)
149  throw BagException((format("Bag file version %1%.%2% is unsupported for appending") % getMajorVersion() % getMinorVersion()).str());
150 
152 
153  // Truncate the file to chop off the index
155  index_data_pos_ = 0;
156 
157  // Rewrite the file header, clearing the index position (so we know if the index is invalid)
160 
161  // Seek to the end of the file
162  seek(0, std::ios::end);
163 }
164 
165 void Bag::close() {
166  if (!isOpen())
167  return;
168 
170  closeWrite();
171 
172  file_.close();
173 
174  topic_connection_ids_.clear();
175  header_connection_ids_.clear();
176  for (map<uint32_t, ConnectionInfo*>::iterator i = connections_.begin(); i != connections_.end(); i++)
177  delete i->second;
178  connections_.clear();
179  chunks_.clear();
180  connection_indexes_.clear();
182 
183  init();
184 }
185 
187  stopWriting();
188 }
189 
190 string Bag::getFileName() const { return file_.getFileName(); }
191 BagMode Bag::getMode() const { return mode_; }
192 uint64_t Bag::getSize() const { return file_size_; }
193 
194 uint32_t Bag::getChunkThreshold() const { return chunk_threshold_; }
195 
196 void Bag::setChunkThreshold(uint32_t chunk_threshold) {
197  if (isOpen() && chunk_open_)
199 
200  chunk_threshold_ = chunk_threshold;
201 }
202 
204 
206  if (isOpen() && chunk_open_)
208 
209  if (!(compression == compression::Uncompressed ||
210  compression == compression::BZ2 ||
211  compression == compression::LZ4)) {
212  throw BagException(
213  (format("Unknown compression type: %i") % compression).str());
214  }
215 
216  compression_ = compression;
217 }
218 
219 void Bag::setEncryptorPlugin(std::string const& plugin_name, std::string const& plugin_param) {
220  if (!chunks_.empty()) {
221  throw BagException("Cannot set encryption plugin after chunks are written");
222  }
224  encryptor_->initialize(*this, plugin_param);
225 }
226 
227 // Version
228 
230  string version = string("#ROSBAG V") + VERSION + string("\n");
231 
232  CONSOLE_BRIDGE_logDebug("Writing VERSION [%llu]: %s", (unsigned long long) file_.getOffset(), version.c_str());
233 
234  version_ = 200;
235 
236  write(version);
237 }
238 
240  string version_line = file_.getline();
241 
243 
244  char logtypename[100];
245  int version_major, version_minor;
246 #if defined(_MSC_VER)
247  if (sscanf_s(version_line.c_str(), "#ROS%s V%d.%d", logtypename, sizeof(logtypename), &version_major, &version_minor) != 3)
248 #else
249  if (sscanf(version_line.c_str(), "#ROS%99s V%d.%d", logtypename, &version_major, &version_minor) != 3)
250 #endif
251  throw BagIOException("Error reading version line");
252 
253  version_ = version_major * 100 + version_minor;
254 
255  CONSOLE_BRIDGE_logDebug("Read VERSION: version=%d", version_);
256 }
257 
258 uint32_t Bag::getMajorVersion() const { return version_ / 100; }
259 uint32_t Bag::getMinorVersion() const { return version_ % 100; }
260 
261 //
262 
264  writeVersion();
267 }
268 
270  if (chunk_open_)
272 
273  seek(0, std::ios::end);
274 
278 
281 }
282 
284  // Read the file header record, which points to the end of the chunks
286 
287  // Seek to the end of the chunks
289 
290  // Read the connection records (one for each connection)
291  for (uint32_t i = 0; i < connection_count_; i++)
293 
294  // Read the chunk info records
295  for (uint32_t i = 0; i < chunk_count_; i++)
297 
298  // Read the connection indexes for each chunk
299  for (ChunkInfo const& chunk_info : chunks_) {
300  curr_chunk_info_ = chunk_info;
301 
303 
304  // Skip over the chunk data
305  ChunkHeader chunk_header;
306  readChunkHeader(chunk_header);
307  seek(chunk_header.compressed_size, std::ios::cur);
308 
309  // Read the index records after the chunk
310  for (unsigned int i = 0; i < chunk_info.connection_counts.size(); i++)
312  }
313 
314  // At this point we don't have a curr_chunk_info anymore so we reset it
316 }
317 
319  try
320  {
321  // Read the file header record, which points to the start of the topic indexes
323  }
324  catch (const BagFormatException& ex) {
325  throw BagUnindexedException();
326  }
327 
328  // Get the length of the file
329  seek(0, std::ios::end);
330  uint64_t filelength = file_.getOffset();
331 
332  // Seek to the beginning of the topic index records
334 
335  // Read the topic index records, which point to the offsets of each message in the file
336  while (file_.getOffset() < filelength)
338 
339  // Read the message definition records (which are the first entry in the topic indexes)
340  for (map<uint32_t, multiset<IndexEntry> >::const_iterator i = connection_indexes_.begin(); i != connection_indexes_.end(); i++) {
341  multiset<IndexEntry> const& index = i->second;
342  IndexEntry const& first_entry = *index.begin();
343 
344  CONSOLE_BRIDGE_logDebug("Reading message definition for connection %d at %llu", i->first, (unsigned long long) first_entry.chunk_pos);
345 
346  seek(first_entry.chunk_pos);
347 
349  }
350 }
351 
352 // File header record
353 
356  chunk_count_ = chunks_.size();
357 
358  CONSOLE_BRIDGE_logDebug("Writing FILE_HEADER [%llu]: index_pos=%llu connection_count=%d chunk_count=%d",
359  (unsigned long long) file_.getOffset(), (unsigned long long) index_data_pos_, connection_count_, chunk_count_);
360 
361  // Write file header record
367  encryptor_->addFieldsToFileHeader(header);
368 
369  boost::shared_array<uint8_t> header_buffer;
370  uint32_t header_len;
371  ros::Header::write(header, header_buffer, header_len);
372  uint32_t data_len = 0;
373  if (header_len < FILE_HEADER_LENGTH)
374  data_len = FILE_HEADER_LENGTH - header_len;
375  write((char*) &header_len, 4);
376  write((char*) header_buffer.get(), header_len);
377  write((char*) &data_len, 4);
378 
379  // Pad the file header record out
380  if (data_len > 0) {
381  string padding;
382  padding.resize(data_len, ' ');
383  write(padding);
384  }
385 }
386 
389  uint32_t data_size;
390  if (!readHeader(header) || !readDataLength(data_size))
391  throw BagFormatException("Error reading FILE_HEADER record");
392 
393  M_string& fields = *header.getValues();
394 
395  if (!isOp(fields, OP_FILE_HEADER))
396  throw BagFormatException("Expected FILE_HEADER op not found");
397 
398  // Read index position
399  readField(fields, INDEX_POS_FIELD_NAME, true, (uint64_t*) &index_data_pos_);
400 
401  if (index_data_pos_ == 0)
402  throw BagUnindexedException();
403 
404  // Read topic and chunks count
405  if (version_ >= 200) {
408  std::string encryptor_plugin_name;
409  readField(fields, ENCRYPTOR_FIELD_NAME, 0, UINT_MAX, false, encryptor_plugin_name);
410  if (!encryptor_plugin_name.empty()) {
411  setEncryptorPlugin(encryptor_plugin_name);
412  encryptor_->readFieldsFromFileHeader(fields);
413  }
414  }
415 
416  CONSOLE_BRIDGE_logDebug("Read FILE_HEADER: index_pos=%llu connection_count=%d chunk_count=%d",
417  (unsigned long long) index_data_pos_, connection_count_, chunk_count_);
418 
419  // Skip the data section (just padding)
420  seek(data_size, std::ios::cur);
421 }
422 
423 uint32_t Bag::getChunkOffset() const {
426  else
427  return file_.getCompressedBytesIn();
428 }
429 
431  // Initialize chunk info
434  curr_chunk_info_.end_time = time;
435 
436  // Write the chunk header, with a place-holder for the data sizes (we'll fill in when the chunk is finished)
438 
439  // Turn on compressed writing
441 
442  // Record where the data section of this chunk started
444 
445  chunk_open_ = true;
446 }
447 
449  // Add this chunk to the index
450  chunks_.push_back(curr_chunk_info_);
451 
452  // Get the uncompressed and compressed sizes
453  uint32_t uncompressed_size = getChunkOffset();
455  uint32_t compressed_size = file_.getOffset() - curr_chunk_data_pos_;
456 
457  // When encryption is on, compressed_size represents encrypted chunk size;
458  // When decrypting, the actual compressed size can be deduced from the decrypted chunk
459  compressed_size = encryptor_->encryptChunk(compressed_size, curr_chunk_data_pos_, file_);
460 
461  // Rewrite the chunk header with the size of the chunk (remembering current offset)
462  uint64_t end_of_chunk_pos = file_.getOffset();
463 
465  writeChunkHeader(compression_, compressed_size, uncompressed_size);
466 
467  // Write out the indexes and clear them
468  seek(end_of_chunk_pos);
471 
472  // Clear the connection counts
474 
475  // Flag that we're starting a new chunk
476  chunk_open_ = false;
477 }
478 
479 void Bag::writeChunkHeader(CompressionType compression, uint32_t compressed_size, uint32_t uncompressed_size) {
480  ChunkHeader chunk_header;
481  switch (compression) {
482  case compression::Uncompressed: chunk_header.compression = COMPRESSION_NONE; break;
483  case compression::BZ2: chunk_header.compression = COMPRESSION_BZ2; break;
484  case compression::LZ4: chunk_header.compression = COMPRESSION_LZ4;
485  //case compression::ZLIB: chunk_header.compression = COMPRESSION_ZLIB; break;
486  }
487  chunk_header.compressed_size = compressed_size;
488  chunk_header.uncompressed_size = uncompressed_size;
489 
490  CONSOLE_BRIDGE_logDebug("Writing CHUNK [%llu]: compression=%s compressed=%d uncompressed=%d",
491  (unsigned long long) file_.getOffset(), chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size);
492 
495  header[COMPRESSION_FIELD_NAME] = chunk_header.compression;
498 
499  writeDataLength(chunk_header.compressed_size);
500 }
501 
502 void Bag::readChunkHeader(ChunkHeader& chunk_header) const {
504  if (!readHeader(header) || !readDataLength(chunk_header.compressed_size))
505  throw BagFormatException("Error reading CHUNK record");
506 
507  M_string& fields = *header.getValues();
508 
509  if (!isOp(fields, OP_CHUNK))
510  throw BagFormatException("Expected CHUNK op not found");
511 
512  readField(fields, COMPRESSION_FIELD_NAME, true, chunk_header.compression);
513  readField(fields, SIZE_FIELD_NAME, true, &chunk_header.uncompressed_size);
514 
515  CONSOLE_BRIDGE_logDebug("Read CHUNK: compression=%s size=%d uncompressed=%d (%f)", chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size, 100 * ((double) chunk_header.compressed_size) / chunk_header.uncompressed_size);
516 }
517 
518 // Index records
519 
521  for (map<uint32_t, multiset<IndexEntry> >::const_iterator i = curr_chunk_connection_indexes_.begin(); i != curr_chunk_connection_indexes_.end(); i++) {
522  uint32_t connection_id = i->first;
523  multiset<IndexEntry> const& index = i->second;
524 
525  // Write the index record header
526  uint32_t index_size = index.size();
529  header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_id);
531  header[COUNT_FIELD_NAME] = toHeaderString(&index_size);
533 
534  writeDataLength(index_size * 12);
535 
536  CONSOLE_BRIDGE_logDebug("Writing INDEX_DATA: connection=%d ver=%d count=%d", connection_id, INDEX_VERSION, index_size);
537 
538  // Write the index record data (pairs of timestamp and position in file)
539  for (IndexEntry const& e : index) {
540  write((char*) &e.time.sec, 4);
541  write((char*) &e.time.nsec, 4);
542  write((char*) &e.offset, 4);
543 
544  CONSOLE_BRIDGE_logDebug(" - %d.%d: %d", e.time.sec, e.time.nsec, e.offset);
545  }
546  }
547 }
548 
551  uint32_t data_size;
552  if (!readHeader(header) || !readDataLength(data_size))
553  throw BagFormatException("Error reading INDEX_DATA header");
554  M_string& fields = *header.getValues();
555 
556  if (!isOp(fields, OP_INDEX_DATA))
557  throw BagFormatException("Expected INDEX_DATA record");
558 
559  uint32_t index_version;
560  string topic;
561  uint32_t count = 0;
562  readField(fields, VER_FIELD_NAME, true, &index_version);
563  readField(fields, TOPIC_FIELD_NAME, true, topic);
564  readField(fields, COUNT_FIELD_NAME, true, &count);
565 
566  CONSOLE_BRIDGE_logDebug("Read INDEX_DATA: ver=%d topic=%s count=%d", index_version, topic.c_str(), count);
567 
568  if (index_version != 0)
569  throw BagFormatException((format("Unsupported INDEX_DATA version: %1%") % index_version).str());
570 
571  uint32_t connection_id;
572  map<string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
573  if (topic_conn_id_iter == topic_connection_ids_.end()) {
574  connection_id = connections_.size();
575 
576  CONSOLE_BRIDGE_logDebug("Creating connection: id=%d topic=%s", connection_id, topic.c_str());
577  ConnectionInfo* connection_info = new ConnectionInfo();
578  connection_info->id = connection_id;
579  connection_info->topic = topic;
580  connections_[connection_id] = connection_info;
581 
582  topic_connection_ids_[topic] = connection_id;
583  }
584  else
585  connection_id = topic_conn_id_iter->second;
586 
587  multiset<IndexEntry>& connection_index = connection_indexes_[connection_id];
588 
589  for (uint32_t i = 0; i < count; i++) {
590  IndexEntry index_entry;
591  uint32_t sec;
592  uint32_t nsec;
593  read((char*) &sec, 4);
594  read((char*) &nsec, 4);
595  read((char*) &index_entry.chunk_pos, 8); //<! store position of the message in the chunk_pos field as it's 64 bits
596  index_entry.time = Time(sec, nsec);
597  index_entry.offset = 0;
598 
599  CONSOLE_BRIDGE_logDebug(" - %d.%d: %llu", sec, nsec, (unsigned long long) index_entry.chunk_pos);
600 
601  if (index_entry.time < ros::TIME_MIN || index_entry.time > ros::TIME_MAX)
602  {
603  CONSOLE_BRIDGE_logError("Index entry for topic %s contains invalid time.", topic.c_str());
604  } else
605  {
606  connection_index.insert(connection_index.end(), index_entry);
607  }
608  }
609 }
610 
613  uint32_t data_size;
614  if (!readHeader(header) || !readDataLength(data_size))
615  throw BagFormatException("Error reading INDEX_DATA header");
616  M_string& fields = *header.getValues();
617 
618  if (!isOp(fields, OP_INDEX_DATA))
619  throw BagFormatException("Expected INDEX_DATA record");
620 
621  uint32_t index_version;
622  uint32_t connection_id;
623  uint32_t count = 0;
624  readField(fields, VER_FIELD_NAME, true, &index_version);
625  readField(fields, CONNECTION_FIELD_NAME, true, &connection_id);
626  readField(fields, COUNT_FIELD_NAME, true, &count);
627 
628  CONSOLE_BRIDGE_logDebug("Read INDEX_DATA: ver=%d connection=%d count=%d", index_version, connection_id, count);
629 
630  if (index_version != 1)
631  throw BagFormatException((format("Unsupported INDEX_DATA version: %1%") % index_version).str());
632 
633  uint64_t chunk_pos = curr_chunk_info_.pos;
634 
635  multiset<IndexEntry>& connection_index = connection_indexes_[connection_id];
636 
637  for (uint32_t i = 0; i < count; i++) {
638  IndexEntry index_entry;
639  index_entry.chunk_pos = chunk_pos;
640  uint32_t sec;
641  uint32_t nsec;
642  read((char*) &sec, 4);
643  read((char*) &nsec, 4);
644  read((char*) &index_entry.offset, 4);
645  index_entry.time = Time(sec, nsec);
646 
647  CONSOLE_BRIDGE_logDebug(" - %d.%d: %llu+%d", sec, nsec, (unsigned long long) index_entry.chunk_pos, index_entry.offset);
648 
649  if (index_entry.time < ros::TIME_MIN || index_entry.time > ros::TIME_MAX)
650  {
651  CONSOLE_BRIDGE_logError("Index entry for topic %s contains invalid time. This message will not be loaded.", connections_[connection_id]->topic.c_str());
652  } else
653  {
654  connection_index.insert(connection_index.end(), index_entry);
655  }
656  }
657 }
658 
659 // Connection records
660 
662  for (map<uint32_t, ConnectionInfo*>::const_iterator i = connections_.begin(); i != connections_.end(); i++) {
663  ConnectionInfo const* connection_info = i->second;
664  writeConnectionRecord(connection_info, true);
665  }
666 }
667 
668 void Bag::writeConnectionRecord(ConnectionInfo const* connection_info, const bool encrypt) {
669  CONSOLE_BRIDGE_logDebug("Writing CONNECTION [%llu:%d]: topic=%s id=%d",
670  (unsigned long long) file_.getOffset(), getChunkOffset(), connection_info->topic.c_str(), connection_info->id);
671 
674  header[TOPIC_FIELD_NAME] = connection_info->topic;
675  header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_info->id);
676 
677  if (encrypt)
678  encryptor_->writeEncryptedHeader(boost::bind(&Bag::writeHeader, this, boost::placeholders::_1), header, file_);
679  else
681 
682  if (encrypt)
683  encryptor_->writeEncryptedHeader(boost::bind(&Bag::writeHeader, this, boost::placeholders::_1), *connection_info->header, file_);
684  else
685  writeHeader(*connection_info->header);
686 }
687 
688 void Bag::appendConnectionRecordToBuffer(Buffer& buf, ConnectionInfo const* connection_info) {
691  header[TOPIC_FIELD_NAME] = connection_info->topic;
692  header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_info->id);
694 
695  appendHeaderToBuffer(buf, *connection_info->header);
696 }
697 
700  if (!encryptor_->readEncryptedHeader(boost::bind(&Bag::readHeader, this, boost::placeholders::_1), header, header_buffer_, file_))
701  throw BagFormatException("Error reading CONNECTION header");
702  M_string& fields = *header.getValues();
703 
704  if (!isOp(fields, OP_CONNECTION))
705  throw BagFormatException("Expected CONNECTION op not found");
706 
707  uint32_t id;
708  readField(fields, CONNECTION_FIELD_NAME, true, &id);
709  string topic;
710  readField(fields, TOPIC_FIELD_NAME, true, topic);
711 
712  ros::Header connection_header;
713  if (!encryptor_->readEncryptedHeader(boost::bind(&Bag::readHeader, this, boost::placeholders::_1), connection_header, header_buffer_, file_))
714  throw BagFormatException("Error reading connection header");
715 
716  // If this is a new connection, update connections
717  map<uint32_t, ConnectionInfo*>::iterator key = connections_.find(id);
718  if (key == connections_.end()) {
719  ConnectionInfo* connection_info = new ConnectionInfo();
720  connection_info->id = id;
721  connection_info->topic = topic;
722  connection_info->header = boost::make_shared<M_string>();
723  for (M_string::const_iterator i = connection_header.getValues()->begin(); i != connection_header.getValues()->end(); i++)
724  (*connection_info->header)[i->first] = i->second;
725  connection_info->msg_def = (*connection_info->header)["message_definition"];
726  connection_info->datatype = (*connection_info->header)["type"];
727  connection_info->md5sum = (*connection_info->header)["md5sum"];
728  connections_[id] = connection_info;
729 
730  CONSOLE_BRIDGE_logDebug("Read CONNECTION: topic=%s id=%d", topic.c_str(), id);
731  }
732 }
733 
736  uint32_t data_size;
737  if (!readHeader(header) || !readDataLength(data_size))
738  throw BagFormatException("Error reading message definition header");
739  M_string& fields = *header.getValues();
740 
741  if (!isOp(fields, OP_MSG_DEF))
742  throw BagFormatException("Expected MSG_DEF op not found");
743 
744  string topic, md5sum, datatype, message_definition;
745  readField(fields, TOPIC_FIELD_NAME, true, topic);
746  readField(fields, MD5_FIELD_NAME, 32, 32, true, md5sum);
747  readField(fields, TYPE_FIELD_NAME, true, datatype);
748  readField(fields, DEF_FIELD_NAME, 0, UINT_MAX, true, message_definition);
749 
750  ConnectionInfo* connection_info;
751 
752  map<string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
753  if (topic_conn_id_iter == topic_connection_ids_.end()) {
754  uint32_t id = connections_.size();
755 
756  CONSOLE_BRIDGE_logDebug("Creating connection: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str());
757  connection_info = new ConnectionInfo();
758  connection_info->id = id;
759  connection_info->topic = topic;
760 
761  connections_[id] = connection_info;
762  topic_connection_ids_[topic] = id;
763  }
764  else
765  connection_info = connections_[topic_conn_id_iter->second];
766 
767  connection_info->msg_def = message_definition;
768  connection_info->datatype = datatype;
769  connection_info->md5sum = md5sum;
770  connection_info->header = boost::make_shared<ros::M_string>();
771  (*connection_info->header)["type"] = connection_info->datatype;
772  (*connection_info->header)["md5sum"] = connection_info->md5sum;
773  (*connection_info->header)["message_definition"] = connection_info->msg_def;
774 
775  CONSOLE_BRIDGE_logDebug("Read MSG_DEF: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str());
776 }
777 
778 void Bag::decompressChunk(uint64_t chunk_pos) const {
779  if (curr_chunk_info_.pos == chunk_pos) {
781  return;
782  }
783 
785 
786  if (decompressed_chunk_ == chunk_pos)
787  return;
788 
789  // Seek to the start of the chunk
790  seek(chunk_pos);
791 
792  // Read the chunk header
793  ChunkHeader chunk_header;
794  readChunkHeader(chunk_header);
795 
796  // Read and decompress the chunk. These assume we are at the right place in the stream already
797  if (chunk_header.compression == COMPRESSION_NONE)
798  decompressRawChunk(chunk_header);
799  else if (chunk_header.compression == COMPRESSION_BZ2)
800  decompressBz2Chunk(chunk_header);
801  else if (chunk_header.compression == COMPRESSION_LZ4)
802  decompressLz4Chunk(chunk_header);
803  else
804  throw BagFormatException("Unknown compression: " + chunk_header.compression);
805 
806  decompressed_chunk_ = chunk_pos;
807 }
808 
809 void Bag::readMessageDataRecord102(uint64_t offset, ros::Header& header) const {
810  CONSOLE_BRIDGE_logDebug("readMessageDataRecord: offset=%llu", (unsigned long long) offset);
811 
812  seek(offset);
813 
814  uint32_t data_size;
815  uint8_t op;
816  do {
817  if (!readHeader(header) || !readDataLength(data_size))
818  throw BagFormatException("Error reading header");
819 
820  readField(*header.getValues(), OP_FIELD_NAME, true, &op);
821  }
822  while (op == OP_MSG_DEF);
823 
824  if (op != OP_MSG_DATA)
825  throw BagFormatException((format("Expected MSG_DATA op, got %d") % op).str());
826 
827  record_buffer_.setSize(data_size);
828  file_.read((char*) record_buffer_.getData(), data_size);
829 }
830 
831 // Reading this into a buffer isn't completely necessary, but we do it anyways for now
832 void Bag::decompressRawChunk(ChunkHeader const& chunk_header) const {
833  assert(chunk_header.compression == COMPRESSION_NONE);
834 
835  CONSOLE_BRIDGE_logDebug("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size);
836 
837  encryptor_->decryptChunk(chunk_header, decompress_buffer_, file_);
838 
839  // todo check read was successful
840 }
841 
842 void Bag::decompressBz2Chunk(ChunkHeader const& chunk_header) const {
843  assert(chunk_header.compression == COMPRESSION_BZ2);
844 
845  CompressionType compression = compression::BZ2;
846 
847  CONSOLE_BRIDGE_logDebug("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size);
848 
849  encryptor_->decryptChunk(chunk_header, chunk_buffer_, file_);
850 
853 
854  // todo check read was successful
855 }
856 
857 void Bag::decompressLz4Chunk(ChunkHeader const& chunk_header) const {
858  assert(chunk_header.compression == COMPRESSION_LZ4);
859 
860  CompressionType compression = compression::LZ4;
861 
862  CONSOLE_BRIDGE_logDebug("lz4 compressed_size: %d uncompressed_size: %d",
863  chunk_header.compressed_size, chunk_header.uncompressed_size);
864 
865  encryptor_->decryptChunk(chunk_header, chunk_buffer_, file_);
866 
869 
870  // todo check read was successful
871 }
872 
875  uint32_t data_size;
876  uint32_t bytes_read;
877  switch (version_)
878  {
879  case 200:
880  decompressChunk(index_entry.chunk_pos);
881  readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
882  return header;
883  case 102:
885  return header;
886  default:
887  throw BagFormatException((format("Unhandled version: %1%") % version_).str());
888  }
889 }
890 
891 // NOTE: this loads the header, which is unnecessary
892 uint32_t Bag::readMessageDataSize(IndexEntry const& index_entry) const {
894  uint32_t data_size;
895  uint32_t bytes_read;
896  switch (version_)
897  {
898  case 200:
899  decompressChunk(index_entry.chunk_pos);
900  readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
901  return data_size;
902  case 102:
904  return record_buffer_.getSize();
905  default:
906  throw BagFormatException((format("Unhandled version: %1%") % version_).str());
907  }
908 }
909 
911  for (ChunkInfo const& chunk_info : chunks_) {
912  // Write the chunk info header
914  uint32_t chunk_connection_count = chunk_info.connection_counts.size();
917  header[CHUNK_POS_FIELD_NAME] = toHeaderString(&chunk_info.pos);
918  header[START_TIME_FIELD_NAME] = toHeaderString(&chunk_info.start_time);
919  header[END_TIME_FIELD_NAME] = toHeaderString(&chunk_info.end_time);
920  header[COUNT_FIELD_NAME] = toHeaderString(&chunk_connection_count);
921 
922  CONSOLE_BRIDGE_logDebug("Writing CHUNK_INFO [%llu]: ver=%d pos=%llu start=%d.%d end=%d.%d",
923  (unsigned long long) file_.getOffset(), CHUNK_INFO_VERSION, (unsigned long long) chunk_info.pos,
924  chunk_info.start_time.sec, chunk_info.start_time.nsec,
925  chunk_info.end_time.sec, chunk_info.end_time.nsec);
926 
928 
929  writeDataLength(8 * chunk_connection_count);
930 
931  // Write the topic names and counts
932  for (map<uint32_t, uint32_t>::const_iterator i = chunk_info.connection_counts.begin(); i != chunk_info.connection_counts.end(); i++) {
933  uint32_t connection_id = i->first;
934  uint32_t count = i->second;
935 
936  write((char*) &connection_id, 4);
937  write((char*) &count, 4);
938 
939  CONSOLE_BRIDGE_logDebug(" - %d: %d", connection_id, count);
940  }
941  }
942 }
943 
945  // Read a CHUNK_INFO header
947  uint32_t data_size;
948  if (!readHeader(header) || !readDataLength(data_size))
949  throw BagFormatException("Error reading CHUNK_INFO record header");
950  M_string& fields = *header.getValues();
951  if (!isOp(fields, OP_CHUNK_INFO))
952  throw BagFormatException("Expected CHUNK_INFO op not found");
953 
954  // Check that the chunk info version is current
955  uint32_t chunk_info_version;
956  readField(fields, VER_FIELD_NAME, true, &chunk_info_version);
957  if (chunk_info_version != CHUNK_INFO_VERSION)
958  throw BagFormatException((format("Expected CHUNK_INFO version %1%, read %2%") % CHUNK_INFO_VERSION % chunk_info_version).str());
959 
960  // Read the chunk position, timestamp, and topic count fields
961  ChunkInfo chunk_info;
962  readField(fields, CHUNK_POS_FIELD_NAME, true, &chunk_info.pos);
963  readField(fields, START_TIME_FIELD_NAME, true, chunk_info.start_time);
964  readField(fields, END_TIME_FIELD_NAME, true, chunk_info.end_time);
965  uint32_t chunk_connection_count = 0;
966  readField(fields, COUNT_FIELD_NAME, true, &chunk_connection_count);
967 
968  CONSOLE_BRIDGE_logDebug("Read CHUNK_INFO: chunk_pos=%llu connection_count=%d start=%d.%d end=%d.%d",
969  (unsigned long long) chunk_info.pos, chunk_connection_count,
970  chunk_info.start_time.sec, chunk_info.start_time.nsec,
971  chunk_info.end_time.sec, chunk_info.end_time.nsec);
972 
973  // Read the topic count entries
974  for (uint32_t i = 0; i < chunk_connection_count; i ++) {
975  uint32_t connection_id, connection_count;
976  read((char*) &connection_id, 4);
977  read((char*) &connection_count, 4);
978 
979  CONSOLE_BRIDGE_logDebug(" %d: %d messages", connection_id, connection_count);
980 
981  chunk_info.connection_counts[connection_id] = connection_count;
982  }
983 
984  chunks_.push_back(chunk_info);
985 }
986 
987 // Record I/O
988 
989 bool Bag::isOp(M_string& fields, uint8_t reqOp) const {
990  uint8_t op = 0xFF; // nonexistent op
991  readField(fields, OP_FIELD_NAME, true, &op);
992  return op == reqOp;
993 }
994 
995 void Bag::writeHeader(M_string const& fields) {
996  boost::shared_array<uint8_t> header_buffer;
997  uint32_t header_len;
998  ros::Header::write(fields, header_buffer, header_len);
999  write((char*) &header_len, 4);
1000  write((char*) header_buffer.get(), header_len);
1001 }
1002 
1003 void Bag::writeDataLength(uint32_t data_len) {
1004  write((char*) &data_len, 4);
1005 }
1006 
1007 void Bag::appendHeaderToBuffer(Buffer& buf, M_string const& fields) {
1008  boost::shared_array<uint8_t> header_buffer;
1009  uint32_t header_len;
1010  ros::Header::write(fields, header_buffer, header_len);
1011 
1012  uint32_t offset = buf.getSize();
1013 
1014  buf.setSize(buf.getSize() + 4 + header_len);
1015 
1016  memcpy(buf.getData() + offset, &header_len, 4);
1017  offset += 4;
1018  memcpy(buf.getData() + offset, header_buffer.get(), header_len);
1019 }
1020 
1021 void Bag::appendDataLengthToBuffer(Buffer& buf, uint32_t data_len) {
1022  uint32_t offset = buf.getSize();
1023 
1024  buf.setSize(buf.getSize() + 4);
1025 
1026  memcpy(buf.getData() + offset, &data_len, 4);
1027 }
1028 
1030 void Bag::readHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read) const {
1031  assert(buffer.getSize() > 8);
1032 
1033  uint8_t* start = (uint8_t*) buffer.getData() + offset;
1034 
1035  uint8_t* ptr = start;
1036 
1037  // Read the header length
1038  uint32_t header_len;
1039  memcpy(&header_len, ptr, 4);
1040  ptr += 4;
1041 
1042  // Parse the header
1043  string error_msg;
1044  bool parsed = header.parse(ptr, header_len, error_msg);
1045  if (!parsed)
1046  throw BagFormatException("Error parsing header");
1047  ptr += header_len;
1048 
1049  // Read the data size
1050  memcpy(&data_size, ptr, 4);
1051  ptr += 4;
1052 
1053  bytes_read = ptr - start;
1054 }
1055 
1056 void Bag::readMessageDataHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& total_bytes_read) const {
1057  (void)buffer;
1058  total_bytes_read = 0;
1059  uint8_t op = 0xFF;
1060  do {
1061  CONSOLE_BRIDGE_logDebug("reading header from buffer: offset=%d", offset);
1062  uint32_t bytes_read;
1063  readHeaderFromBuffer(*current_buffer_, offset, header, data_size, bytes_read);
1064 
1065  offset += bytes_read;
1066  total_bytes_read += bytes_read;
1067 
1068  readField(*header.getValues(), OP_FIELD_NAME, true, &op);
1069  }
1070  while (op == OP_MSG_DEF || op == OP_CONNECTION);
1071 
1072  if (op != OP_MSG_DATA)
1073  throw BagFormatException("Expected MSG_DATA op not found");
1074 }
1075 
1076 bool Bag::readHeader(ros::Header& header) const {
1077  // Read the header length
1078  uint32_t header_len;
1079  read((char*) &header_len, 4);
1080 
1081  // Read the header
1082  header_buffer_.setSize(header_len);
1083  read((char*) header_buffer_.getData(), header_len);
1084 
1085  // Parse the header
1086  string error_msg;
1087  bool parsed = header.parse(header_buffer_.getData(), header_len, error_msg);
1088  if (!parsed)
1089  return false;
1090 
1091  return true;
1092 }
1093 
1094 bool Bag::readDataLength(uint32_t& data_size) const {
1095  read((char*) &data_size, 4);
1096  return true;
1097 }
1098 
1099 M_string::const_iterator Bag::checkField(M_string const& fields, string const& field, unsigned int min_len, unsigned int max_len, bool required) const {
1100  M_string::const_iterator fitr = fields.find(field);
1101  if (fitr == fields.end()) {
1102  if (required)
1103  throw BagFormatException("Required '" + field + "' field missing");
1104  }
1105  else if ((fitr->second.size() < min_len) || (fitr->second.size() > max_len))
1106  throw BagFormatException((format("Field '%1%' is wrong size (%2% bytes)") % field % (uint32_t) fitr->second.size()).str());
1107 
1108  return fitr;
1109 }
1110 
1111 bool Bag::readField(M_string const& fields, string const& field_name, bool required, string& data) const {
1112  return readField(fields, field_name, 1, UINT_MAX, required, data);
1113 }
1114 
1115 bool Bag::readField(M_string const& fields, string const& field_name, unsigned int min_len, unsigned int max_len, bool required, string& data) const {
1116  M_string::const_iterator fitr = checkField(fields, field_name, min_len, max_len, required);
1117  if (fitr == fields.end())
1118  return false;
1119 
1120  data = fitr->second;
1121  return true;
1122 }
1123 
1124 bool Bag::readField(M_string const& fields, string const& field_name, bool required, Time& data) const {
1125  uint64_t packed_time;
1126  if (!readField(fields, field_name, required, &packed_time))
1127  return false;
1128 
1129  uint64_t bitmask = (1LL << 33) - 1;
1130  data.sec = (uint32_t) (packed_time & bitmask);
1131  data.nsec = (uint32_t) (packed_time >> 32);
1132 
1133  return true;
1134 }
1135 
1136 std::string Bag::toHeaderString(Time const* field) const {
1137  uint64_t packed_time = (((uint64_t) field->nsec) << 32) + field->sec;
1138  return toHeaderString(&packed_time);
1139 }
1140 
1141 
1142 // Low-level I/O
1143 
1144 void Bag::write(string const& s) { write(s.c_str(), s.length()); }
1145 void Bag::write(char const* s, std::streamsize n) { file_.write((char*) s, n); }
1146 
1147 void Bag::read(char* b, std::streamsize n) const { file_.read(b, n); }
1148 void Bag::seek(uint64_t pos, int origin) const { file_.seek(pos, origin); }
1149 
1150 void Bag::swap(Bag& other) {
1151  using std::swap;
1152  swap(mode_, other.mode_);
1153  swap(file_, other.file_);
1154  swap(version_, other.version_);
1155  swap(compression_, other.compression_);
1158  swap(file_size_, other.file_size_);
1162  swap(chunk_count_, other.chunk_count_);
1163  swap(chunk_open_, other.chunk_open_);
1168  swap(connections_, other.connections_);
1169  swap(chunks_, other.chunks_);
1179  swap(encryptor_, other.encryptor_);
1180 }
1181 
1182 bool Bag::isOpen() const { return file_.isOpen(); }
1183 
1184 } // namespace rosbag
1185 
rosbag::Bag::chunks_
std::vector< ChunkInfo > chunks_
Definition: bag.h:369
rosbag::ChunkedFile::getFileName
std::string getFileName() const
return path of currently open file
Definition: chunked_file.cpp:135
rosbag::BagFormatException
Exception thrown on problems reading the bag format.
Definition: exceptions.h:89
rosbag::Bag::getMode
BagMode getMode() const
Get the mode the bag is in.
Definition: bag.cpp:191
rosbag::Buffer::getData
uint8_t * getData()
Definition: buffer.cpp:84
rosbag::ChunkedFile::setWriteMode
void setWriteMode(CompressionType type)
Definition: chunked_file.cpp:159
rosbag::Bag::topic_connection_ids_
std::map< std::string, uint32_t > topic_connection_ids_
Definition: bag.h:365
rosbag::Bag::file_
ChunkedFile file_
Definition: bag.h:348
rosbag::END_TIME_FIELD_NAME
static const std::string END_TIME_FIELD_NAME
Definition: constants.h:94
rosbag::Bag::getCompression
CompressionType getCompression() const
Get the compression method to use for writing chunks.
Definition: bag.cpp:203
rosbag::Bag::connection_count_
uint32_t connection_count_
Definition: bag.h:357
rosbag::Bag::read
void read(char *b, std::streamsize n) const
Definition: bag.cpp:1147
md5sum
const char * md5sum()
rosbag::Bag
Definition: bag.h:131
rosbag::Bag::startWritingChunk
void startWritingChunk(ros::Time time)
Definition: bag.cpp:430
rosbag::Bag::writeHeader
void writeHeader(ros::M_string const &fields)
Definition: bag.cpp:995
rosbag::COMPRESSION_NONE
static const std::string COMPRESSION_NONE
Definition: constants.h:128
rosbag::OP_CHUNK_INFO
static const unsigned char OP_CHUNK_INFO
Definition: constants.h:112
rosbag::Bag::readMessageDataSize
uint32_t readMessageDataSize(IndexEntry const &index_entry) const
Definition: bag.cpp:892
rosbag::Bag::decompressed_chunk_
uint64_t decompressed_chunk_
position of decompressed chunk
Definition: bag.h:384
rosbag::Bag::decompressBz2Chunk
void decompressBz2Chunk(ChunkHeader const &chunk_header) const
Definition: bag.cpp:842
rosbag::Bag::getMajorVersion
uint32_t getMajorVersion() const
Get the major-version of the open bag file.
Definition: bag.cpp:258
rosbag::ChunkedFile::seek
void seek(uint64_t offset, int origin=std::ios_base::beg)
seek to given offset from origin
Definition: chunked_file.cpp:183
rosbag::Buffer::setSize
void setSize(uint32_t size)
Definition: buffer.cpp:88
boost::shared_ptr
rosbag::FILE_HEADER_LENGTH
static const uint32_t FILE_HEADER_LENGTH
Definition: constants.h:119
s
XmlRpcServer s
rosbag::Bag::readDataLength
bool readDataLength(uint32_t &data_size) const
Definition: bag.cpp:1094
rosbag::Bag::init
void init()
Definition: bag.cpp:87
rosbag::Bag::mode_
BagMode mode_
Definition: bag.h:347
rosbag::Bag::header_buffer_
Buffer header_buffer_
reusable buffer in which to assemble the record header before writing to file
Definition: bag.h:374
rosbag::ChunkedFile::openReadWrite
void openReadWrite(std::string const &filename)
open file for reading & writing
Definition: chunked_file.cpp:78
ros::Header
rosbag::Bag::encryptor_
boost::shared_ptr< rosbag::EncryptorBase > encryptor_
Definition: bag.h:389
rosbag::Bag::setEncryptorPlugin
void setEncryptorPlugin(const std::string &plugin_name, const std::string &plugin_param=std::string())
Set encryptor of the bag file.
Definition: bag.cpp:219
rosbag::OP_MSG_DATA
static const unsigned char OP_MSG_DATA
Definition: constants.h:108
CONSOLE_BRIDGE_logError
#define CONSOLE_BRIDGE_logError(fmt,...)
rosbag::Bag::curr_chunk_info_
ChunkInfo curr_chunk_info_
Definition: bag.h:362
rosbag::INDEX_VERSION
static const uint32_t INDEX_VERSION
Definition: constants.h:122
rosbag::Bag::decompressChunk
void decompressChunk(uint64_t chunk_pos) const
Definition: bag.cpp:778
rosbag::ConnectionInfo::header
boost::shared_ptr< ros::M_string > header
Definition: structures.h:121
rosbag::ChunkHeader::compressed_size
uint32_t compressed_size
compressed size of the chunk in bytes
Definition: structures.h:104
rosbag::ChunkHeader::uncompressed_size
uint32_t uncompressed_size
uncompressed size of the chunk in bytes
Definition: structures.h:105
rosbag::Bag::toHeaderString
std::string toHeaderString(T const *field) const
Definition: bag.h:389
rosbag::Buffer
Definition: buffer.h:75
rosbag::Bag::isOp
bool isOp(ros::M_string &fields, uint8_t reqOp) const
Definition: bag.cpp:989
rosbag::compression::LZ4
@ LZ4
Definition: stream.h:155
rosbag::ENCRYPTOR_FIELD_NAME
static const std::string ENCRYPTOR_FIELD_NAME
Definition: constants.h:96
rosbag::CHUNK_INFO_VERSION
static const uint32_t CHUNK_INFO_VERSION
Definition: constants.h:125
rosbag::Bag::seek
void seek(uint64_t pos, int origin=std::ios_base::beg) const
Definition: bag.cpp:1148
rosbag::Bag::getChunkThreshold
uint32_t getChunkThreshold() const
Get the threshold for creating new chunks.
Definition: bag.cpp:194
rosbag::ConnectionInfo::topic
std::string topic
Definition: structures.h:116
rosbag::ChunkInfo::start_time
ros::Time start_time
earliest timestamp of a message in the chunk
Definition: structures.h:94
rosbag::ConnectionInfo::md5sum
std::string md5sum
Definition: structures.h:118
rosbag::swap
void swap(Bag &a, Bag &b)
Definition: bag.h:674
rosbag::ConnectionInfo
Definition: structures.h:79
query.h
rosbag::Bag::close
void close()
Close the bag file.
Definition: bag.cpp:165
message_instance.h
rosbag::Bag::writeConnectionRecord
void writeConnectionRecord(ConnectionInfo const *connection_info, const bool encrypt)
Definition: bag.cpp:668
rosbag::Bag::writeChunkInfoRecords
void writeChunkInfoRecords()
Definition: bag.cpp:910
rosbag::compression::BZ2
@ BZ2
Definition: stream.h:154
rosbag::Bag::connections_
std::map< uint32_t, ConnectionInfo * > connections_
Definition: bag.h:367
rosbag::Bag::readChunkInfoRecord
void readChunkInfoRecord()
Definition: bag.cpp:944
rosbag::Bag::decompressLz4Chunk
void decompressLz4Chunk(ChunkHeader const &chunk_header) const
Definition: bag.cpp:857
rosbag::Bag::stopWritingChunk
void stopWritingChunk()
Definition: bag.cpp:448
ros::Header::write
static void write(const M_string &key_vals, boost::shared_array< uint8_t > &buffer, uint32_t &size)
rosbag::IndexEntry::offset
uint32_t offset
relative byte offset of the message record (either definition or data) in the chunk
Definition: structures.h:112
rosbag::bagmode::Read
@ Read
Definition: bag.h:185
rosbag::Bag::~Bag
~Bag()
Definition: bag.cpp:83
rosbag::bagmode::BagMode
BagMode
The possible modes to open a bag in.
Definition: bag.h:150
rosbag::ChunkedFile::getOffset
uint64_t getOffset() const
return current offset from the beginning of the file
Definition: chunked_file.cpp:196
rosbag::BagException
Base class for rosbag exceptions.
Definition: exceptions.h:75
rosbag::Bag::chunk_buffer_
Buffer chunk_buffer_
reusable buffer to read chunk into
Definition: bag.h:377
rosbag::OP_FILE_HEADER
static const unsigned char OP_FILE_HEADER
Definition: constants.h:109
rosbag::OP_CHUNK
static const unsigned char OP_CHUNK
Definition: constants.h:111
rosbag::Bag::checkField
ros::M_string::const_iterator checkField(ros::M_string const &fields, std::string const &field, unsigned int min_len, unsigned int max_len, bool required) const
Definition: bag.cpp:1099
rosbag::Bag::writeConnectionRecords
void writeConnectionRecords()
Definition: bag.cpp:661
rosbag::Bag::startReadingVersion200
void startReadingVersion200()
Definition: bag.cpp:283
rosbag::Bag::index_data_pos_
uint64_t index_data_pos_
Definition: bag.h:356
rosbag::Bag::getMinorVersion
uint32_t getMinorVersion() const
Get the minor-version of the open bag file.
Definition: bag.cpp:259
rosbag::IndexEntry::chunk_pos
uint64_t chunk_pos
absolute byte offset of the chunk record containing the message
Definition: structures.h:111
rosbag::bagmode::Append
@ Append
Definition: bag.h:186
rosbag::Bag::connection_indexes_
std::map< uint32_t, std::multiset< IndexEntry > > connection_indexes_
Definition: bag.h:371
rosbag::ChunkedFile::isOpen
bool isOpen() const
return true if file is open for reading or writing
Definition: chunked_file.cpp:134
rosbag::ChunkedFile::getline
std::string getline()
Definition: chunked_file.cpp:209
rosbag::Bag::appendConnectionRecordToBuffer
void appendConnectionRecordToBuffer(Buffer &buf, ConnectionInfo const *connection_info)
Definition: bag.cpp:688
rosbag::BagIOException
Exception thrown when on IO problems.
Definition: exceptions.h:82
rosbag::ChunkInfo::connection_counts
std::map< uint32_t, uint32_t > connection_counts
number of messages in each connection stored in the chunk
Definition: structures.h:98
rosbag::Bag::getSize
uint64_t getSize() const
Get the current size of the bag file (a lower bound)
Definition: bag.cpp:192
rosbag::CONNECTION_FIELD_NAME
static const std::string CONNECTION_FIELD_NAME
Definition: constants.h:89
rosbag::ChunkHeader
Definition: structures.h:101
rosbag::Bag::appendDataLengthToBuffer
void appendDataLengthToBuffer(Buffer &buf, uint32_t data_len)
Definition: bag.cpp:1021
rosbag::Bag::stopWriting
void stopWriting()
Definition: bag.cpp:269
rosbag::Bag::getChunkOffset
uint32_t getChunkOffset() const
Definition: bag.cpp:423
rosbag::MD5_FIELD_NAME
static const std::string MD5_FIELD_NAME
Definition: constants.h:99
rosbag::INDEX_POS_FIELD_NAME
static const std::string INDEX_POS_FIELD_NAME
Definition: constants.h:86
rosbag::Bag::setCompression
void setCompression(CompressionType compression)
Set the compression method to use for writing chunks.
Definition: bag.cpp:205
rosbag::Bag::readField
bool readField(ros::M_string const &fields, std::string const &field_name, bool required, T *data) const
Definition: bag.h:394
rosbag::Bag::file_header_pos_
uint64_t file_header_pos_
Definition: bag.h:355
rosbag::Bag::operator=
Bag & operator=(Bag &&other)
Definition: bag.cpp:76
rosbag::START_TIME_FIELD_NAME
static const std::string START_TIME_FIELD_NAME
Definition: constants.h:93
ros::TIME_MIN
const Time TIME_MIN(0, 1)
rosbag::Bag::getFileName
std::string getFileName() const
Get the filename of the bag.
Definition: bag.cpp:190
rosbag::Bag::isOpen
bool isOpen() const
Definition: bag.cpp:1182
rosbag::ChunkedFile::getCompressedBytesIn
uint32_t getCompressedBytesIn() const
return the number of bytes written to current compressed stream
Definition: chunked_file.cpp:197
rosbag::Bag::swap
void swap(Bag &)
Definition: bag.cpp:1150
rosbag::Bag::readHeader
bool readHeader(ros::Header &header) const
Definition: bag.cpp:1076
rosbag::ConnectionInfo::datatype
std::string datatype
Definition: structures.h:117
boost::shared_array< uint8_t >
rosbag::TOPIC_FIELD_NAME
static const std::string TOPIC_FIELD_NAME
Definition: constants.h:83
rosbag::bagmode::Write
@ Write
Definition: bag.h:184
rosbag::Bag::appendHeaderToBuffer
void appendHeaderToBuffer(Buffer &buf, ros::M_string const &fields)
Definition: bag.cpp:1007
rosbag::ChunkedFile::close
void close()
close the file
Definition: chunked_file.cpp:137
rosbag::ChunkedFile::openWrite
void openWrite(std::string const &filename)
open file for writing
Definition: chunked_file.cpp:79
rosbag::Buffer::getSize
uint32_t getSize() const
Definition: buffer.cpp:86
rosbag::Bag::closeWrite
void closeWrite()
Definition: bag.cpp:186
rosbag::Bag::file_size_
uint64_t file_size_
Definition: bag.h:354
rosbag::BagMode
bagmode::BagMode BagMode
Definition: bag.h:125
rosbag::Bag::header_connection_ids_
std::map< ros::M_string, uint32_t > header_connection_ids_
Definition: bag.h:366
TimeBase< Time, Duration >::sec
uint32_t sec
bag.h
rosbag::ChunkedFile::decompress
void decompress(CompressionType compression, uint8_t *dest, unsigned int dest_len, uint8_t *source, unsigned int source_len)
Definition: chunked_file.cpp:221
rosbag::DEF_FIELD_NAME
static const std::string DEF_FIELD_NAME
Definition: constants.h:101
TimeBase< Time, Duration >::nsec
uint32_t nsec
rosbag::IndexEntry
Definition: structures.h:108
rosbag::Bag::current_buffer_
Buffer * current_buffer_
Definition: bag.h:382
ros::Header::getValues
M_stringPtr getValues()
rosbag::Bag::writeDataLength
void writeDataLength(uint32_t data_len)
Definition: bag.cpp:1003
rosbag::Bag::startWriting
void startWriting()
Definition: bag.cpp:263
rosbag::Bag::readMessageDefinitionRecord102
void readMessageDefinitionRecord102()
Definition: bag.cpp:734
rosbag::Bag::writeChunkHeader
void writeChunkHeader(CompressionType compression, uint32_t compressed_size, uint32_t uncompressed_size)
Definition: bag.cpp:479
rosbag::CONNECTION_COUNT_FIELD_NAME
static const std::string CONNECTION_COUNT_FIELD_NAME
Definition: constants.h:87
rosbag::ConnectionInfo::msg_def
std::string msg_def
Definition: structures.h:119
rosbag::Bag::readMessageDataRecord102
void readMessageDataRecord102(uint64_t offset, ros::Header &header) const
Definition: bag.cpp:809
rosbag::compression::CompressionType
CompressionType
Definition: stream.h:119
start
ROSCPP_DECL void start()
rosbag::Bag::writeIndexRecords
void writeIndexRecords()
Definition: bag.cpp:520
rosbag::Bag::write
void write(std::string const &topic, ros::MessageEvent< T > const &event)
Write a message into the bag file.
Definition: bag.h:369
rosbag::Bag::curr_chunk_connection_indexes_
std::map< uint32_t, std::multiset< IndexEntry > > curr_chunk_connection_indexes_
Definition: bag.h:372
ros::TIME_MAX
const ROSTIME_DECL Time TIME_MAX
rosbag::Bag::readTopicIndexRecord102
void readTopicIndexRecord102()
Definition: bag.cpp:549
pluginlib::ClassLoader::createInstance
boost::shared_ptr< T > createInstance(const std::string &lookup_name)
ros::Time
rosbag::ChunkInfo
Definition: structures.h:92
rosbag::Bag::chunk_open_
bool chunk_open_
Definition: bag.h:361
rosbag::VERSION
static const std::string VERSION
Definition: constants.h:76
rosbag::Bag::version_
int version_
Definition: bag.h:349
rosbag::Bag::startReadingVersion102
void startReadingVersion102()
Definition: bag.cpp:318
rosbag::Bag::openAppend
void openAppend(std::string const &filename)
Definition: bag.cpp:143
rosbag::Bag::openRead
void openRead(std::string const &filename)
Definition: bag.cpp:124
rosbag::compression::Uncompressed
@ Uncompressed
Definition: stream.h:153
rosbag::CHUNK_POS_FIELD_NAME
static const std::string CHUNK_POS_FIELD_NAME
Definition: constants.h:95
rosbag::CHUNK_COUNT_FIELD_NAME
static const std::string CHUNK_COUNT_FIELD_NAME
Definition: constants.h:88
view.h
datatype
const char * datatype()
rosbag::Bag::readFileHeaderRecord
void readFileHeaderRecord()
Definition: bag.cpp:387
rosbag::Bag::decompressRawChunk
void decompressRawChunk(ChunkHeader const &chunk_header) const
Definition: bag.cpp:832
rosbag::ConnectionInfo::id
uint32_t id
Definition: structures.h:115
rosbag::Bag::readMessageDataHeader
ros::Header readMessageDataHeader(IndexEntry const &index_entry)
Definition: bag.cpp:873
rosbag::ChunkedFile::openRead
void openRead(std::string const &filename)
open file for reading
Definition: chunked_file.cpp:80
rosbag::TYPE_FIELD_NAME
static const std::string TYPE_FIELD_NAME
Definition: constants.h:100
rosbag
Definition: aes_encryptor.h:43
rosbag::VER_FIELD_NAME
static const std::string VER_FIELD_NAME
Definition: constants.h:84
rosbag::COMPRESSION_LZ4
static const std::string COMPRESSION_LZ4
Definition: constants.h:130
rosbag::Bag::outgoing_chunk_buffer_
Buffer outgoing_chunk_buffer_
reusable buffer to read chunk into
Definition: bag.h:380
rosbag::Bag::readChunkHeader
void readChunkHeader(ChunkHeader &chunk_header) const
Definition: bag.cpp:502
rosbag::IndexEntry::time
ros::Time time
timestamp of the message
Definition: structures.h:110
rosbag::COUNT_FIELD_NAME
static const std::string COUNT_FIELD_NAME
Definition: constants.h:85
rosbag::Bag::open
void open(std::string const &filename, uint32_t mode=bagmode::Read)
Open a bag file.
Definition: bag.cpp:105
header
const std::string header
rosbag::OP_MSG_DEF
static const unsigned char OP_MSG_DEF
Definition: constants.h:116
rosbag::Bag::chunk_count_
uint32_t chunk_count_
Definition: bag.h:358
rosbag::ChunkedFile::read
void read(void *ptr, size_t size)
read size bytes from the file into ptr
Definition: chunked_file.cpp:201
assert.h
rosbag::Bag::readHeaderFromBuffer
void readHeaderFromBuffer(Buffer &buffer, uint32_t offset, ros::Header &header, uint32_t &data_size, uint32_t &bytes_read) const
Definition: bag.cpp:1030
rosbag::Bag::compression_
CompressionType compression_
Definition: bag.h:350
rosbag::Bag::setChunkThreshold
void setChunkThreshold(uint32_t chunk_threshold)
Set the threshold for creating new chunks.
Definition: bag.cpp:196
rosbag::Bag::readVersion
void readVersion()
Definition: bag.cpp:239
rosbag::OP_FIELD_NAME
static const std::string OP_FIELD_NAME
Definition: constants.h:82
rosbag::Bag::bag_revision_
uint32_t bag_revision_
Definition: bag.h:352
rosbag::ChunkedFile::write
void write(std::string const &s)
rosbag::Bag::curr_chunk_data_pos_
uint64_t curr_chunk_data_pos_
Definition: bag.h:363
rosbag::BagUnindexedException
Exception thrown on problems reading the bag index.
Definition: exceptions.h:96
rosbag::COMPRESSION_FIELD_NAME
static const std::string COMPRESSION_FIELD_NAME
Definition: constants.h:90
rosbag::SIZE_FIELD_NAME
static const std::string SIZE_FIELD_NAME
Definition: constants.h:91
rosbag::Bag::readConnectionIndexRecord200
void readConnectionIndexRecord200()
Definition: bag.cpp:611
rosbag::ChunkInfo::pos
uint64_t pos
absolute byte offset of chunk record in bag file
Definition: structures.h:96
rosbag::ChunkedFile::truncate
bool truncate(uint64_t length)
Definition: chunked_file.cpp:203
rosbag::Bag::readMessageDataHeaderFromBuffer
void readMessageDataHeaderFromBuffer(Buffer &buffer, uint32_t offset, ros::Header &header, uint32_t &data_size, uint32_t &bytes_read) const
Definition: bag.cpp:1056
rosbag::Bag::decompress_buffer_
Buffer decompress_buffer_
reusable buffer to decompress chunks into
Definition: bag.h:378
rosbag::Bag::record_buffer_
Buffer record_buffer_
reusable buffer in which to assemble the record data before writing to file
Definition: bag.h:375
rosbag::Bag::writeVersion
void writeVersion()
Definition: bag.cpp:229
rosbag::OP_CONNECTION
static const unsigned char OP_CONNECTION
Definition: constants.h:113
rosbag::Bag::Bag
Bag()
Definition: bag.cpp:58
rosbag::COMPRESSION_BZ2
static const std::string COMPRESSION_BZ2
Definition: constants.h:129
rosbag::ChunkInfo::end_time
ros::Time end_time
latest timestamp of a message in the chunk
Definition: structures.h:95
rosbag::Bag::chunk_threshold_
uint32_t chunk_threshold_
Definition: bag.h:351
ros::M_string
std::map< std::string, std::string > M_string
rosbag::OP_INDEX_DATA
static const unsigned char OP_INDEX_DATA
Definition: constants.h:110
rosbag::Bag::writeFileHeaderRecord
void writeFileHeaderRecord()
Definition: bag.cpp:354
rosbag::Bag::encryptor_loader_
pluginlib::ClassLoader< rosbag::EncryptorBase > encryptor_loader_
Definition: bag.h:387
rosbag::Bag::openWrite
void openWrite(std::string const &filename)
Definition: bag.cpp:137
rosbag::Bag::readConnectionRecord
void readConnectionRecord()
Definition: bag.cpp:698
rosbag::ChunkHeader::compression
std::string compression
chunk compression type, e.g. "none" or "bz2" (see constants.h)
Definition: structures.h:103


rosbag_storage
Author(s): Dirk Thomas , Jacob Perron
autogenerated on Thu Nov 23 2023 04:01:58