bag.cpp
Go to the documentation of this file.
1 // Copyright (c) 2009, Willow Garage, Inc.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are met:
6 //
7 // * Redistributions of source code must retain the above copyright
8 // notice, this list of conditions and the following disclaimer.
9 // * Redistributions in binary form must reproduce the above copyright
10 // notice, this list of conditions and the following disclaimer in the
11 // documentation and/or other materials provided with the distribution.
12 // * Neither the name of Willow Garage, Inc. nor the names of its
13 // contributors may be used to endorse or promote products derived from
14 // this software without specific prior written permission.
15 //
16 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 // POSSIBILITY OF SUCH DAMAGE.
27 
28 #include "rosbag/bag.h"
30 #include "rosbag/query.h"
31 #include "rosbag/view.h"
32 
33 #if defined(_MSC_VER)
34  #include <stdint.h> // only on v2010 and later -> is this enough for msvc and linux?
35 #else
36  #include <inttypes.h>
37 #endif
38 #include <signal.h>
39 #include <assert.h>
40 #include <iomanip>
41 
42 #include <boost/foreach.hpp>
43 
44 #include "console_bridge/console.h"
45 // Remove this include when no longer supporting platforms with libconsole-bridge-dev < 0.3.0,
46 // in particular Debian Jessie: https://packages.debian.org/jessie/libconsole-bridge-dev
48 
49 #define foreach BOOST_FOREACH
50 
51 using std::map;
52 using std::priority_queue;
53 using std::string;
54 using std::vector;
55 using std::multiset;
56 using boost::format;
57 using boost::shared_ptr;
58 using ros::M_string;
59 using ros::Time;
60 
61 namespace rosbag {
62 
64  mode_(bagmode::Write),
65  version_(0),
66  compression_(compression::Uncompressed),
67  chunk_threshold_(768 * 1024), // 768KB chunks
68  bag_revision_(0),
69  file_size_(0),
70  file_header_pos_(0),
71  index_data_pos_(0),
72  connection_count_(0),
73  chunk_count_(0),
74  chunk_open_(false),
75  curr_chunk_data_pos_(0),
76  current_buffer_(0),
77  decompressed_chunk_(0)
78 {
79 }
80 
81 Bag::Bag(string const& filename, uint32_t mode) :
83  chunk_threshold_(768 * 1024), // 768KB chunks
84  bag_revision_(0),
85  file_size_(0),
87  index_data_pos_(0),
89  chunk_count_(0),
90  chunk_open_(false),
92  current_buffer_(0),
94 {
95  open(filename, mode);
96 }
97 
99  close();
100 }
101 
102 void Bag::open(string const& filename, uint32_t mode) {
103  mode_ = (BagMode) mode;
104 
105  if (mode_ & bagmode::Append)
106  openAppend(filename);
107  else if (mode_ & bagmode::Write)
108  openWrite(filename);
109  else if (mode_ & bagmode::Read)
110  openRead(filename);
111  else
112  throw BagException((format("Unknown mode: %1%") % (int) mode).str());
113 
114  // Determine file size
115  uint64_t offset = file_.getOffset();
116  seek(0, std::ios::end);
118  seek(offset);
119 }
120 
121 void Bag::openRead(string const& filename) {
122  file_.openRead(filename);
123 
124  readVersion();
125 
126  switch (version_) {
127  case 102: startReadingVersion102(); break;
128  case 200: startReadingVersion200(); break;
129  default:
130  throw BagException((format("Unsupported bag file version: %1%.%2%") % getMajorVersion() % getMinorVersion()).str());
131  }
132 }
133 
134 void Bag::openWrite(string const& filename) {
135  file_.openWrite(filename);
136 
137  startWriting();
138 }
139 
140 void Bag::openAppend(string const& filename) {
141  file_.openReadWrite(filename);
142 
143  readVersion();
144 
145  if (version_ != 200)
146  throw BagException((format("Bag file version %1%.%2% is unsupported for appending") % getMajorVersion() % getMinorVersion()).str());
147 
149 
150  // Truncate the file to chop off the index
152  index_data_pos_ = 0;
153 
154  // Rewrite the file header, clearing the index position (so we know if the index is invalid)
157 
158  // Seek to the end of the file
159  seek(0, std::ios::end);
160 }
161 
162 void Bag::close() {
163  if (!isOpen())
164  return;
165 
167  closeWrite();
168 
169  file_.close();
170 
171  topic_connection_ids_.clear();
172  header_connection_ids_.clear();
173  for (map<uint32_t, ConnectionInfo*>::iterator i = connections_.begin(); i != connections_.end(); i++)
174  delete i->second;
175  connections_.clear();
176  chunks_.clear();
177  connection_indexes_.clear();
179 }
180 
182  stopWriting();
183 }
184 
185 string Bag::getFileName() const { return file_.getFileName(); }
186 BagMode Bag::getMode() const { return mode_; }
187 uint64_t Bag::getSize() const { return file_size_; }
188 
189 uint32_t Bag::getChunkThreshold() const { return chunk_threshold_; }
190 
191 void Bag::setChunkThreshold(uint32_t chunk_threshold) {
192  if (isOpen() && chunk_open_)
194 
195  chunk_threshold_ = chunk_threshold;
196 }
197 
199 
201  if (isOpen() && chunk_open_)
203 
204  if (!(compression == compression::Uncompressed ||
205  compression == compression::BZ2 ||
206  compression == compression::LZ4)) {
207  throw BagException(
208  (format("Unknown compression type: %i") % compression).str());
209  }
210 
211  compression_ = compression;
212 }
213 
214 // Version
215 
217  string version = string("#ROSBAG V") + VERSION + string("\n");
218 
219  CONSOLE_BRIDGE_logDebug("Writing VERSION [%llu]: %s", (unsigned long long) file_.getOffset(), version.c_str());
220 
221  version_ = 200;
222 
223  write(version);
224 }
225 
227  string version_line = file_.getline();
228 
230 
231  char logtypename[100];
232  int version_major, version_minor;
233 #if defined(_MSC_VER)
234  if (sscanf_s(version_line.c_str(), "#ROS%s V%d.%d", logtypename, sizeof(logtypename), &version_major, &version_minor) != 3)
235 #else
236  if (sscanf(version_line.c_str(), "#ROS%99s V%d.%d", logtypename, &version_major, &version_minor) != 3)
237 #endif
238  throw BagIOException("Error reading version line");
239 
240  version_ = version_major * 100 + version_minor;
241 
242  CONSOLE_BRIDGE_logDebug("Read VERSION: version=%d", version_);
243 }
244 
245 uint32_t Bag::getMajorVersion() const { return version_ / 100; }
246 uint32_t Bag::getMinorVersion() const { return version_ % 100; }
247 
248 //
249 
251  writeVersion();
254 }
255 
257  if (chunk_open_)
259 
260  seek(0, std::ios::end);
261 
265 
268 }
269 
271  // Read the file header record, which points to the end of the chunks
273 
274  // Seek to the end of the chunks
276 
277  // Read the connection records (one for each connection)
278  for (uint32_t i = 0; i < connection_count_; i++)
280 
281  // Read the chunk info records
282  for (uint32_t i = 0; i < chunk_count_; i++)
284 
285  // Read the connection indexes for each chunk
286  foreach(ChunkInfo const& chunk_info, chunks_) {
287  curr_chunk_info_ = chunk_info;
288 
290 
291  // Skip over the chunk data
292  ChunkHeader chunk_header;
293  readChunkHeader(chunk_header);
294  seek(chunk_header.compressed_size, std::ios::cur);
295 
296  // Read the index records after the chunk
297  for (unsigned int i = 0; i < chunk_info.connection_counts.size(); i++)
299  }
300 
301  // At this point we don't have a curr_chunk_info anymore so we reset it
303 }
304 
306  try
307  {
308  // Read the file header record, which points to the start of the topic indexes
310  }
311  catch (BagFormatException ex) {
312  throw BagUnindexedException();
313  }
314 
315  // Get the length of the file
316  seek(0, std::ios::end);
317  uint64_t filelength = file_.getOffset();
318 
319  // Seek to the beginning of the topic index records
321 
322  // Read the topic index records, which point to the offsets of each message in the file
323  while (file_.getOffset() < filelength)
325 
326  // Read the message definition records (which are the first entry in the topic indexes)
327  for (map<uint32_t, multiset<IndexEntry> >::const_iterator i = connection_indexes_.begin(); i != connection_indexes_.end(); i++) {
328  multiset<IndexEntry> const& index = i->second;
329  IndexEntry const& first_entry = *index.begin();
330 
331  CONSOLE_BRIDGE_logDebug("Reading message definition for connection %d at %llu", i->first, (unsigned long long) first_entry.chunk_pos);
332 
333  seek(first_entry.chunk_pos);
334 
336  }
337 }
338 
339 // File header record
340 
343  chunk_count_ = chunks_.size();
344 
345  CONSOLE_BRIDGE_logDebug("Writing FILE_HEADER [%llu]: index_pos=%llu connection_count=%d chunk_count=%d",
346  (unsigned long long) file_.getOffset(), (unsigned long long) index_data_pos_, connection_count_, chunk_count_);
347 
348  // Write file header record
354 
355  boost::shared_array<uint8_t> header_buffer;
356  uint32_t header_len;
357  ros::Header::write(header, header_buffer, header_len);
358  uint32_t data_len = 0;
359  if (header_len < FILE_HEADER_LENGTH)
360  data_len = FILE_HEADER_LENGTH - header_len;
361  write((char*) &header_len, 4);
362  write((char*) header_buffer.get(), header_len);
363  write((char*) &data_len, 4);
364 
365  // Pad the file header record out
366  if (data_len > 0) {
367  string padding;
368  padding.resize(data_len, ' ');
369  write(padding);
370  }
371 }
372 
375  uint32_t data_size;
376  if (!readHeader(header) || !readDataLength(data_size))
377  throw BagFormatException("Error reading FILE_HEADER record");
378 
379  M_string& fields = *header.getValues();
380 
381  if (!isOp(fields, OP_FILE_HEADER))
382  throw BagFormatException("Expected FILE_HEADER op not found");
383 
384  // Read index position
385  readField(fields, INDEX_POS_FIELD_NAME, true, (uint64_t*) &index_data_pos_);
386 
387  if (index_data_pos_ == 0)
388  throw BagUnindexedException();
389 
390  // Read topic and chunks count
391  if (version_ >= 200) {
394  }
395 
396  CONSOLE_BRIDGE_logDebug("Read FILE_HEADER: index_pos=%llu connection_count=%d chunk_count=%d",
397  (unsigned long long) index_data_pos_, connection_count_, chunk_count_);
398 
399  // Skip the data section (just padding)
400  seek(data_size, std::ios::cur);
401 }
402 
403 uint32_t Bag::getChunkOffset() const {
406  else
407  return file_.getCompressedBytesIn();
408 }
409 
411  // Initialize chunk info
414  curr_chunk_info_.end_time = time;
415 
416  // Write the chunk header, with a place-holder for the data sizes (we'll fill in when the chunk is finished)
418 
419  // Turn on compressed writing
421 
422  // Record where the data section of this chunk started
424 
425  chunk_open_ = true;
426 }
427 
429  // Add this chunk to the index
430  chunks_.push_back(curr_chunk_info_);
431 
432  // Get the uncompressed and compressed sizes
433  uint32_t uncompressed_size = getChunkOffset();
435  uint32_t compressed_size = file_.getOffset() - curr_chunk_data_pos_;
436 
437  // Rewrite the chunk header with the size of the chunk (remembering current offset)
438  uint64_t end_of_chunk_pos = file_.getOffset();
439 
441  writeChunkHeader(compression_, compressed_size, uncompressed_size);
442 
443  // Write out the indexes and clear them
444  seek(end_of_chunk_pos);
447 
448  // Clear the connection counts
450 
451  // Flag that we're starting a new chunk
452  chunk_open_ = false;
453 }
454 
455 void Bag::writeChunkHeader(CompressionType compression, uint32_t compressed_size, uint32_t uncompressed_size) {
456  ChunkHeader chunk_header;
457  switch (compression) {
458  case compression::Uncompressed: chunk_header.compression = COMPRESSION_NONE; break;
459  case compression::BZ2: chunk_header.compression = COMPRESSION_BZ2; break;
460  case compression::LZ4: chunk_header.compression = COMPRESSION_LZ4;
461  //case compression::ZLIB: chunk_header.compression = COMPRESSION_ZLIB; break;
462  }
463  chunk_header.compressed_size = compressed_size;
464  chunk_header.uncompressed_size = uncompressed_size;
465 
466  CONSOLE_BRIDGE_logDebug("Writing CHUNK [%llu]: compression=%s compressed=%d uncompressed=%d",
467  (unsigned long long) file_.getOffset(), chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size);
468 
471  header[COMPRESSION_FIELD_NAME] = chunk_header.compression;
472  header[SIZE_FIELD_NAME] = toHeaderString(&chunk_header.uncompressed_size);
473  writeHeader(header);
474 
475  writeDataLength(chunk_header.compressed_size);
476 }
477 
478 void Bag::readChunkHeader(ChunkHeader& chunk_header) const {
480  if (!readHeader(header) || !readDataLength(chunk_header.compressed_size))
481  throw BagFormatException("Error reading CHUNK record");
482 
483  M_string& fields = *header.getValues();
484 
485  if (!isOp(fields, OP_CHUNK))
486  throw BagFormatException("Expected CHUNK op not found");
487 
488  readField(fields, COMPRESSION_FIELD_NAME, true, chunk_header.compression);
489  readField(fields, SIZE_FIELD_NAME, true, &chunk_header.uncompressed_size);
490 
491  CONSOLE_BRIDGE_logDebug("Read CHUNK: compression=%s size=%d uncompressed=%d (%f)", chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size, 100 * ((double) chunk_header.compressed_size) / chunk_header.uncompressed_size);
492 }
493 
494 // Index records
495 
497  for (map<uint32_t, multiset<IndexEntry> >::const_iterator i = curr_chunk_connection_indexes_.begin(); i != curr_chunk_connection_indexes_.end(); i++) {
498  uint32_t connection_id = i->first;
499  multiset<IndexEntry> const& index = i->second;
500 
501  // Write the index record header
502  uint32_t index_size = index.size();
505  header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_id);
507  header[COUNT_FIELD_NAME] = toHeaderString(&index_size);
508  writeHeader(header);
509 
510  writeDataLength(index_size * 12);
511 
512  CONSOLE_BRIDGE_logDebug("Writing INDEX_DATA: connection=%d ver=%d count=%d", connection_id, INDEX_VERSION, index_size);
513 
514  // Write the index record data (pairs of timestamp and position in file)
515  foreach(IndexEntry const& e, index) {
516  write((char*) &e.time.sec, 4);
517  write((char*) &e.time.nsec, 4);
518  write((char*) &e.offset, 4);
519 
520  CONSOLE_BRIDGE_logDebug(" - %d.%d: %d", e.time.sec, e.time.nsec, e.offset);
521  }
522  }
523 }
524 
527  uint32_t data_size;
528  if (!readHeader(header) || !readDataLength(data_size))
529  throw BagFormatException("Error reading INDEX_DATA header");
530  M_string& fields = *header.getValues();
531 
532  if (!isOp(fields, OP_INDEX_DATA))
533  throw BagFormatException("Expected INDEX_DATA record");
534 
535  uint32_t index_version;
536  string topic;
537  uint32_t count = 0;
538  readField(fields, VER_FIELD_NAME, true, &index_version);
539  readField(fields, TOPIC_FIELD_NAME, true, topic);
540  readField(fields, COUNT_FIELD_NAME, true, &count);
541 
542  CONSOLE_BRIDGE_logDebug("Read INDEX_DATA: ver=%d topic=%s count=%d", index_version, topic.c_str(), count);
543 
544  if (index_version != 0)
545  throw BagFormatException((format("Unsupported INDEX_DATA version: %1%") % index_version).str());
546 
547  uint32_t connection_id;
548  map<string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
549  if (topic_conn_id_iter == topic_connection_ids_.end()) {
550  connection_id = connections_.size();
551 
552  CONSOLE_BRIDGE_logDebug("Creating connection: id=%d topic=%s", connection_id, topic.c_str());
553  ConnectionInfo* connection_info = new ConnectionInfo();
554  connection_info->id = connection_id;
555  connection_info->topic = topic;
556  connections_[connection_id] = connection_info;
557 
558  topic_connection_ids_[topic] = connection_id;
559  }
560  else
561  connection_id = topic_conn_id_iter->second;
562 
563  multiset<IndexEntry>& connection_index = connection_indexes_[connection_id];
564 
565  for (uint32_t i = 0; i < count; i++) {
566  IndexEntry index_entry;
567  uint32_t sec;
568  uint32_t nsec;
569  read((char*) &sec, 4);
570  read((char*) &nsec, 4);
571  read((char*) &index_entry.chunk_pos, 8); //<! store position of the message in the chunk_pos field as it's 64 bits
572  index_entry.time = Time(sec, nsec);
573  index_entry.offset = 0;
574 
575  CONSOLE_BRIDGE_logDebug(" - %d.%d: %llu", sec, nsec, (unsigned long long) index_entry.chunk_pos);
576 
577  if (index_entry.time < ros::TIME_MIN || index_entry.time > ros::TIME_MAX)
578  {
579  CONSOLE_BRIDGE_logError("Index entry for topic %s contains invalid time.", topic.c_str());
580  } else
581  {
582  connection_index.insert(connection_index.end(), index_entry);
583  }
584  }
585 }
586 
589  uint32_t data_size;
590  if (!readHeader(header) || !readDataLength(data_size))
591  throw BagFormatException("Error reading INDEX_DATA header");
592  M_string& fields = *header.getValues();
593 
594  if (!isOp(fields, OP_INDEX_DATA))
595  throw BagFormatException("Expected INDEX_DATA record");
596 
597  uint32_t index_version;
598  uint32_t connection_id;
599  uint32_t count = 0;
600  readField(fields, VER_FIELD_NAME, true, &index_version);
601  readField(fields, CONNECTION_FIELD_NAME, true, &connection_id);
602  readField(fields, COUNT_FIELD_NAME, true, &count);
603 
604  CONSOLE_BRIDGE_logDebug("Read INDEX_DATA: ver=%d connection=%d count=%d", index_version, connection_id, count);
605 
606  if (index_version != 1)
607  throw BagFormatException((format("Unsupported INDEX_DATA version: %1%") % index_version).str());
608 
609  uint64_t chunk_pos = curr_chunk_info_.pos;
610 
611  multiset<IndexEntry>& connection_index = connection_indexes_[connection_id];
612 
613  for (uint32_t i = 0; i < count; i++) {
614  IndexEntry index_entry;
615  index_entry.chunk_pos = chunk_pos;
616  uint32_t sec;
617  uint32_t nsec;
618  read((char*) &sec, 4);
619  read((char*) &nsec, 4);
620  read((char*) &index_entry.offset, 4);
621  index_entry.time = Time(sec, nsec);
622 
623  CONSOLE_BRIDGE_logDebug(" - %d.%d: %llu+%d", sec, nsec, (unsigned long long) index_entry.chunk_pos, index_entry.offset);
624 
625  if (index_entry.time < ros::TIME_MIN || index_entry.time > ros::TIME_MAX)
626  {
627  CONSOLE_BRIDGE_logError("Index entry for topic %s contains invalid time. This message will not be loaded.", connections_[connection_id]->topic.c_str());
628  } else
629  {
630  connection_index.insert(connection_index.end(), index_entry);
631  }
632  }
633 }
634 
635 // Connection records
636 
638  for (map<uint32_t, ConnectionInfo*>::const_iterator i = connections_.begin(); i != connections_.end(); i++) {
639  ConnectionInfo const* connection_info = i->second;
640  writeConnectionRecord(connection_info);
641  }
642 }
643 
644 void Bag::writeConnectionRecord(ConnectionInfo const* connection_info) {
645  CONSOLE_BRIDGE_logDebug("Writing CONNECTION [%llu:%d]: topic=%s id=%d",
646  (unsigned long long) file_.getOffset(), getChunkOffset(), connection_info->topic.c_str(), connection_info->id);
647 
650  header[TOPIC_FIELD_NAME] = connection_info->topic;
651  header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_info->id);
652  writeHeader(header);
653 
654  writeHeader(*connection_info->header);
655 }
656 
657 void Bag::appendConnectionRecordToBuffer(Buffer& buf, ConnectionInfo const* connection_info) {
660  header[TOPIC_FIELD_NAME] = connection_info->topic;
661  header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_info->id);
662  appendHeaderToBuffer(buf, header);
663 
664  appendHeaderToBuffer(buf, *connection_info->header);
665 }
666 
669  if (!readHeader(header))
670  throw BagFormatException("Error reading CONNECTION header");
671  M_string& fields = *header.getValues();
672 
673  if (!isOp(fields, OP_CONNECTION))
674  throw BagFormatException("Expected CONNECTION op not found");
675 
676  uint32_t id;
677  readField(fields, CONNECTION_FIELD_NAME, true, &id);
678  string topic;
679  readField(fields, TOPIC_FIELD_NAME, true, topic);
680 
681  ros::Header connection_header;
682  if (!readHeader(connection_header))
683  throw BagFormatException("Error reading connection header");
684 
685  // If this is a new connection, update connections
686  map<uint32_t, ConnectionInfo*>::iterator key = connections_.find(id);
687  if (key == connections_.end()) {
688  ConnectionInfo* connection_info = new ConnectionInfo();
689  connection_info->id = id;
690  connection_info->topic = topic;
691  connection_info->header = boost::make_shared<M_string>();
692  for (M_string::const_iterator i = connection_header.getValues()->begin(); i != connection_header.getValues()->end(); i++)
693  (*connection_info->header)[i->first] = i->second;
694  connection_info->msg_def = (*connection_info->header)["message_definition"];
695  connection_info->datatype = (*connection_info->header)["type"];
696  connection_info->md5sum = (*connection_info->header)["md5sum"];
697  connections_[id] = connection_info;
698 
699  CONSOLE_BRIDGE_logDebug("Read CONNECTION: topic=%s id=%d", topic.c_str(), id);
700  }
701 }
702 
705  uint32_t data_size;
706  if (!readHeader(header) || !readDataLength(data_size))
707  throw BagFormatException("Error reading message definition header");
708  M_string& fields = *header.getValues();
709 
710  if (!isOp(fields, OP_MSG_DEF))
711  throw BagFormatException("Expected MSG_DEF op not found");
712 
713  string topic, md5sum, datatype, message_definition;
714  readField(fields, TOPIC_FIELD_NAME, true, topic);
715  readField(fields, MD5_FIELD_NAME, 32, 32, true, md5sum);
716  readField(fields, TYPE_FIELD_NAME, true, datatype);
717  readField(fields, DEF_FIELD_NAME, 0, UINT_MAX, true, message_definition);
718 
719  ConnectionInfo* connection_info;
720 
721  map<string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
722  if (topic_conn_id_iter == topic_connection_ids_.end()) {
723  uint32_t id = connections_.size();
724 
725  CONSOLE_BRIDGE_logDebug("Creating connection: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str());
726  connection_info = new ConnectionInfo();
727  connection_info->id = id;
728  connection_info->topic = topic;
729 
730  connections_[id] = connection_info;
731  topic_connection_ids_[topic] = id;
732  }
733  else
734  connection_info = connections_[topic_conn_id_iter->second];
735 
736  connection_info->msg_def = message_definition;
737  connection_info->datatype = datatype;
738  connection_info->md5sum = md5sum;
739  connection_info->header = boost::make_shared<ros::M_string>();
740  (*connection_info->header)["type"] = connection_info->datatype;
741  (*connection_info->header)["md5sum"] = connection_info->md5sum;
742  (*connection_info->header)["message_definition"] = connection_info->msg_def;
743 
744  CONSOLE_BRIDGE_logDebug("Read MSG_DEF: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str());
745 }
746 
747 void Bag::decompressChunk(uint64_t chunk_pos) const {
748  if (curr_chunk_info_.pos == chunk_pos) {
750  return;
751  }
752 
754 
755  if (decompressed_chunk_ == chunk_pos)
756  return;
757 
758  // Seek to the start of the chunk
759  seek(chunk_pos);
760 
761  // Read the chunk header
762  ChunkHeader chunk_header;
763  readChunkHeader(chunk_header);
764 
765  // Read and decompress the chunk. These assume we are at the right place in the stream already
766  if (chunk_header.compression == COMPRESSION_NONE)
767  decompressRawChunk(chunk_header);
768  else if (chunk_header.compression == COMPRESSION_BZ2)
769  decompressBz2Chunk(chunk_header);
770  else if (chunk_header.compression == COMPRESSION_LZ4)
771  decompressLz4Chunk(chunk_header);
772  else
773  throw BagFormatException("Unknown compression: " + chunk_header.compression);
774 
775  decompressed_chunk_ = chunk_pos;
776 }
777 
778 void Bag::readMessageDataRecord102(uint64_t offset, ros::Header& header) const {
779  CONSOLE_BRIDGE_logDebug("readMessageDataRecord: offset=%llu", (unsigned long long) offset);
780 
781  seek(offset);
782 
783  uint32_t data_size;
784  uint8_t op;
785  do {
786  if (!readHeader(header) || !readDataLength(data_size))
787  throw BagFormatException("Error reading header");
788 
789  readField(*header.getValues(), OP_FIELD_NAME, true, &op);
790  }
791  while (op == OP_MSG_DEF);
792 
793  if (op != OP_MSG_DATA)
794  throw BagFormatException((format("Expected MSG_DATA op, got %d") % op).str());
795 
796  record_buffer_.setSize(data_size);
797  file_.read((char*) record_buffer_.getData(), data_size);
798 }
799 
800 // Reading this into a buffer isn't completely necessary, but we do it anyways for now
801 void Bag::decompressRawChunk(ChunkHeader const& chunk_header) const {
802  assert(chunk_header.compression == COMPRESSION_NONE);
803  assert(chunk_header.compressed_size == chunk_header.uncompressed_size);
804 
805  CONSOLE_BRIDGE_logDebug("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size);
806 
808  file_.read((char*) decompress_buffer_.getData(), chunk_header.compressed_size);
809 
810  // todo check read was successful
811 }
812 
813 void Bag::decompressBz2Chunk(ChunkHeader const& chunk_header) const {
814  assert(chunk_header.compression == COMPRESSION_BZ2);
815 
816  CompressionType compression = compression::BZ2;
817 
818  CONSOLE_BRIDGE_logDebug("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size);
819 
820  chunk_buffer_.setSize(chunk_header.compressed_size);
821  file_.read((char*) chunk_buffer_.getData(), chunk_header.compressed_size);
822 
825 
826  // todo check read was successful
827 }
828 
829 void Bag::decompressLz4Chunk(ChunkHeader const& chunk_header) const {
830  assert(chunk_header.compression == COMPRESSION_LZ4);
831 
832  CompressionType compression = compression::LZ4;
833 
834  CONSOLE_BRIDGE_logDebug("lz4 compressed_size: %d uncompressed_size: %d",
835  chunk_header.compressed_size, chunk_header.uncompressed_size);
836 
837  chunk_buffer_.setSize(chunk_header.compressed_size);
838  file_.read((char*) chunk_buffer_.getData(), chunk_header.compressed_size);
839 
842 
843  // todo check read was successful
844 }
845 
848  uint32_t data_size;
849  uint32_t bytes_read;
850  switch (version_)
851  {
852  case 200:
853  decompressChunk(index_entry.chunk_pos);
854  readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
855  return header;
856  case 102:
857  readMessageDataRecord102(index_entry.chunk_pos, header);
858  return header;
859  default:
860  throw BagFormatException((format("Unhandled version: %1%") % version_).str());
861  }
862 }
863 
864 // NOTE: this loads the header, which is unnecessary
865 uint32_t Bag::readMessageDataSize(IndexEntry const& index_entry) const {
867  uint32_t data_size;
868  uint32_t bytes_read;
869  switch (version_)
870  {
871  case 200:
872  decompressChunk(index_entry.chunk_pos);
873  readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
874  return data_size;
875  case 102:
876  readMessageDataRecord102(index_entry.chunk_pos, header);
877  return record_buffer_.getSize();
878  default:
879  throw BagFormatException((format("Unhandled version: %1%") % version_).str());
880  }
881 }
882 
884  foreach(ChunkInfo const& chunk_info, chunks_) {
885  // Write the chunk info header
887  uint32_t chunk_connection_count = chunk_info.connection_counts.size();
890  header[CHUNK_POS_FIELD_NAME] = toHeaderString(&chunk_info.pos);
891  header[START_TIME_FIELD_NAME] = toHeaderString(&chunk_info.start_time);
892  header[END_TIME_FIELD_NAME] = toHeaderString(&chunk_info.end_time);
893  header[COUNT_FIELD_NAME] = toHeaderString(&chunk_connection_count);
894 
895  CONSOLE_BRIDGE_logDebug("Writing CHUNK_INFO [%llu]: ver=%d pos=%llu start=%d.%d end=%d.%d",
896  (unsigned long long) file_.getOffset(), CHUNK_INFO_VERSION, (unsigned long long) chunk_info.pos,
897  chunk_info.start_time.sec, chunk_info.start_time.nsec,
898  chunk_info.end_time.sec, chunk_info.end_time.nsec);
899 
900  writeHeader(header);
901 
902  writeDataLength(8 * chunk_connection_count);
903 
904  // Write the topic names and counts
905  for (map<uint32_t, uint32_t>::const_iterator i = chunk_info.connection_counts.begin(); i != chunk_info.connection_counts.end(); i++) {
906  uint32_t connection_id = i->first;
907  uint32_t count = i->second;
908 
909  write((char*) &connection_id, 4);
910  write((char*) &count, 4);
911 
912  CONSOLE_BRIDGE_logDebug(" - %d: %d", connection_id, count);
913  }
914  }
915 }
916 
918  // Read a CHUNK_INFO header
920  uint32_t data_size;
921  if (!readHeader(header) || !readDataLength(data_size))
922  throw BagFormatException("Error reading CHUNK_INFO record header");
923  M_string& fields = *header.getValues();
924  if (!isOp(fields, OP_CHUNK_INFO))
925  throw BagFormatException("Expected CHUNK_INFO op not found");
926 
927  // Check that the chunk info version is current
928  uint32_t chunk_info_version;
929  readField(fields, VER_FIELD_NAME, true, &chunk_info_version);
930  if (chunk_info_version != CHUNK_INFO_VERSION)
931  throw BagFormatException((format("Expected CHUNK_INFO version %1%, read %2%") % CHUNK_INFO_VERSION % chunk_info_version).str());
932 
933  // Read the chunk position, timestamp, and topic count fields
934  ChunkInfo chunk_info;
935  readField(fields, CHUNK_POS_FIELD_NAME, true, &chunk_info.pos);
936  readField(fields, START_TIME_FIELD_NAME, true, chunk_info.start_time);
937  readField(fields, END_TIME_FIELD_NAME, true, chunk_info.end_time);
938  uint32_t chunk_connection_count = 0;
939  readField(fields, COUNT_FIELD_NAME, true, &chunk_connection_count);
940 
941  CONSOLE_BRIDGE_logDebug("Read CHUNK_INFO: chunk_pos=%llu connection_count=%d start=%d.%d end=%d.%d",
942  (unsigned long long) chunk_info.pos, chunk_connection_count,
943  chunk_info.start_time.sec, chunk_info.start_time.nsec,
944  chunk_info.end_time.sec, chunk_info.end_time.nsec);
945 
946  // Read the topic count entries
947  for (uint32_t i = 0; i < chunk_connection_count; i ++) {
948  uint32_t connection_id, connection_count;
949  read((char*) &connection_id, 4);
950  read((char*) &connection_count, 4);
951 
952  CONSOLE_BRIDGE_logDebug(" %d: %d messages", connection_id, connection_count);
953 
954  chunk_info.connection_counts[connection_id] = connection_count;
955  }
956 
957  chunks_.push_back(chunk_info);
958 }
959 
960 // Record I/O
961 
962 bool Bag::isOp(M_string& fields, uint8_t reqOp) const {
963  uint8_t op = 0xFF; // nonexistent op
964  readField(fields, OP_FIELD_NAME, true, &op);
965  return op == reqOp;
966 }
967 
968 void Bag::writeHeader(M_string const& fields) {
969  boost::shared_array<uint8_t> header_buffer;
970  uint32_t header_len;
971  ros::Header::write(fields, header_buffer, header_len);
972  write((char*) &header_len, 4);
973  write((char*) header_buffer.get(), header_len);
974 }
975 
976 void Bag::writeDataLength(uint32_t data_len) {
977  write((char*) &data_len, 4);
978 }
979 
980 void Bag::appendHeaderToBuffer(Buffer& buf, M_string const& fields) {
981  boost::shared_array<uint8_t> header_buffer;
982  uint32_t header_len;
983  ros::Header::write(fields, header_buffer, header_len);
984 
985  uint32_t offset = buf.getSize();
986 
987  buf.setSize(buf.getSize() + 4 + header_len);
988 
989  memcpy(buf.getData() + offset, &header_len, 4);
990  offset += 4;
991  memcpy(buf.getData() + offset, header_buffer.get(), header_len);
992 }
993 
994 void Bag::appendDataLengthToBuffer(Buffer& buf, uint32_t data_len) {
995  uint32_t offset = buf.getSize();
996 
997  buf.setSize(buf.getSize() + 4);
998 
999  memcpy(buf.getData() + offset, &data_len, 4);
1000 }
1001 
1003 void Bag::readHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read) const {
1004  assert(buffer.getSize() > 8);
1005 
1006  uint8_t* start = (uint8_t*) buffer.getData() + offset;
1007 
1008  uint8_t* ptr = start;
1009 
1010  // Read the header length
1011  uint32_t header_len;
1012  memcpy(&header_len, ptr, 4);
1013  ptr += 4;
1014 
1015  // Parse the header
1016  string error_msg;
1017  bool parsed = header.parse(ptr, header_len, error_msg);
1018  if (!parsed)
1019  throw BagFormatException("Error parsing header");
1020  ptr += header_len;
1021 
1022  // Read the data size
1023  memcpy(&data_size, ptr, 4);
1024  ptr += 4;
1025 
1026  bytes_read = ptr - start;
1027 }
1028 
1029 void Bag::readMessageDataHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& total_bytes_read) const {
1030  (void)buffer;
1031  total_bytes_read = 0;
1032  uint8_t op = 0xFF;
1033  do {
1034  CONSOLE_BRIDGE_logDebug("reading header from buffer: offset=%d", offset);
1035  uint32_t bytes_read;
1036  readHeaderFromBuffer(*current_buffer_, offset, header, data_size, bytes_read);
1037 
1038  offset += bytes_read;
1039  total_bytes_read += bytes_read;
1040 
1041  readField(*header.getValues(), OP_FIELD_NAME, true, &op);
1042  }
1043  while (op == OP_MSG_DEF || op == OP_CONNECTION);
1044 
1045  if (op != OP_MSG_DATA)
1046  throw BagFormatException("Expected MSG_DATA op not found");
1047 }
1048 
1049 bool Bag::readHeader(ros::Header& header) const {
1050  // Read the header length
1051  uint32_t header_len;
1052  read((char*) &header_len, 4);
1053 
1054  // Read the header
1055  header_buffer_.setSize(header_len);
1056  read((char*) header_buffer_.getData(), header_len);
1057 
1058  // Parse the header
1059  string error_msg;
1060  bool parsed = header.parse(header_buffer_.getData(), header_len, error_msg);
1061  if (!parsed)
1062  return false;
1063 
1064  return true;
1065 }
1066 
1067 bool Bag::readDataLength(uint32_t& data_size) const {
1068  read((char*) &data_size, 4);
1069  return true;
1070 }
1071 
1072 M_string::const_iterator Bag::checkField(M_string const& fields, string const& field, unsigned int min_len, unsigned int max_len, bool required) const {
1073  M_string::const_iterator fitr = fields.find(field);
1074  if (fitr == fields.end()) {
1075  if (required)
1076  throw BagFormatException("Required '" + field + "' field missing");
1077  }
1078  else if ((fitr->second.size() < min_len) || (fitr->second.size() > max_len))
1079  throw BagFormatException((format("Field '%1%' is wrong size (%2% bytes)") % field % (uint32_t) fitr->second.size()).str());
1080 
1081  return fitr;
1082 }
1083 
1084 bool Bag::readField(M_string const& fields, string const& field_name, bool required, string& data) const {
1085  return readField(fields, field_name, 1, UINT_MAX, required, data);
1086 }
1087 
1088 bool Bag::readField(M_string const& fields, string const& field_name, unsigned int min_len, unsigned int max_len, bool required, string& data) const {
1089  M_string::const_iterator fitr = checkField(fields, field_name, min_len, max_len, required);
1090  if (fitr == fields.end())
1091  return false;
1092 
1093  data = fitr->second;
1094  return true;
1095 }
1096 
1097 bool Bag::readField(M_string const& fields, string const& field_name, bool required, Time& data) const {
1098  uint64_t packed_time;
1099  if (!readField(fields, field_name, required, &packed_time))
1100  return false;
1101 
1102  uint64_t bitmask = (1LL << 33) - 1;
1103  data.sec = (uint32_t) (packed_time & bitmask);
1104  data.nsec = (uint32_t) (packed_time >> 32);
1105 
1106  return true;
1107 }
1108 
1109 std::string Bag::toHeaderString(Time const* field) const {
1110  uint64_t packed_time = (((uint64_t) field->nsec) << 32) + field->sec;
1111  return toHeaderString(&packed_time);
1112 }
1113 
1114 
1115 // Low-level I/O
1116 
1117 void Bag::write(string const& s) { write(s.c_str(), s.length()); }
1118 void Bag::write(char const* s, std::streamsize n) { file_.write((char*) s, n); }
1119 
1120 void Bag::read(char* b, std::streamsize n) const { file_.read(b, n); }
1121 void Bag::seek(uint64_t pos, int origin) const { file_.seek(pos, origin); }
1122 
1123 void Bag::swap(Bag& other) {
1124  using std::swap;
1125  swap(mode_, other.mode_);
1126  swap(file_, other.file_);
1127  swap(version_, other.version_);
1128  swap(compression_, other.compression_);
1131  swap(file_size_, other.file_size_);
1135  swap(chunk_count_, other.chunk_count_);
1136  swap(chunk_open_, other.chunk_open_);
1141  swap(connections_, other.connections_);
1142  swap(chunks_, other.chunks_);
1152 }
1153 
1154 bool Bag::isOpen() const { return file_.isOpen(); }
1155 
1156 } // namespace rosbag
1157 
BagMode getMode() const
Get the mode the bag is in.
Definition: bag.cpp:186
uint32_t readMessageDataSize(IndexEntry const &index_entry) const
Definition: bag.cpp:865
static const std::string COMPRESSION_BZ2
Definition: constants.h:96
void openRead(std::string const &filename)
open file for reading
static const std::string END_TIME_FIELD_NAME
Definition: constants.h:62
void startWritingChunk(ros::Time time)
Definition: bag.cpp:410
#define CONSOLE_BRIDGE_logDebug(fmt,...)
static const std::string CHUNK_COUNT_FIELD_NAME
Definition: constants.h:56
BagMode mode_
Definition: bag.h:283
void read(void *ptr, size_t size)
read size bytes from the file into ptr
static const std::string COMPRESSION_FIELD_NAME
Definition: constants.h:58
std::map< std::string, uint32_t > topic_connection_ids_
Definition: bag.h:301
void decompressChunk(uint64_t chunk_pos) const
Definition: bag.cpp:747
uint64_t pos
absolute byte offset of chunk record in bag file
Definition: structures.h:64
void openRead(std::string const &filename)
Definition: bag.cpp:121
static const std::string VER_FIELD_NAME
Definition: constants.h:52
ros::M_string::const_iterator checkField(ros::M_string const &fields, std::string const &field, unsigned int min_len, unsigned int max_len, bool required) const
Definition: bag.cpp:1072
static const std::string COMPRESSION_LZ4
Definition: constants.h:97
bool isOpen() const
Definition: bag.cpp:1154
void open(std::string const &filename, uint32_t mode=bagmode::Read)
Open a bag file.
Definition: bag.cpp:102
uint32_t getChunkThreshold() const
Get the threshold for creating new chunks.
Definition: bag.cpp:189
std::string getFileName() const
Get the filename of the bag.
Definition: bag.cpp:185
std::map< uint32_t, ConnectionInfo * > connections_
Definition: bag.h:303
std::string compression
chunk compression type, e.g. "none" or "bz2" (see constants.h)
Definition: structures.h:71
int version_
Definition: bag.h:285
void swap(Bag &)
Definition: bag.cpp:1123
static const std::string CONNECTION_FIELD_NAME
Definition: constants.h:57
void startReadingVersion102()
Definition: bag.cpp:305
#define CONSOLE_BRIDGE_logError(fmt,...)
void write(std::string const &s)
uint8_t * getData()
Definition: buffer.cpp:52
uint64_t file_size_
Definition: bag.h:290
std::map< ros::M_string, uint32_t > header_connection_ids_
Definition: bag.h:302
static const std::string INDEX_POS_FIELD_NAME
Definition: constants.h:54
void setChunkThreshold(uint32_t chunk_threshold)
Set the threshold for creating new chunks.
Definition: bag.cpp:191
ros::Time time
timestamp of the message
Definition: structures.h:78
void readTopicIndexRecord102()
Definition: bag.cpp:525
uint64_t getOffset() const
return current offset from the beginning of the file
std::map< uint32_t, uint32_t > connection_counts
number of messages in each connection stored in the chunk
Definition: structures.h:66
void readMessageDataHeaderFromBuffer(Buffer &buffer, uint32_t offset, ros::Header &header, uint32_t &data_size, uint32_t &bytes_read) const
Definition: bag.cpp:1029
CompressionType getCompression() const
Get the compression method to use for writing chunks.
Definition: bag.cpp:198
Buffer header_buffer_
reusable buffer in which to assemble the record header before writing to file
Definition: bag.h:310
static const unsigned char OP_CONNECTION
Definition: constants.h:80
void openWrite(std::string const &filename)
Definition: bag.cpp:134
void seek(uint64_t pos, int origin=std::ios_base::beg) const
Definition: bag.cpp:1121
Base class for rosbag exceptions.
Definition: exceptions.h:43
bool truncate(uint64_t length)
void seek(uint64_t offset, int origin=std::ios_base::beg)
seek to given offset from origin
void writeConnectionRecords()
Definition: bag.cpp:637
uint32_t getChunkOffset() const
Definition: bag.cpp:403
static const std::string DEF_FIELD_NAME
Definition: constants.h:68
uint32_t getMajorVersion() const
Get the major-version of the open bag file.
Definition: bag.cpp:245
void readChunkHeader(ChunkHeader &chunk_header) const
Definition: bag.cpp:478
void writeConnectionRecord(ConnectionInfo const *connection_info)
Definition: bag.cpp:644
std_msgs::Header * header(M &m)
void openAppend(std::string const &filename)
Definition: bag.cpp:140
void close()
close the file
void swap(Bag &a, Bag &b)
Definition: bag.h:634
void appendDataLengthToBuffer(Buffer &buf, uint32_t data_len)
Definition: bag.cpp:994
Exception thrown on problems reading the bag format.
Definition: exceptions.h:57
uint32_t getCompressedBytesIn() const
return the number of bytes written to current compressed stream
BagMode
The possible modes to open a bag in.
Definition: bag.h:73
std::string md5sum
Definition: structures.h:54
void setWriteMode(CompressionType type)
static const unsigned char OP_MSG_DATA
Definition: constants.h:75
void close()
Close the bag file.
Definition: bag.cpp:162
static const unsigned char OP_CHUNK
Definition: constants.h:78
bool readField(ros::M_string const &fields, std::string const &field_name, bool required, T *data) const
Definition: bag.h:357
static const std::string VERSION
Definition: constants.h:44
Buffer decompress_buffer_
reusable buffer to decompress chunks into
Definition: bag.h:314
void appendConnectionRecordToBuffer(Buffer &buf, ConnectionInfo const *connection_info)
Definition: bag.cpp:657
uint32_t compressed_size
compressed size of the chunk in bytes
Definition: structures.h:72
void decompressRawChunk(ChunkHeader const &chunk_header) const
Definition: bag.cpp:801
std::map< std::string, std::string > M_string
void readMessageDefinitionRecord102()
Definition: bag.cpp:703
void writeFileHeaderRecord()
Definition: bag.cpp:341
void readVersion()
Definition: bag.cpp:226
std::string toHeaderString(T const *field) const
Definition: bag.h:352
static const std::string CHUNK_POS_FIELD_NAME
Definition: constants.h:63
uint64_t decompressed_chunk_
position of decompressed chunk
Definition: bag.h:320
bool readHeader(ros::Header &header) const
Definition: bag.cpp:1049
void readHeaderFromBuffer(Buffer &buffer, uint32_t offset, ros::Header &header, uint32_t &data_size, uint32_t &bytes_read) const
Definition: bag.cpp:1003
const char * datatype()
uint64_t index_data_pos_
Definition: bag.h:292
~Bag()
Definition: bag.cpp:98
void writeHeader(ros::M_string const &fields)
Definition: bag.cpp:968
static const std::string START_TIME_FIELD_NAME
Definition: constants.h:61
Exception thrown on problems reading the bag index.
Definition: exceptions.h:64
static const uint32_t CHUNK_INFO_VERSION
Definition: constants.h:92
void decompressBz2Chunk(ChunkHeader const &chunk_header) const
Definition: bag.cpp:813
Buffer * current_buffer_
Definition: bag.h:318
Exception thrown when on IO problems.
Definition: exceptions.h:50
bool parse(const boost::shared_array< uint8_t > &buffer, uint32_t size, std::string &error_msg)
void appendHeaderToBuffer(Buffer &buf, ros::M_string const &fields)
Definition: bag.cpp:980
bool isOp(ros::M_string &fields, uint8_t reqOp) const
Definition: bag.cpp:962
void setCompression(CompressionType compression)
Set the compression method to use for writing chunks.
Definition: bag.cpp:200
void decompressLz4Chunk(ChunkHeader const &chunk_header) const
Definition: bag.cpp:829
static void write(const M_string &key_vals, boost::shared_array< uint8_t > &buffer, uint32_t &size)
std::string msg_def
Definition: structures.h:55
void writeChunkHeader(CompressionType compression, uint32_t compressed_size, uint32_t uncompressed_size)
Definition: bag.cpp:455
static const std::string OP_FIELD_NAME
Definition: constants.h:50
std::string getline()
uint64_t file_header_pos_
Definition: bag.h:291
bool isOpen() const
return true if file is open for reading or writing
uint32_t bag_revision_
Definition: bag.h:288
Buffer record_buffer_
reusable buffer in which to assemble the record data before writing to file
Definition: bag.h:311
void writeVersion()
Definition: bag.cpp:216
void stopWritingChunk()
Definition: bag.cpp:428
static const std::string TYPE_FIELD_NAME
Definition: constants.h:67
ROSTIME_DECL const Time TIME_MAX
uint32_t getSize() const
Definition: buffer.cpp:54
Buffer chunk_buffer_
reusable buffer to read chunk into
Definition: bag.h:313
ChunkInfo curr_chunk_info_
Definition: bag.h:298
uint32_t getMinorVersion() const
Get the minor-version of the open bag file.
Definition: bag.cpp:246
static const std::string COUNT_FIELD_NAME
Definition: constants.h:53
std::map< uint32_t, std::multiset< IndexEntry > > connection_indexes_
Definition: bag.h:307
uint64_t chunk_pos
absolute byte offset of the chunk record containing the message
Definition: structures.h:79
static const unsigned char OP_FILE_HEADER
Definition: constants.h:76
static const unsigned char OP_INDEX_DATA
Definition: constants.h:77
uint32_t connection_count_
Definition: bag.h:293
void readConnectionIndexRecord200()
Definition: bag.cpp:587
void readChunkInfoRecord()
Definition: bag.cpp:917
void openWrite(std::string const &filename)
open file for writing
void stopWriting()
Definition: bag.cpp:256
M_stringPtr getValues()
uint32_t offset
relative byte offset of the message record (either definition or data) in the chunk ...
Definition: structures.h:80
bool readDataLength(uint32_t &data_size) const
Definition: bag.cpp:1067
uint32_t chunk_threshold_
Definition: bag.h:287
std::vector< ChunkInfo > chunks_
Definition: bag.h:305
std::string datatype
Definition: structures.h:53
ROSTIME_DECL const Time TIME_MIN
boost::shared_ptr< ros::M_string > header
Definition: structures.h:57
void startReadingVersion200()
Definition: bag.cpp:270
uint32_t chunk_count_
Definition: bag.h:294
bagmode::BagMode BagMode
Definition: bag.h:80
void closeWrite()
Definition: bag.cpp:181
Definition: bag.h:68
void writeDataLength(uint32_t data_len)
Definition: bag.cpp:976
CompressionType compression_
Definition: bag.h:286
void decompress(CompressionType compression, uint8_t *dest, unsigned int dest_len, uint8_t *source, unsigned int source_len)
std::map< uint32_t, std::multiset< IndexEntry > > curr_chunk_connection_indexes_
Definition: bag.h:308
void read(char *b, std::streamsize n) const
Definition: bag.cpp:1120
uint32_t uncompressed_size
uncompressed size of the chunk in bytes
Definition: structures.h:73
static const std::string CONNECTION_COUNT_FIELD_NAME
Definition: constants.h:55
static const uint32_t FILE_HEADER_LENGTH
Definition: constants.h:86
uint64_t curr_chunk_data_pos_
Definition: bag.h:299
static const std::string MD5_FIELD_NAME
Definition: constants.h:66
Buffer outgoing_chunk_buffer_
reusable buffer to read chunk into
Definition: bag.h:316
static const unsigned char OP_CHUNK_INFO
Definition: constants.h:79
void readMessageDataRecord102(uint64_t offset, ros::Header &header) const
Definition: bag.cpp:778
const char * md5sum()
static const std::string SIZE_FIELD_NAME
Definition: constants.h:59
void readConnectionRecord()
Definition: bag.cpp:667
static const std::string COMPRESSION_NONE
Definition: constants.h:95
ros::Time end_time
latest timestamp of a message in the chunk
Definition: structures.h:63
uint64_t getSize() const
Get the current size of the bag file (a lower bound)
Definition: bag.cpp:187
void writeIndexRecords()
Definition: bag.cpp:496
void startWriting()
Definition: bag.cpp:250
void writeChunkInfoRecords()
Definition: bag.cpp:883
std::string getFileName() const
return path of currently open file
ChunkedFile file_
Definition: bag.h:284
static const std::string TOPIC_FIELD_NAME
Definition: constants.h:51
void openReadWrite(std::string const &filename)
open file for reading & writing
static const unsigned char OP_MSG_DEF
Definition: constants.h:83
void setSize(uint32_t size)
Definition: buffer.cpp:56
void write(std::string const &topic, ros::MessageEvent< T > const &event)
Write a message into the bag file.
Definition: bag.h:332
void readFileHeaderRecord()
Definition: bag.cpp:373
static const uint32_t INDEX_VERSION
Definition: constants.h:89
ros::Header readMessageDataHeader(IndexEntry const &index_entry)
Definition: bag.cpp:846
bool chunk_open_
Definition: bag.h:297
ros::Time start_time
earliest timestamp of a message in the chunk
Definition: structures.h:62


rosbag_storage
Author(s): Dirk Thomas
autogenerated on Mon Nov 2 2020 03:52:19