bag.cpp
Go to the documentation of this file.
1 // Copyright (c) 2009, Willow Garage, Inc.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are met:
6 //
7 // * Redistributions of source code must retain the above copyright
8 // notice, this list of conditions and the following disclaimer.
9 // * Redistributions in binary form must reproduce the above copyright
10 // notice, this list of conditions and the following disclaimer in the
11 // documentation and/or other materials provided with the distribution.
12 // * Neither the name of Willow Garage, Inc. nor the names of its
13 // contributors may be used to endorse or promote products derived from
14 // this software without specific prior written permission.
15 //
16 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 // POSSIBILITY OF SUCH DAMAGE.
27 
28 #include "rosbag/bag.h"
30 #include "rosbag/query.h"
31 #include "rosbag/view.h"
32 
33 #if defined(_MSC_VER)
34  #include <stdint.h> // only on v2010 and later -> is this enough for msvc and linux?
35 #else
36  #include <inttypes.h>
37 #endif
38 #include <signal.h>
39 #include <assert.h>
40 #include <iomanip>
41 
42 #include "console_bridge/console.h"
43 
44 using std::map;
45 using std::priority_queue;
46 using std::string;
47 using std::vector;
48 using std::multiset;
49 using boost::format;
50 using boost::shared_ptr;
51 using ros::M_string;
52 using ros::Time;
53 
54 namespace rosbag {
55 
56 Bag::Bag() : encryptor_loader_("rosbag_storage", "rosbag::EncryptorBase")
57 {
58  init();
59 }
60 
61 Bag::Bag(string const& filename, uint32_t mode) : encryptor_loader_("rosbag_storage", "rosbag::EncryptorBase")
62 {
63  init();
64  open(filename, mode);
65 }
66 
67 #ifndef BOOST_NO_CXX11_RVALUE_REFERENCES
68 
69 Bag::Bag(Bag&& other) : encryptor_loader_("rosbag_storage", "rosbag::EncryptorBase") {
70  init();
71  swap(other);
72 }
73 
74 Bag& Bag::operator=(Bag&& other) {
75  swap(other);
76  return *this;
77 }
78 
79 #endif // BOOST_NO_CXX11_RVALUE_REFERENCES
80 
82  close();
83 }
84 
85 void Bag::init() {
87  version_ = 0;
89  chunk_threshold_ = 768 * 1024; // 768KB chunks
90  bag_revision_ = 0;
91  file_size_ = 0;
92  file_header_pos_ = 0;
93  index_data_pos_ = 0;
95  chunk_count_ = 0;
96  chunk_open_ = false;
98  current_buffer_ = 0;
100  setEncryptorPlugin(std::string("rosbag/NoEncryptor"));
101 }
102 
103 void Bag::open(string const& filename, uint32_t mode) {
104  mode_ = (BagMode) mode;
105 
106  if (mode_ & bagmode::Append)
107  openAppend(filename);
108  else if (mode_ & bagmode::Write)
109  openWrite(filename);
110  else if (mode_ & bagmode::Read)
111  openRead(filename);
112  else
113  throw BagException((format("Unknown mode: %1%") % (int) mode).str());
114 
115  // Determine file size
116  uint64_t offset = file_.getOffset();
117  seek(0, std::ios::end);
119  seek(offset);
120 }
121 
122 void Bag::openRead(string const& filename) {
123  file_.openRead(filename);
124 
125  readVersion();
126 
127  switch (version_) {
128  case 102: startReadingVersion102(); break;
129  case 200: startReadingVersion200(); break;
130  default:
131  throw BagException((format("Unsupported bag file version: %1%.%2%") % getMajorVersion() % getMinorVersion()).str());
132  }
133 }
134 
135 void Bag::openWrite(string const& filename) {
136  file_.openWrite(filename);
137 
138  startWriting();
139 }
140 
141 void Bag::openAppend(string const& filename) {
142  file_.openReadWrite(filename);
143 
144  readVersion();
145 
146  if (version_ != 200)
147  throw BagException((format("Bag file version %1%.%2% is unsupported for appending") % getMajorVersion() % getMinorVersion()).str());
148 
150 
151  // Truncate the file to chop off the index
153  index_data_pos_ = 0;
154 
155  // Rewrite the file header, clearing the index position (so we know if the index is invalid)
158 
159  // Seek to the end of the file
160  seek(0, std::ios::end);
161 }
162 
163 void Bag::close() {
164  if (!isOpen())
165  return;
166 
168  closeWrite();
169 
170  file_.close();
171 
172  topic_connection_ids_.clear();
173  header_connection_ids_.clear();
174  for (map<uint32_t, ConnectionInfo*>::iterator i = connections_.begin(); i != connections_.end(); i++)
175  delete i->second;
176  connections_.clear();
177  chunks_.clear();
178  connection_indexes_.clear();
180 
181  init();
182 }
183 
185  stopWriting();
186 }
187 
188 string Bag::getFileName() const { return file_.getFileName(); }
189 BagMode Bag::getMode() const { return mode_; }
190 uint64_t Bag::getSize() const { return file_size_; }
191 
192 uint32_t Bag::getChunkThreshold() const { return chunk_threshold_; }
193 
194 void Bag::setChunkThreshold(uint32_t chunk_threshold) {
195  if (isOpen() && chunk_open_)
197 
198  chunk_threshold_ = chunk_threshold;
199 }
200 
202 
204  if (isOpen() && chunk_open_)
206 
207  if (!(compression == compression::Uncompressed ||
208  compression == compression::BZ2 ||
209  compression == compression::LZ4)) {
210  throw BagException(
211  (format("Unknown compression type: %i") % compression).str());
212  }
213 
214  compression_ = compression;
215 }
216 
217 void Bag::setEncryptorPlugin(std::string const& plugin_name, std::string const& plugin_param) {
218  if (!chunks_.empty()) {
219  throw BagException("Cannot set encryption plugin after chunks are written");
220  }
222  encryptor_->initialize(*this, plugin_param);
223 }
224 
225 // Version
226 
228  string version = string("#ROSBAG V") + VERSION + string("\n");
229 
230  CONSOLE_BRIDGE_logDebug("Writing VERSION [%llu]: %s", (unsigned long long) file_.getOffset(), version.c_str());
231 
232  version_ = 200;
233 
234  write(version);
235 }
236 
238  string version_line = file_.getline();
239 
241 
242  char logtypename[100];
243  int version_major, version_minor;
244 #if defined(_MSC_VER)
245  if (sscanf_s(version_line.c_str(), "#ROS%s V%d.%d", logtypename, sizeof(logtypename), &version_major, &version_minor) != 3)
246 #else
247  if (sscanf(version_line.c_str(), "#ROS%99s V%d.%d", logtypename, &version_major, &version_minor) != 3)
248 #endif
249  throw BagIOException("Error reading version line");
250 
251  version_ = version_major * 100 + version_minor;
252 
253  CONSOLE_BRIDGE_logDebug("Read VERSION: version=%d", version_);
254 }
255 
256 uint32_t Bag::getMajorVersion() const { return version_ / 100; }
257 uint32_t Bag::getMinorVersion() const { return version_ % 100; }
258 
259 //
260 
262  writeVersion();
265 }
266 
268  if (chunk_open_)
270 
271  seek(0, std::ios::end);
272 
276 
279 }
280 
282  // Read the file header record, which points to the end of the chunks
284 
285  // Seek to the end of the chunks
287 
288  // Read the connection records (one for each connection)
289  for (uint32_t i = 0; i < connection_count_; i++)
291 
292  // Read the chunk info records
293  for (uint32_t i = 0; i < chunk_count_; i++)
295 
296  // Read the connection indexes for each chunk
297  for (ChunkInfo const& chunk_info : chunks_) {
298  curr_chunk_info_ = chunk_info;
299 
301 
302  // Skip over the chunk data
303  ChunkHeader chunk_header;
304  readChunkHeader(chunk_header);
305  seek(chunk_header.compressed_size, std::ios::cur);
306 
307  // Read the index records after the chunk
308  for (unsigned int i = 0; i < chunk_info.connection_counts.size(); i++)
310  }
311 
312  // At this point we don't have a curr_chunk_info anymore so we reset it
314 }
315 
317  try
318  {
319  // Read the file header record, which points to the start of the topic indexes
321  }
322  catch (const BagFormatException& ex) {
323  throw BagUnindexedException();
324  }
325 
326  // Get the length of the file
327  seek(0, std::ios::end);
328  uint64_t filelength = file_.getOffset();
329 
330  // Seek to the beginning of the topic index records
332 
333  // Read the topic index records, which point to the offsets of each message in the file
334  while (file_.getOffset() < filelength)
336 
337  // Read the message definition records (which are the first entry in the topic indexes)
338  for (map<uint32_t, multiset<IndexEntry> >::const_iterator i = connection_indexes_.begin(); i != connection_indexes_.end(); i++) {
339  multiset<IndexEntry> const& index = i->second;
340  IndexEntry const& first_entry = *index.begin();
341 
342  CONSOLE_BRIDGE_logDebug("Reading message definition for connection %d at %llu", i->first, (unsigned long long) first_entry.chunk_pos);
343 
344  seek(first_entry.chunk_pos);
345 
347  }
348 }
349 
350 // File header record
351 
354  chunk_count_ = chunks_.size();
355 
356  CONSOLE_BRIDGE_logDebug("Writing FILE_HEADER [%llu]: index_pos=%llu connection_count=%d chunk_count=%d",
357  (unsigned long long) file_.getOffset(), (unsigned long long) index_data_pos_, connection_count_, chunk_count_);
358 
359  // Write file header record
365  encryptor_->addFieldsToFileHeader(header);
366 
367  boost::shared_array<uint8_t> header_buffer;
368  uint32_t header_len;
369  ros::Header::write(header, header_buffer, header_len);
370  uint32_t data_len = 0;
371  if (header_len < FILE_HEADER_LENGTH)
372  data_len = FILE_HEADER_LENGTH - header_len;
373  write((char*) &header_len, 4);
374  write((char*) header_buffer.get(), header_len);
375  write((char*) &data_len, 4);
376 
377  // Pad the file header record out
378  if (data_len > 0) {
379  string padding;
380  padding.resize(data_len, ' ');
381  write(padding);
382  }
383 }
384 
387  uint32_t data_size;
388  if (!readHeader(header) || !readDataLength(data_size))
389  throw BagFormatException("Error reading FILE_HEADER record");
390 
391  M_string& fields = *header.getValues();
392 
393  if (!isOp(fields, OP_FILE_HEADER))
394  throw BagFormatException("Expected FILE_HEADER op not found");
395 
396  // Read index position
397  readField(fields, INDEX_POS_FIELD_NAME, true, (uint64_t*) &index_data_pos_);
398 
399  if (index_data_pos_ == 0)
400  throw BagUnindexedException();
401 
402  // Read topic and chunks count
403  if (version_ >= 200) {
406  std::string encryptor_plugin_name;
407  readField(fields, ENCRYPTOR_FIELD_NAME, 0, UINT_MAX, false, encryptor_plugin_name);
408  if (!encryptor_plugin_name.empty()) {
409  setEncryptorPlugin(encryptor_plugin_name);
410  encryptor_->readFieldsFromFileHeader(fields);
411  }
412  }
413 
414  CONSOLE_BRIDGE_logDebug("Read FILE_HEADER: index_pos=%llu connection_count=%d chunk_count=%d",
415  (unsigned long long) index_data_pos_, connection_count_, chunk_count_);
416 
417  // Skip the data section (just padding)
418  seek(data_size, std::ios::cur);
419 }
420 
421 uint32_t Bag::getChunkOffset() const {
424  else
425  return file_.getCompressedBytesIn();
426 }
427 
429  // Initialize chunk info
432  curr_chunk_info_.end_time = time;
433 
434  // Write the chunk header, with a place-holder for the data sizes (we'll fill in when the chunk is finished)
436 
437  // Turn on compressed writing
439 
440  // Record where the data section of this chunk started
442 
443  chunk_open_ = true;
444 }
445 
447  // Add this chunk to the index
448  chunks_.push_back(curr_chunk_info_);
449 
450  // Get the uncompressed and compressed sizes
451  uint32_t uncompressed_size = getChunkOffset();
453  uint32_t compressed_size = file_.getOffset() - curr_chunk_data_pos_;
454 
455  // When encryption is on, compressed_size represents encrypted chunk size;
456  // When decrypting, the actual compressed size can be deduced from the decrypted chunk
457  compressed_size = encryptor_->encryptChunk(compressed_size, curr_chunk_data_pos_, file_);
458 
459  // Rewrite the chunk header with the size of the chunk (remembering current offset)
460  uint64_t end_of_chunk_pos = file_.getOffset();
461 
463  writeChunkHeader(compression_, compressed_size, uncompressed_size);
464 
465  // Write out the indexes and clear them
466  seek(end_of_chunk_pos);
469 
470  // Clear the connection counts
472 
473  // Flag that we're starting a new chunk
474  chunk_open_ = false;
475 }
476 
477 void Bag::writeChunkHeader(CompressionType compression, uint32_t compressed_size, uint32_t uncompressed_size) {
478  ChunkHeader chunk_header;
479  switch (compression) {
480  case compression::Uncompressed: chunk_header.compression = COMPRESSION_NONE; break;
481  case compression::BZ2: chunk_header.compression = COMPRESSION_BZ2; break;
482  case compression::LZ4: chunk_header.compression = COMPRESSION_LZ4;
483  //case compression::ZLIB: chunk_header.compression = COMPRESSION_ZLIB; break;
484  }
485  chunk_header.compressed_size = compressed_size;
486  chunk_header.uncompressed_size = uncompressed_size;
487 
488  CONSOLE_BRIDGE_logDebug("Writing CHUNK [%llu]: compression=%s compressed=%d uncompressed=%d",
489  (unsigned long long) file_.getOffset(), chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size);
490 
493  header[COMPRESSION_FIELD_NAME] = chunk_header.compression;
494  header[SIZE_FIELD_NAME] = toHeaderString(&chunk_header.uncompressed_size);
495  writeHeader(header);
496 
497  writeDataLength(chunk_header.compressed_size);
498 }
499 
500 void Bag::readChunkHeader(ChunkHeader& chunk_header) const {
502  if (!readHeader(header) || !readDataLength(chunk_header.compressed_size))
503  throw BagFormatException("Error reading CHUNK record");
504 
505  M_string& fields = *header.getValues();
506 
507  if (!isOp(fields, OP_CHUNK))
508  throw BagFormatException("Expected CHUNK op not found");
509 
510  readField(fields, COMPRESSION_FIELD_NAME, true, chunk_header.compression);
511  readField(fields, SIZE_FIELD_NAME, true, &chunk_header.uncompressed_size);
512 
513  CONSOLE_BRIDGE_logDebug("Read CHUNK: compression=%s size=%d uncompressed=%d (%f)", chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size, 100 * ((double) chunk_header.compressed_size) / chunk_header.uncompressed_size);
514 }
515 
516 // Index records
517 
519  for (map<uint32_t, multiset<IndexEntry> >::const_iterator i = curr_chunk_connection_indexes_.begin(); i != curr_chunk_connection_indexes_.end(); i++) {
520  uint32_t connection_id = i->first;
521  multiset<IndexEntry> const& index = i->second;
522 
523  // Write the index record header
524  uint32_t index_size = index.size();
527  header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_id);
529  header[COUNT_FIELD_NAME] = toHeaderString(&index_size);
530  writeHeader(header);
531 
532  writeDataLength(index_size * 12);
533 
534  CONSOLE_BRIDGE_logDebug("Writing INDEX_DATA: connection=%d ver=%d count=%d", connection_id, INDEX_VERSION, index_size);
535 
536  // Write the index record data (pairs of timestamp and position in file)
537  for (IndexEntry const& e : index) {
538  write((char*) &e.time.sec, 4);
539  write((char*) &e.time.nsec, 4);
540  write((char*) &e.offset, 4);
541 
542  CONSOLE_BRIDGE_logDebug(" - %d.%d: %d", e.time.sec, e.time.nsec, e.offset);
543  }
544  }
545 }
546 
549  uint32_t data_size;
550  if (!readHeader(header) || !readDataLength(data_size))
551  throw BagFormatException("Error reading INDEX_DATA header");
552  M_string& fields = *header.getValues();
553 
554  if (!isOp(fields, OP_INDEX_DATA))
555  throw BagFormatException("Expected INDEX_DATA record");
556 
557  uint32_t index_version;
558  string topic;
559  uint32_t count = 0;
560  readField(fields, VER_FIELD_NAME, true, &index_version);
561  readField(fields, TOPIC_FIELD_NAME, true, topic);
562  readField(fields, COUNT_FIELD_NAME, true, &count);
563 
564  CONSOLE_BRIDGE_logDebug("Read INDEX_DATA: ver=%d topic=%s count=%d", index_version, topic.c_str(), count);
565 
566  if (index_version != 0)
567  throw BagFormatException((format("Unsupported INDEX_DATA version: %1%") % index_version).str());
568 
569  uint32_t connection_id;
570  map<string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
571  if (topic_conn_id_iter == topic_connection_ids_.end()) {
572  connection_id = connections_.size();
573 
574  CONSOLE_BRIDGE_logDebug("Creating connection: id=%d topic=%s", connection_id, topic.c_str());
575  ConnectionInfo* connection_info = new ConnectionInfo();
576  connection_info->id = connection_id;
577  connection_info->topic = topic;
578  connections_[connection_id] = connection_info;
579 
580  topic_connection_ids_[topic] = connection_id;
581  }
582  else
583  connection_id = topic_conn_id_iter->second;
584 
585  multiset<IndexEntry>& connection_index = connection_indexes_[connection_id];
586 
587  for (uint32_t i = 0; i < count; i++) {
588  IndexEntry index_entry;
589  uint32_t sec;
590  uint32_t nsec;
591  read((char*) &sec, 4);
592  read((char*) &nsec, 4);
593  read((char*) &index_entry.chunk_pos, 8); //<! store position of the message in the chunk_pos field as it's 64 bits
594  index_entry.time = Time(sec, nsec);
595  index_entry.offset = 0;
596 
597  CONSOLE_BRIDGE_logDebug(" - %d.%d: %llu", sec, nsec, (unsigned long long) index_entry.chunk_pos);
598 
599  if (index_entry.time < ros::TIME_MIN || index_entry.time > ros::TIME_MAX)
600  {
601  CONSOLE_BRIDGE_logError("Index entry for topic %s contains invalid time.", topic.c_str());
602  } else
603  {
604  connection_index.insert(connection_index.end(), index_entry);
605  }
606  }
607 }
608 
611  uint32_t data_size;
612  if (!readHeader(header) || !readDataLength(data_size))
613  throw BagFormatException("Error reading INDEX_DATA header");
614  M_string& fields = *header.getValues();
615 
616  if (!isOp(fields, OP_INDEX_DATA))
617  throw BagFormatException("Expected INDEX_DATA record");
618 
619  uint32_t index_version;
620  uint32_t connection_id;
621  uint32_t count = 0;
622  readField(fields, VER_FIELD_NAME, true, &index_version);
623  readField(fields, CONNECTION_FIELD_NAME, true, &connection_id);
624  readField(fields, COUNT_FIELD_NAME, true, &count);
625 
626  CONSOLE_BRIDGE_logDebug("Read INDEX_DATA: ver=%d connection=%d count=%d", index_version, connection_id, count);
627 
628  if (index_version != 1)
629  throw BagFormatException((format("Unsupported INDEX_DATA version: %1%") % index_version).str());
630 
631  uint64_t chunk_pos = curr_chunk_info_.pos;
632 
633  multiset<IndexEntry>& connection_index = connection_indexes_[connection_id];
634 
635  for (uint32_t i = 0; i < count; i++) {
636  IndexEntry index_entry;
637  index_entry.chunk_pos = chunk_pos;
638  uint32_t sec;
639  uint32_t nsec;
640  read((char*) &sec, 4);
641  read((char*) &nsec, 4);
642  read((char*) &index_entry.offset, 4);
643  index_entry.time = Time(sec, nsec);
644 
645  CONSOLE_BRIDGE_logDebug(" - %d.%d: %llu+%d", sec, nsec, (unsigned long long) index_entry.chunk_pos, index_entry.offset);
646 
647  if (index_entry.time < ros::TIME_MIN || index_entry.time > ros::TIME_MAX)
648  {
649  CONSOLE_BRIDGE_logError("Index entry for topic %s contains invalid time. This message will not be loaded.", connections_[connection_id]->topic.c_str());
650  } else
651  {
652  connection_index.insert(connection_index.end(), index_entry);
653  }
654  }
655 }
656 
657 // Connection records
658 
660  for (map<uint32_t, ConnectionInfo*>::const_iterator i = connections_.begin(); i != connections_.end(); i++) {
661  ConnectionInfo const* connection_info = i->second;
662  writeConnectionRecord(connection_info, true);
663  }
664 }
665 
666 void Bag::writeConnectionRecord(ConnectionInfo const* connection_info, const bool encrypt) {
667  CONSOLE_BRIDGE_logDebug("Writing CONNECTION [%llu:%d]: topic=%s id=%d",
668  (unsigned long long) file_.getOffset(), getChunkOffset(), connection_info->topic.c_str(), connection_info->id);
669 
672  header[TOPIC_FIELD_NAME] = connection_info->topic;
673  header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_info->id);
674 
675  if (encrypt)
676  encryptor_->writeEncryptedHeader(boost::bind(&Bag::writeHeader, this, _1), header, file_);
677  else
678  writeHeader(header);
679 
680  if (encrypt)
681  encryptor_->writeEncryptedHeader(boost::bind(&Bag::writeHeader, this, _1), *connection_info->header, file_);
682  else
683  writeHeader(*connection_info->header);
684 }
685 
686 void Bag::appendConnectionRecordToBuffer(Buffer& buf, ConnectionInfo const* connection_info) {
689  header[TOPIC_FIELD_NAME] = connection_info->topic;
690  header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_info->id);
691  appendHeaderToBuffer(buf, header);
692 
693  appendHeaderToBuffer(buf, *connection_info->header);
694 }
695 
698  if (!encryptor_->readEncryptedHeader(boost::bind(&Bag::readHeader, this, _1), header, header_buffer_, file_))
699  throw BagFormatException("Error reading CONNECTION header");
700  M_string& fields = *header.getValues();
701 
702  if (!isOp(fields, OP_CONNECTION))
703  throw BagFormatException("Expected CONNECTION op not found");
704 
705  uint32_t id;
706  readField(fields, CONNECTION_FIELD_NAME, true, &id);
707  string topic;
708  readField(fields, TOPIC_FIELD_NAME, true, topic);
709 
710  ros::Header connection_header;
711  if (!encryptor_->readEncryptedHeader(boost::bind(&Bag::readHeader, this, _1), connection_header, header_buffer_, file_))
712  throw BagFormatException("Error reading connection header");
713 
714  // If this is a new connection, update connections
715  map<uint32_t, ConnectionInfo*>::iterator key = connections_.find(id);
716  if (key == connections_.end()) {
717  ConnectionInfo* connection_info = new ConnectionInfo();
718  connection_info->id = id;
719  connection_info->topic = topic;
720  connection_info->header = boost::make_shared<M_string>();
721  for (M_string::const_iterator i = connection_header.getValues()->begin(); i != connection_header.getValues()->end(); i++)
722  (*connection_info->header)[i->first] = i->second;
723  connection_info->msg_def = (*connection_info->header)["message_definition"];
724  connection_info->datatype = (*connection_info->header)["type"];
725  connection_info->md5sum = (*connection_info->header)["md5sum"];
726  connections_[id] = connection_info;
727 
728  CONSOLE_BRIDGE_logDebug("Read CONNECTION: topic=%s id=%d", topic.c_str(), id);
729  }
730 }
731 
734  uint32_t data_size;
735  if (!readHeader(header) || !readDataLength(data_size))
736  throw BagFormatException("Error reading message definition header");
737  M_string& fields = *header.getValues();
738 
739  if (!isOp(fields, OP_MSG_DEF))
740  throw BagFormatException("Expected MSG_DEF op not found");
741 
742  string topic, md5sum, datatype, message_definition;
743  readField(fields, TOPIC_FIELD_NAME, true, topic);
744  readField(fields, MD5_FIELD_NAME, 32, 32, true, md5sum);
745  readField(fields, TYPE_FIELD_NAME, true, datatype);
746  readField(fields, DEF_FIELD_NAME, 0, UINT_MAX, true, message_definition);
747 
748  ConnectionInfo* connection_info;
749 
750  map<string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
751  if (topic_conn_id_iter == topic_connection_ids_.end()) {
752  uint32_t id = connections_.size();
753 
754  CONSOLE_BRIDGE_logDebug("Creating connection: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str());
755  connection_info = new ConnectionInfo();
756  connection_info->id = id;
757  connection_info->topic = topic;
758 
759  connections_[id] = connection_info;
760  topic_connection_ids_[topic] = id;
761  }
762  else
763  connection_info = connections_[topic_conn_id_iter->second];
764 
765  connection_info->msg_def = message_definition;
766  connection_info->datatype = datatype;
767  connection_info->md5sum = md5sum;
768  connection_info->header = boost::make_shared<ros::M_string>();
769  (*connection_info->header)["type"] = connection_info->datatype;
770  (*connection_info->header)["md5sum"] = connection_info->md5sum;
771  (*connection_info->header)["message_definition"] = connection_info->msg_def;
772 
773  CONSOLE_BRIDGE_logDebug("Read MSG_DEF: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str());
774 }
775 
776 void Bag::decompressChunk(uint64_t chunk_pos) const {
777  if (curr_chunk_info_.pos == chunk_pos) {
779  return;
780  }
781 
783 
784  if (decompressed_chunk_ == chunk_pos)
785  return;
786 
787  // Seek to the start of the chunk
788  seek(chunk_pos);
789 
790  // Read the chunk header
791  ChunkHeader chunk_header;
792  readChunkHeader(chunk_header);
793 
794  // Read and decompress the chunk. These assume we are at the right place in the stream already
795  if (chunk_header.compression == COMPRESSION_NONE)
796  decompressRawChunk(chunk_header);
797  else if (chunk_header.compression == COMPRESSION_BZ2)
798  decompressBz2Chunk(chunk_header);
799  else if (chunk_header.compression == COMPRESSION_LZ4)
800  decompressLz4Chunk(chunk_header);
801  else
802  throw BagFormatException("Unknown compression: " + chunk_header.compression);
803 
804  decompressed_chunk_ = chunk_pos;
805 }
806 
807 void Bag::readMessageDataRecord102(uint64_t offset, ros::Header& header) const {
808  CONSOLE_BRIDGE_logDebug("readMessageDataRecord: offset=%llu", (unsigned long long) offset);
809 
810  seek(offset);
811 
812  uint32_t data_size;
813  uint8_t op;
814  do {
815  if (!readHeader(header) || !readDataLength(data_size))
816  throw BagFormatException("Error reading header");
817 
818  readField(*header.getValues(), OP_FIELD_NAME, true, &op);
819  }
820  while (op == OP_MSG_DEF);
821 
822  if (op != OP_MSG_DATA)
823  throw BagFormatException((format("Expected MSG_DATA op, got %d") % op).str());
824 
825  record_buffer_.setSize(data_size);
826  file_.read((char*) record_buffer_.getData(), data_size);
827 }
828 
829 // Reading this into a buffer isn't completely necessary, but we do it anyways for now
830 void Bag::decompressRawChunk(ChunkHeader const& chunk_header) const {
831  assert(chunk_header.compression == COMPRESSION_NONE);
832 
833  CONSOLE_BRIDGE_logDebug("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size);
834 
835  encryptor_->decryptChunk(chunk_header, decompress_buffer_, file_);
836 
837  // todo check read was successful
838 }
839 
840 void Bag::decompressBz2Chunk(ChunkHeader const& chunk_header) const {
841  assert(chunk_header.compression == COMPRESSION_BZ2);
842 
843  CompressionType compression = compression::BZ2;
844 
845  CONSOLE_BRIDGE_logDebug("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size);
846 
847  encryptor_->decryptChunk(chunk_header, chunk_buffer_, file_);
848 
851 
852  // todo check read was successful
853 }
854 
855 void Bag::decompressLz4Chunk(ChunkHeader const& chunk_header) const {
856  assert(chunk_header.compression == COMPRESSION_LZ4);
857 
858  CompressionType compression = compression::LZ4;
859 
860  CONSOLE_BRIDGE_logDebug("lz4 compressed_size: %d uncompressed_size: %d",
861  chunk_header.compressed_size, chunk_header.uncompressed_size);
862 
863  encryptor_->decryptChunk(chunk_header, chunk_buffer_, file_);
864 
867 
868  // todo check read was successful
869 }
870 
873  uint32_t data_size;
874  uint32_t bytes_read;
875  switch (version_)
876  {
877  case 200:
878  decompressChunk(index_entry.chunk_pos);
879  readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
880  return header;
881  case 102:
882  readMessageDataRecord102(index_entry.chunk_pos, header);
883  return header;
884  default:
885  throw BagFormatException((format("Unhandled version: %1%") % version_).str());
886  }
887 }
888 
889 // NOTE: this loads the header, which is unnecessary
890 uint32_t Bag::readMessageDataSize(IndexEntry const& index_entry) const {
892  uint32_t data_size;
893  uint32_t bytes_read;
894  switch (version_)
895  {
896  case 200:
897  decompressChunk(index_entry.chunk_pos);
898  readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
899  return data_size;
900  case 102:
901  readMessageDataRecord102(index_entry.chunk_pos, header);
902  return record_buffer_.getSize();
903  default:
904  throw BagFormatException((format("Unhandled version: %1%") % version_).str());
905  }
906 }
907 
909  for (ChunkInfo const& chunk_info : chunks_) {
910  // Write the chunk info header
912  uint32_t chunk_connection_count = chunk_info.connection_counts.size();
915  header[CHUNK_POS_FIELD_NAME] = toHeaderString(&chunk_info.pos);
916  header[START_TIME_FIELD_NAME] = toHeaderString(&chunk_info.start_time);
917  header[END_TIME_FIELD_NAME] = toHeaderString(&chunk_info.end_time);
918  header[COUNT_FIELD_NAME] = toHeaderString(&chunk_connection_count);
919 
920  CONSOLE_BRIDGE_logDebug("Writing CHUNK_INFO [%llu]: ver=%d pos=%llu start=%d.%d end=%d.%d",
921  (unsigned long long) file_.getOffset(), CHUNK_INFO_VERSION, (unsigned long long) chunk_info.pos,
922  chunk_info.start_time.sec, chunk_info.start_time.nsec,
923  chunk_info.end_time.sec, chunk_info.end_time.nsec);
924 
925  writeHeader(header);
926 
927  writeDataLength(8 * chunk_connection_count);
928 
929  // Write the topic names and counts
930  for (map<uint32_t, uint32_t>::const_iterator i = chunk_info.connection_counts.begin(); i != chunk_info.connection_counts.end(); i++) {
931  uint32_t connection_id = i->first;
932  uint32_t count = i->second;
933 
934  write((char*) &connection_id, 4);
935  write((char*) &count, 4);
936 
937  CONSOLE_BRIDGE_logDebug(" - %d: %d", connection_id, count);
938  }
939  }
940 }
941 
943  // Read a CHUNK_INFO header
945  uint32_t data_size;
946  if (!readHeader(header) || !readDataLength(data_size))
947  throw BagFormatException("Error reading CHUNK_INFO record header");
948  M_string& fields = *header.getValues();
949  if (!isOp(fields, OP_CHUNK_INFO))
950  throw BagFormatException("Expected CHUNK_INFO op not found");
951 
952  // Check that the chunk info version is current
953  uint32_t chunk_info_version;
954  readField(fields, VER_FIELD_NAME, true, &chunk_info_version);
955  if (chunk_info_version != CHUNK_INFO_VERSION)
956  throw BagFormatException((format("Expected CHUNK_INFO version %1%, read %2%") % CHUNK_INFO_VERSION % chunk_info_version).str());
957 
958  // Read the chunk position, timestamp, and topic count fields
959  ChunkInfo chunk_info;
960  readField(fields, CHUNK_POS_FIELD_NAME, true, &chunk_info.pos);
961  readField(fields, START_TIME_FIELD_NAME, true, chunk_info.start_time);
962  readField(fields, END_TIME_FIELD_NAME, true, chunk_info.end_time);
963  uint32_t chunk_connection_count = 0;
964  readField(fields, COUNT_FIELD_NAME, true, &chunk_connection_count);
965 
966  CONSOLE_BRIDGE_logDebug("Read CHUNK_INFO: chunk_pos=%llu connection_count=%d start=%d.%d end=%d.%d",
967  (unsigned long long) chunk_info.pos, chunk_connection_count,
968  chunk_info.start_time.sec, chunk_info.start_time.nsec,
969  chunk_info.end_time.sec, chunk_info.end_time.nsec);
970 
971  // Read the topic count entries
972  for (uint32_t i = 0; i < chunk_connection_count; i ++) {
973  uint32_t connection_id, connection_count;
974  read((char*) &connection_id, 4);
975  read((char*) &connection_count, 4);
976 
977  CONSOLE_BRIDGE_logDebug(" %d: %d messages", connection_id, connection_count);
978 
979  chunk_info.connection_counts[connection_id] = connection_count;
980  }
981 
982  chunks_.push_back(chunk_info);
983 }
984 
985 // Record I/O
986 
987 bool Bag::isOp(M_string& fields, uint8_t reqOp) const {
988  uint8_t op = 0xFF; // nonexistent op
989  readField(fields, OP_FIELD_NAME, true, &op);
990  return op == reqOp;
991 }
992 
993 void Bag::writeHeader(M_string const& fields) {
994  boost::shared_array<uint8_t> header_buffer;
995  uint32_t header_len;
996  ros::Header::write(fields, header_buffer, header_len);
997  write((char*) &header_len, 4);
998  write((char*) header_buffer.get(), header_len);
999 }
1000 
1001 void Bag::writeDataLength(uint32_t data_len) {
1002  write((char*) &data_len, 4);
1003 }
1004 
1005 void Bag::appendHeaderToBuffer(Buffer& buf, M_string const& fields) {
1006  boost::shared_array<uint8_t> header_buffer;
1007  uint32_t header_len;
1008  ros::Header::write(fields, header_buffer, header_len);
1009 
1010  uint32_t offset = buf.getSize();
1011 
1012  buf.setSize(buf.getSize() + 4 + header_len);
1013 
1014  memcpy(buf.getData() + offset, &header_len, 4);
1015  offset += 4;
1016  memcpy(buf.getData() + offset, header_buffer.get(), header_len);
1017 }
1018 
1019 void Bag::appendDataLengthToBuffer(Buffer& buf, uint32_t data_len) {
1020  uint32_t offset = buf.getSize();
1021 
1022  buf.setSize(buf.getSize() + 4);
1023 
1024  memcpy(buf.getData() + offset, &data_len, 4);
1025 }
1026 
1028 void Bag::readHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read) const {
1029  assert(buffer.getSize() > 8);
1030 
1031  uint8_t* start = (uint8_t*) buffer.getData() + offset;
1032 
1033  uint8_t* ptr = start;
1034 
1035  // Read the header length
1036  uint32_t header_len;
1037  memcpy(&header_len, ptr, 4);
1038  ptr += 4;
1039 
1040  // Parse the header
1041  string error_msg;
1042  bool parsed = header.parse(ptr, header_len, error_msg);
1043  if (!parsed)
1044  throw BagFormatException("Error parsing header");
1045  ptr += header_len;
1046 
1047  // Read the data size
1048  memcpy(&data_size, ptr, 4);
1049  ptr += 4;
1050 
1051  bytes_read = ptr - start;
1052 }
1053 
1054 void Bag::readMessageDataHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& total_bytes_read) const {
1055  (void)buffer;
1056  total_bytes_read = 0;
1057  uint8_t op = 0xFF;
1058  do {
1059  CONSOLE_BRIDGE_logDebug("reading header from buffer: offset=%d", offset);
1060  uint32_t bytes_read;
1061  readHeaderFromBuffer(*current_buffer_, offset, header, data_size, bytes_read);
1062 
1063  offset += bytes_read;
1064  total_bytes_read += bytes_read;
1065 
1066  readField(*header.getValues(), OP_FIELD_NAME, true, &op);
1067  }
1068  while (op == OP_MSG_DEF || op == OP_CONNECTION);
1069 
1070  if (op != OP_MSG_DATA)
1071  throw BagFormatException("Expected MSG_DATA op not found");
1072 }
1073 
1074 bool Bag::readHeader(ros::Header& header) const {
1075  // Read the header length
1076  uint32_t header_len;
1077  read((char*) &header_len, 4);
1078 
1079  // Read the header
1080  header_buffer_.setSize(header_len);
1081  read((char*) header_buffer_.getData(), header_len);
1082 
1083  // Parse the header
1084  string error_msg;
1085  bool parsed = header.parse(header_buffer_.getData(), header_len, error_msg);
1086  if (!parsed)
1087  return false;
1088 
1089  return true;
1090 }
1091 
1092 bool Bag::readDataLength(uint32_t& data_size) const {
1093  read((char*) &data_size, 4);
1094  return true;
1095 }
1096 
1097 M_string::const_iterator Bag::checkField(M_string const& fields, string const& field, unsigned int min_len, unsigned int max_len, bool required) const {
1098  M_string::const_iterator fitr = fields.find(field);
1099  if (fitr == fields.end()) {
1100  if (required)
1101  throw BagFormatException("Required '" + field + "' field missing");
1102  }
1103  else if ((fitr->second.size() < min_len) || (fitr->second.size() > max_len))
1104  throw BagFormatException((format("Field '%1%' is wrong size (%2% bytes)") % field % (uint32_t) fitr->second.size()).str());
1105 
1106  return fitr;
1107 }
1108 
1109 bool Bag::readField(M_string const& fields, string const& field_name, bool required, string& data) const {
1110  return readField(fields, field_name, 1, UINT_MAX, required, data);
1111 }
1112 
1113 bool Bag::readField(M_string const& fields, string const& field_name, unsigned int min_len, unsigned int max_len, bool required, string& data) const {
1114  M_string::const_iterator fitr = checkField(fields, field_name, min_len, max_len, required);
1115  if (fitr == fields.end())
1116  return false;
1117 
1118  data = fitr->second;
1119  return true;
1120 }
1121 
1122 bool Bag::readField(M_string const& fields, string const& field_name, bool required, Time& data) const {
1123  uint64_t packed_time;
1124  if (!readField(fields, field_name, required, &packed_time))
1125  return false;
1126 
1127  uint64_t bitmask = (1LL << 33) - 1;
1128  data.sec = (uint32_t) (packed_time & bitmask);
1129  data.nsec = (uint32_t) (packed_time >> 32);
1130 
1131  return true;
1132 }
1133 
1134 std::string Bag::toHeaderString(Time const* field) const {
1135  uint64_t packed_time = (((uint64_t) field->nsec) << 32) + field->sec;
1136  return toHeaderString(&packed_time);
1137 }
1138 
1139 
1140 // Low-level I/O
1141 
1142 void Bag::write(string const& s) { write(s.c_str(), s.length()); }
1143 void Bag::write(char const* s, std::streamsize n) { file_.write((char*) s, n); }
1144 
1145 void Bag::read(char* b, std::streamsize n) const { file_.read(b, n); }
1146 void Bag::seek(uint64_t pos, int origin) const { file_.seek(pos, origin); }
1147 
1148 void Bag::swap(Bag& other) {
1149  using std::swap;
1150  swap(mode_, other.mode_);
1151  swap(file_, other.file_);
1152  swap(version_, other.version_);
1153  swap(compression_, other.compression_);
1156  swap(file_size_, other.file_size_);
1160  swap(chunk_count_, other.chunk_count_);
1161  swap(chunk_open_, other.chunk_open_);
1166  swap(connections_, other.connections_);
1167  swap(chunks_, other.chunks_);
1177  swap(encryptor_, other.encryptor_);
1178 }
1179 
1180 bool Bag::isOpen() const { return file_.isOpen(); }
1181 
1182 } // namespace rosbag
1183 
static const std::string COMPRESSION_BZ2
Definition: constants.h:97
void openRead(std::string const &filename)
open file for reading
static const std::string END_TIME_FIELD_NAME
Definition: constants.h:62
void startWritingChunk(ros::Time time)
Definition: bag.cpp:428
static const std::string CHUNK_COUNT_FIELD_NAME
Definition: constants.h:56
BagMode mode_
Definition: bag.h:315
void read(void *ptr, size_t size)
read size bytes from the file into ptr
static const std::string COMPRESSION_FIELD_NAME
Definition: constants.h:58
ros::M_string::const_iterator checkField(ros::M_string const &fields, std::string const &field, unsigned int min_len, unsigned int max_len, bool required) const
Definition: bag.cpp:1097
std::map< std::string, uint32_t > topic_connection_ids_
Definition: bag.h:333
uint32_t getMajorVersion() const
Get the major-version of the open bag file.
Definition: bag.cpp:256
BagMode getMode() const
Get the mode the bag is in.
Definition: bag.cpp:189
Bag & operator=(Bag &&other)
Definition: bag.cpp:74
void decompressLz4Chunk(ChunkHeader const &chunk_header) const
Definition: bag.cpp:855
uint64_t pos
absolute byte offset of chunk record in bag file
Definition: structures.h:64
void openRead(std::string const &filename)
Definition: bag.cpp:122
static const std::string VER_FIELD_NAME
Definition: constants.h:52
CompressionType getCompression() const
Get the compression method to use for writing chunks.
Definition: bag.cpp:201
ROSCPP_DECL void start()
static const std::string COMPRESSION_LZ4
Definition: constants.h:98
bool isOpen() const
return true if file is open for reading or writing
void open(std::string const &filename, uint32_t mode=bagmode::Read)
Open a bag file.
Definition: bag.cpp:103
void writeConnectionRecord(ConnectionInfo const *connection_info, const bool encrypt)
Definition: bag.cpp:666
std::map< uint32_t, ConnectionInfo * > connections_
Definition: bag.h:335
std::string compression
chunk compression type, e.g. "none" or "bz2" (see constants.h)
Definition: structures.h:71
bool readField(ros::M_string const &fields, std::string const &field_name, bool required, T *data) const
Definition: bag.h:394
int version_
Definition: bag.h:317
std::string toHeaderString(T const *field) const
Definition: bag.h:389
void swap(Bag &)
Definition: bag.cpp:1148
static const std::string CONNECTION_FIELD_NAME
Definition: constants.h:57
const Time TIME_MIN(0, 1)
void startReadingVersion102()
Definition: bag.cpp:316
void write(std::string const &s)
uint8_t * getData()
Definition: buffer.cpp:52
boost::shared_ptr< T > createInstance(const std::string &lookup_name)
uint64_t file_size_
Definition: bag.h:322
std::map< ros::M_string, uint32_t > header_connection_ids_
Definition: bag.h:334
static const std::string INDEX_POS_FIELD_NAME
Definition: constants.h:54
void setChunkThreshold(uint32_t chunk_threshold)
Set the threshold for creating new chunks.
Definition: bag.cpp:194
ros::Time time
timestamp of the message
Definition: structures.h:78
void readTopicIndexRecord102()
Definition: bag.cpp:547
XmlRpcServer s
std::map< uint32_t, uint32_t > connection_counts
number of messages in each connection stored in the chunk
Definition: structures.h:66
Buffer header_buffer_
reusable buffer in which to assemble the record header before writing to file
Definition: bag.h:342
static const unsigned char OP_CONNECTION
Definition: constants.h:81
void openWrite(std::string const &filename)
Definition: bag.cpp:135
uint32_t getMinorVersion() const
Get the minor-version of the open bag file.
Definition: bag.cpp:257
Base class for rosbag exceptions.
Definition: exceptions.h:43
bool truncate(uint64_t length)
void readMessageDataHeaderFromBuffer(Buffer &buffer, uint32_t offset, ros::Header &header, uint32_t &data_size, uint32_t &bytes_read) const
Definition: bag.cpp:1054
void seek(uint64_t offset, int origin=std::ios_base::beg)
seek to given offset from origin
void writeConnectionRecords()
Definition: bag.cpp:659
static const std::string DEF_FIELD_NAME
Definition: constants.h:69
static const std::string ENCRYPTOR_FIELD_NAME
Definition: constants.h:64
void openAppend(std::string const &filename)
Definition: bag.cpp:141
void close()
close the file
void seek(uint64_t pos, int origin=std::ios_base::beg) const
Definition: bag.cpp:1146
void swap(Bag &a, Bag &b)
Definition: bag.h:674
void appendDataLengthToBuffer(Buffer &buf, uint32_t data_len)
Definition: bag.cpp:1019
std::string getFileName() const
return path of currently open file
Exception thrown on problems reading the bag format.
Definition: exceptions.h:57
BagMode
The possible modes to open a bag in.
Definition: bag.h:86
uint64_t getOffset() const
return current offset from the beginning of the file
std::string md5sum
Definition: structures.h:54
void setWriteMode(CompressionType type)
static const unsigned char OP_MSG_DATA
Definition: constants.h:76
void close()
Close the bag file.
Definition: bag.cpp:163
uint32_t getCompressedBytesIn() const
return the number of bytes written to current compressed stream
static const unsigned char OP_CHUNK
Definition: constants.h:79
void readChunkHeader(ChunkHeader &chunk_header) const
Definition: bag.cpp:500
static const std::string VERSION
Definition: constants.h:44
Buffer decompress_buffer_
reusable buffer to decompress chunks into
Definition: bag.h:346
uint64_t getSize() const
Get the current size of the bag file (a lower bound)
Definition: bag.cpp:190
void appendConnectionRecordToBuffer(Buffer &buf, ConnectionInfo const *connection_info)
Definition: bag.cpp:686
uint32_t compressed_size
compressed size of the chunk in bytes
Definition: structures.h:72
bool isOpen() const
Definition: bag.cpp:1180
std::map< std::string, std::string > M_string
void readMessageDefinitionRecord102()
Definition: bag.cpp:732
void readHeaderFromBuffer(Buffer &buffer, uint32_t offset, ros::Header &header, uint32_t &data_size, uint32_t &bytes_read) const
Definition: bag.cpp:1028
void writeFileHeaderRecord()
Definition: bag.cpp:352
void readVersion()
Definition: bag.cpp:237
static const std::string CHUNK_POS_FIELD_NAME
Definition: constants.h:63
uint64_t decompressed_chunk_
position of decompressed chunk
Definition: bag.h:352
const char * datatype()
uint64_t index_data_pos_
Definition: bag.h:324
~Bag()
Definition: bag.cpp:81
void writeHeader(ros::M_string const &fields)
Definition: bag.cpp:993
static const std::string START_TIME_FIELD_NAME
Definition: constants.h:61
uint32_t getChunkOffset() const
Definition: bag.cpp:421
Exception thrown on problems reading the bag index.
Definition: exceptions.h:64
static const uint32_t CHUNK_INFO_VERSION
Definition: constants.h:93
boost::shared_ptr< rosbag::EncryptorBase > encryptor_
Definition: bag.h:357
Buffer * current_buffer_
Definition: bag.h:350
Exception thrown when on IO problems.
Definition: exceptions.h:50
bool parse(const boost::shared_array< uint8_t > &buffer, uint32_t size, std::string &error_msg)
void appendHeaderToBuffer(Buffer &buf, ros::M_string const &fields)
Definition: bag.cpp:1005
void setCompression(CompressionType compression)
Set the compression method to use for writing chunks.
Definition: bag.cpp:203
static void write(const M_string &key_vals, boost::shared_array< uint8_t > &buffer, uint32_t &size)
std::string msg_def
Definition: structures.h:55
void writeChunkHeader(CompressionType compression, uint32_t compressed_size, uint32_t uncompressed_size)
Definition: bag.cpp:477
static const std::string OP_FIELD_NAME
Definition: constants.h:50
std::string getline()
uint64_t file_header_pos_
Definition: bag.h:323
uint32_t bag_revision_
Definition: bag.h:320
Buffer record_buffer_
reusable buffer in which to assemble the record data before writing to file
Definition: bag.h:343
void writeVersion()
Definition: bag.cpp:227
void stopWritingChunk()
Definition: bag.cpp:446
static const std::string TYPE_FIELD_NAME
Definition: constants.h:68
uint32_t readMessageDataSize(IndexEntry const &index_entry) const
Definition: bag.cpp:890
ROSTIME_DECL const Time TIME_MAX
Buffer chunk_buffer_
reusable buffer to read chunk into
Definition: bag.h:345
ChunkInfo curr_chunk_info_
Definition: bag.h:330
static const std::string COUNT_FIELD_NAME
Definition: constants.h:53
void decompressChunk(uint64_t chunk_pos) const
Definition: bag.cpp:776
std::map< uint32_t, std::multiset< IndexEntry > > connection_indexes_
Definition: bag.h:339
uint64_t chunk_pos
absolute byte offset of the chunk record containing the message
Definition: structures.h:79
static const unsigned char OP_FILE_HEADER
Definition: constants.h:77
static const unsigned char OP_INDEX_DATA
Definition: constants.h:78
uint32_t connection_count_
Definition: bag.h:325
void readConnectionIndexRecord200()
Definition: bag.cpp:609
void readChunkInfoRecord()
Definition: bag.cpp:942
void decompressRawChunk(ChunkHeader const &chunk_header) const
Definition: bag.cpp:830
void openWrite(std::string const &filename)
open file for writing
void stopWriting()
Definition: bag.cpp:267
M_stringPtr getValues()
uint32_t offset
relative byte offset of the message record (either definition or data) in the chunk ...
Definition: structures.h:80
void init()
Definition: bag.cpp:85
uint32_t chunk_threshold_
Definition: bag.h:319
std::vector< ChunkInfo > chunks_
Definition: bag.h:337
std::string datatype
Definition: structures.h:53
boost::shared_ptr< ros::M_string > header
Definition: structures.h:57
void startReadingVersion200()
Definition: bag.cpp:281
uint32_t chunk_count_
Definition: bag.h:326
bagmode::BagMode BagMode
Definition: bag.h:93
void read(char *b, std::streamsize n) const
Definition: bag.cpp:1145
void closeWrite()
Definition: bag.cpp:184
bool readHeader(ros::Header &header) const
Definition: bag.cpp:1074
void writeDataLength(uint32_t data_len)
Definition: bag.cpp:1001
uint32_t getChunkThreshold() const
Get the threshold for creating new chunks.
Definition: bag.cpp:192
bool isOp(ros::M_string &fields, uint8_t reqOp) const
Definition: bag.cpp:987
uint32_t getSize() const
Definition: buffer.cpp:54
CompressionType compression_
Definition: bag.h:318
void decompress(CompressionType compression, uint8_t *dest, unsigned int dest_len, uint8_t *source, unsigned int source_len)
std::map< uint32_t, std::multiset< IndexEntry > > curr_chunk_connection_indexes_
Definition: bag.h:340
uint32_t uncompressed_size
uncompressed size of the chunk in bytes
Definition: structures.h:73
static const std::string CONNECTION_COUNT_FIELD_NAME
Definition: constants.h:55
static const uint32_t FILE_HEADER_LENGTH
Definition: constants.h:87
uint64_t curr_chunk_data_pos_
Definition: bag.h:331
#define CONSOLE_BRIDGE_logError(fmt,...)
static const std::string MD5_FIELD_NAME
Definition: constants.h:67
Buffer outgoing_chunk_buffer_
reusable buffer to read chunk into
Definition: bag.h:348
std::string getFileName() const
Get the filename of the bag.
Definition: bag.cpp:188
static const unsigned char OP_CHUNK_INFO
Definition: constants.h:80
bool readDataLength(uint32_t &data_size) const
Definition: bag.cpp:1092
pluginlib::ClassLoader< rosbag::EncryptorBase > encryptor_loader_
Definition: bag.h:355
const char * md5sum()
static const std::string SIZE_FIELD_NAME
Definition: constants.h:59
void readConnectionRecord()
Definition: bag.cpp:696
const std::string header
static const std::string COMPRESSION_NONE
Definition: constants.h:96
ros::Time end_time
latest timestamp of a message in the chunk
Definition: structures.h:63
void writeIndexRecords()
Definition: bag.cpp:518
void startWriting()
Definition: bag.cpp:261
void readMessageDataRecord102(uint64_t offset, ros::Header &header) const
Definition: bag.cpp:807
void setEncryptorPlugin(const std::string &plugin_name, const std::string &plugin_param=std::string())
Set encryptor of the bag file.
Definition: bag.cpp:217
void writeChunkInfoRecords()
Definition: bag.cpp:908
ChunkedFile file_
Definition: bag.h:316
static const std::string TOPIC_FIELD_NAME
Definition: constants.h:51
void openReadWrite(std::string const &filename)
open file for reading & writing
static const unsigned char OP_MSG_DEF
Definition: constants.h:84
void setSize(uint32_t size)
Definition: buffer.cpp:56
void write(std::string const &topic, ros::MessageEvent< T > const &event)
Write a message into the bag file.
Definition: bag.h:369
void readFileHeaderRecord()
Definition: bag.cpp:385
static const uint32_t INDEX_VERSION
Definition: constants.h:90
ros::Header readMessageDataHeader(IndexEntry const &index_entry)
Definition: bag.cpp:871
bool chunk_open_
Definition: bag.h:329
void decompressBz2Chunk(ChunkHeader const &chunk_header) const
Definition: bag.cpp:840
ros::Time start_time
earliest timestamp of a message in the chunk
Definition: structures.h:62


rosbag_storage
Author(s): Dirk Thomas
autogenerated on Mon Feb 28 2022 23:33:55