bag.cpp
Go to the documentation of this file.
1 // Copyright (c) 2009, Willow Garage, Inc.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are met:
6 //
7 // * Redistributions of source code must retain the above copyright
8 // notice, this list of conditions and the following disclaimer.
9 // * Redistributions in binary form must reproduce the above copyright
10 // notice, this list of conditions and the following disclaimer in the
11 // documentation and/or other materials provided with the distribution.
12 // * Neither the name of Willow Garage, Inc. nor the names of its
13 // contributors may be used to endorse or promote products derived from
14 // this software without specific prior written permission.
15 //
16 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 // POSSIBILITY OF SUCH DAMAGE.
27 
28 #include "rosbag/bag.h"
30 #include "rosbag/query.h"
31 #include "rosbag/view.h"
32 
33 #if defined(_MSC_VER)
34  #include <stdint.h> // only on v2010 and later -> is this enough for msvc and linux?
35 #else
36  #include <inttypes.h>
37 #endif
38 #include <signal.h>
39 #include <assert.h>
40 #include <iomanip>
41 
42 #include <boost/foreach.hpp>
43 
44 #include "console_bridge/console.h"
45 
46 #define foreach BOOST_FOREACH
47 
48 using std::map;
49 using std::priority_queue;
50 using std::string;
51 using std::vector;
52 using std::multiset;
53 using boost::format;
54 using boost::shared_ptr;
55 using ros::M_string;
56 using ros::Time;
57 
58 namespace rosbag {
59 
61 {
62  init();
63 }
64 
65 Bag::Bag(string const& filename, uint32_t mode)
66 {
67  init();
68  open(filename, mode);
69 }
70 
71 #ifndef BOOST_NO_CXX11_RVALUE_REFERENCES
72 
73 Bag::Bag(Bag&& other) {
74  init();
75  swap(other);
76 }
77 
78 Bag& Bag::operator=(Bag&& other) {
79  swap(other);
80  return *this;
81 }
82 
83 #endif // BOOST_NO_CXX11_RVALUE_REFERENCES
84 
86  close();
87 }
88 
89 void Bag::init() {
91  version_ = 0;
93  chunk_threshold_ = 768 * 1024; // 768KB chunks
94  bag_revision_ = 0;
95  file_size_ = 0;
96  file_header_pos_ = 0;
97  index_data_pos_ = 0;
99  chunk_count_ = 0;
100  chunk_open_ = false;
102  current_buffer_ = 0;
104 }
105 
106 void Bag::open(string const& filename, uint32_t mode) {
107  mode_ = (BagMode) mode;
108 
109  if (mode_ & bagmode::Append)
110  openAppend(filename);
111  else if (mode_ & bagmode::Write)
112  openWrite(filename);
113  else if (mode_ & bagmode::Read)
114  openRead(filename);
115  else
116  throw BagException((format("Unknown mode: %1%") % (int) mode).str());
117 
118  // Determine file size
119  uint64_t offset = file_.getOffset();
120  seek(0, std::ios::end);
122  seek(offset);
123 }
124 
125 void Bag::openRead(string const& filename) {
126  file_.openRead(filename);
127 
128  readVersion();
129 
130  switch (version_) {
131  case 102: startReadingVersion102(); break;
132  case 200: startReadingVersion200(); break;
133  default:
134  throw BagException((format("Unsupported bag file version: %1%.%2%") % getMajorVersion() % getMinorVersion()).str());
135  }
136 }
137 
138 void Bag::openWrite(string const& filename) {
139  file_.openWrite(filename);
140 
141  startWriting();
142 }
143 
144 void Bag::openAppend(string const& filename) {
145  file_.openReadWrite(filename);
146 
147  readVersion();
148 
149  if (version_ != 200)
150  throw BagException((format("Bag file version %1%.%2% is unsupported for appending") % getMajorVersion() % getMinorVersion()).str());
151 
153 
154  // Truncate the file to chop off the index
156  index_data_pos_ = 0;
157 
158  // Rewrite the file header, clearing the index position (so we know if the index is invalid)
161 
162  // Seek to the end of the file
163  seek(0, std::ios::end);
164 }
165 
166 void Bag::close() {
167  if (!isOpen())
168  return;
169 
171  closeWrite();
172 
173  file_.close();
174 
175  topic_connection_ids_.clear();
176  header_connection_ids_.clear();
177  for (map<uint32_t, ConnectionInfo*>::iterator i = connections_.begin(); i != connections_.end(); i++)
178  delete i->second;
179  connections_.clear();
180  chunks_.clear();
181  connection_indexes_.clear();
183 
184  init();
185 }
186 
188  stopWriting();
189 }
190 
191 string Bag::getFileName() const { return file_.getFileName(); }
192 BagMode Bag::getMode() const { return mode_; }
193 uint64_t Bag::getSize() const { return file_size_; }
194 
195 uint32_t Bag::getChunkThreshold() const { return chunk_threshold_; }
196 
197 void Bag::setChunkThreshold(uint32_t chunk_threshold) {
198  if (isOpen() && chunk_open_)
200 
201  chunk_threshold_ = chunk_threshold;
202 }
203 
205 
207  if (isOpen() && chunk_open_)
209 
210  if (!(compression == compression::Uncompressed ||
211  compression == compression::BZ2 ||
212  compression == compression::LZ4)) {
213  throw BagException(
214  (format("Unknown compression type: %i") % compression).str());
215  }
216 
217  compression_ = compression;
218 }
219 
220 // Version
221 
223  string version = string("#ROSBAG V") + VERSION + string("\n");
224 
225  CONSOLE_BRIDGE_logDebug("Writing VERSION [%llu]: %s", (unsigned long long) file_.getOffset(), version.c_str());
226 
227  version_ = 200;
228 
229  write(version);
230 }
231 
233  string version_line = file_.getline();
234 
236 
237  char logtypename[100];
238  int version_major, version_minor;
239 #if defined(_MSC_VER)
240  if (sscanf_s(version_line.c_str(), "#ROS%s V%d.%d", logtypename, sizeof(logtypename), &version_major, &version_minor) != 3)
241 #else
242  if (sscanf(version_line.c_str(), "#ROS%99s V%d.%d", logtypename, &version_major, &version_minor) != 3)
243 #endif
244  throw BagIOException("Error reading version line");
245 
246  version_ = version_major * 100 + version_minor;
247 
248  CONSOLE_BRIDGE_logDebug("Read VERSION: version=%d", version_);
249 }
250 
251 uint32_t Bag::getMajorVersion() const { return version_ / 100; }
252 uint32_t Bag::getMinorVersion() const { return version_ % 100; }
253 
254 //
255 
257  writeVersion();
260 }
261 
263  if (chunk_open_)
265 
266  seek(0, std::ios::end);
267 
271 
274 }
275 
277  // Read the file header record, which points to the end of the chunks
279 
280  // Seek to the end of the chunks
282 
283  // Read the connection records (one for each connection)
284  for (uint32_t i = 0; i < connection_count_; i++)
286 
287  // Read the chunk info records
288  for (uint32_t i = 0; i < chunk_count_; i++)
290 
291  // Read the connection indexes for each chunk
292  foreach(ChunkInfo const& chunk_info, chunks_) {
293  curr_chunk_info_ = chunk_info;
294 
296 
297  // Skip over the chunk data
298  ChunkHeader chunk_header;
299  readChunkHeader(chunk_header);
300  seek(chunk_header.compressed_size, std::ios::cur);
301 
302  // Read the index records after the chunk
303  for (unsigned int i = 0; i < chunk_info.connection_counts.size(); i++)
305  }
306 
307  // At this point we don't have a curr_chunk_info anymore so we reset it
309 }
310 
312  try
313  {
314  // Read the file header record, which points to the start of the topic indexes
316  }
317  catch (BagFormatException ex) {
318  throw BagUnindexedException();
319  }
320 
321  // Get the length of the file
322  seek(0, std::ios::end);
323  uint64_t filelength = file_.getOffset();
324 
325  // Seek to the beginning of the topic index records
327 
328  // Read the topic index records, which point to the offsets of each message in the file
329  while (file_.getOffset() < filelength)
331 
332  // Read the message definition records (which are the first entry in the topic indexes)
333  for (map<uint32_t, multiset<IndexEntry> >::const_iterator i = connection_indexes_.begin(); i != connection_indexes_.end(); i++) {
334  multiset<IndexEntry> const& index = i->second;
335  IndexEntry const& first_entry = *index.begin();
336 
337  CONSOLE_BRIDGE_logDebug("Reading message definition for connection %d at %llu", i->first, (unsigned long long) first_entry.chunk_pos);
338 
339  seek(first_entry.chunk_pos);
340 
342  }
343 }
344 
345 // File header record
346 
349  chunk_count_ = chunks_.size();
350 
351  CONSOLE_BRIDGE_logDebug("Writing FILE_HEADER [%llu]: index_pos=%llu connection_count=%d chunk_count=%d",
352  (unsigned long long) file_.getOffset(), (unsigned long long) index_data_pos_, connection_count_, chunk_count_);
353 
354  // Write file header record
360 
361  boost::shared_array<uint8_t> header_buffer;
362  uint32_t header_len;
363  ros::Header::write(header, header_buffer, header_len);
364  uint32_t data_len = 0;
365  if (header_len < FILE_HEADER_LENGTH)
366  data_len = FILE_HEADER_LENGTH - header_len;
367  write((char*) &header_len, 4);
368  write((char*) header_buffer.get(), header_len);
369  write((char*) &data_len, 4);
370 
371  // Pad the file header record out
372  if (data_len > 0) {
373  string padding;
374  padding.resize(data_len, ' ');
375  write(padding);
376  }
377 }
378 
381  uint32_t data_size;
382  if (!readHeader(header) || !readDataLength(data_size))
383  throw BagFormatException("Error reading FILE_HEADER record");
384 
385  M_string& fields = *header.getValues();
386 
387  if (!isOp(fields, OP_FILE_HEADER))
388  throw BagFormatException("Expected FILE_HEADER op not found");
389 
390  // Read index position
391  readField(fields, INDEX_POS_FIELD_NAME, true, (uint64_t*) &index_data_pos_);
392 
393  if (index_data_pos_ == 0)
394  throw BagUnindexedException();
395 
396  // Read topic and chunks count
397  if (version_ >= 200) {
400  }
401 
402  CONSOLE_BRIDGE_logDebug("Read FILE_HEADER: index_pos=%llu connection_count=%d chunk_count=%d",
403  (unsigned long long) index_data_pos_, connection_count_, chunk_count_);
404 
405  // Skip the data section (just padding)
406  seek(data_size, std::ios::cur);
407 }
408 
409 uint32_t Bag::getChunkOffset() const {
412  else
413  return file_.getCompressedBytesIn();
414 }
415 
417  // Initialize chunk info
420  curr_chunk_info_.end_time = time;
421 
422  // Write the chunk header, with a place-holder for the data sizes (we'll fill in when the chunk is finished)
424 
425  // Turn on compressed writing
427 
428  // Record where the data section of this chunk started
430 
431  chunk_open_ = true;
432 }
433 
435  // Add this chunk to the index
436  chunks_.push_back(curr_chunk_info_);
437 
438  // Get the uncompressed and compressed sizes
439  uint32_t uncompressed_size = getChunkOffset();
441  uint32_t compressed_size = file_.getOffset() - curr_chunk_data_pos_;
442 
443  // Rewrite the chunk header with the size of the chunk (remembering current offset)
444  uint64_t end_of_chunk_pos = file_.getOffset();
445 
447  writeChunkHeader(compression_, compressed_size, uncompressed_size);
448 
449  // Write out the indexes and clear them
450  seek(end_of_chunk_pos);
453 
454  // Clear the connection counts
456 
457  // Flag that we're starting a new chunk
458  chunk_open_ = false;
459 }
460 
461 void Bag::writeChunkHeader(CompressionType compression, uint32_t compressed_size, uint32_t uncompressed_size) {
462  ChunkHeader chunk_header;
463  switch (compression) {
464  case compression::Uncompressed: chunk_header.compression = COMPRESSION_NONE; break;
465  case compression::BZ2: chunk_header.compression = COMPRESSION_BZ2; break;
466  case compression::LZ4: chunk_header.compression = COMPRESSION_LZ4;
467  //case compression::ZLIB: chunk_header.compression = COMPRESSION_ZLIB; break;
468  }
469  chunk_header.compressed_size = compressed_size;
470  chunk_header.uncompressed_size = uncompressed_size;
471 
472  CONSOLE_BRIDGE_logDebug("Writing CHUNK [%llu]: compression=%s compressed=%d uncompressed=%d",
473  (unsigned long long) file_.getOffset(), chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size);
474 
477  header[COMPRESSION_FIELD_NAME] = chunk_header.compression;
478  header[SIZE_FIELD_NAME] = toHeaderString(&chunk_header.uncompressed_size);
479  writeHeader(header);
480 
481  writeDataLength(chunk_header.compressed_size);
482 }
483 
484 void Bag::readChunkHeader(ChunkHeader& chunk_header) const {
486  if (!readHeader(header) || !readDataLength(chunk_header.compressed_size))
487  throw BagFormatException("Error reading CHUNK record");
488 
489  M_string& fields = *header.getValues();
490 
491  if (!isOp(fields, OP_CHUNK))
492  throw BagFormatException("Expected CHUNK op not found");
493 
494  readField(fields, COMPRESSION_FIELD_NAME, true, chunk_header.compression);
495  readField(fields, SIZE_FIELD_NAME, true, &chunk_header.uncompressed_size);
496 
497  CONSOLE_BRIDGE_logDebug("Read CHUNK: compression=%s size=%d uncompressed=%d (%f)", chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size, 100 * ((double) chunk_header.compressed_size) / chunk_header.uncompressed_size);
498 }
499 
500 // Index records
501 
503  for (map<uint32_t, multiset<IndexEntry> >::const_iterator i = curr_chunk_connection_indexes_.begin(); i != curr_chunk_connection_indexes_.end(); i++) {
504  uint32_t connection_id = i->first;
505  multiset<IndexEntry> const& index = i->second;
506 
507  // Write the index record header
508  uint32_t index_size = index.size();
511  header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_id);
513  header[COUNT_FIELD_NAME] = toHeaderString(&index_size);
514  writeHeader(header);
515 
516  writeDataLength(index_size * 12);
517 
518  CONSOLE_BRIDGE_logDebug("Writing INDEX_DATA: connection=%d ver=%d count=%d", connection_id, INDEX_VERSION, index_size);
519 
520  // Write the index record data (pairs of timestamp and position in file)
521  foreach(IndexEntry const& e, index) {
522  write((char*) &e.time.sec, 4);
523  write((char*) &e.time.nsec, 4);
524  write((char*) &e.offset, 4);
525 
526  CONSOLE_BRIDGE_logDebug(" - %d.%d: %d", e.time.sec, e.time.nsec, e.offset);
527  }
528  }
529 }
530 
533  uint32_t data_size;
534  if (!readHeader(header) || !readDataLength(data_size))
535  throw BagFormatException("Error reading INDEX_DATA header");
536  M_string& fields = *header.getValues();
537 
538  if (!isOp(fields, OP_INDEX_DATA))
539  throw BagFormatException("Expected INDEX_DATA record");
540 
541  uint32_t index_version;
542  string topic;
543  uint32_t count = 0;
544  readField(fields, VER_FIELD_NAME, true, &index_version);
545  readField(fields, TOPIC_FIELD_NAME, true, topic);
546  readField(fields, COUNT_FIELD_NAME, true, &count);
547 
548  CONSOLE_BRIDGE_logDebug("Read INDEX_DATA: ver=%d topic=%s count=%d", index_version, topic.c_str(), count);
549 
550  if (index_version != 0)
551  throw BagFormatException((format("Unsupported INDEX_DATA version: %1%") % index_version).str());
552 
553  uint32_t connection_id;
554  map<string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
555  if (topic_conn_id_iter == topic_connection_ids_.end()) {
556  connection_id = connections_.size();
557 
558  CONSOLE_BRIDGE_logDebug("Creating connection: id=%d topic=%s", connection_id, topic.c_str());
559  ConnectionInfo* connection_info = new ConnectionInfo();
560  connection_info->id = connection_id;
561  connection_info->topic = topic;
562  connections_[connection_id] = connection_info;
563 
564  topic_connection_ids_[topic] = connection_id;
565  }
566  else
567  connection_id = topic_conn_id_iter->second;
568 
569  multiset<IndexEntry>& connection_index = connection_indexes_[connection_id];
570 
571  for (uint32_t i = 0; i < count; i++) {
572  IndexEntry index_entry;
573  uint32_t sec;
574  uint32_t nsec;
575  read((char*) &sec, 4);
576  read((char*) &nsec, 4);
577  read((char*) &index_entry.chunk_pos, 8); //<! store position of the message in the chunk_pos field as it's 64 bits
578  index_entry.time = Time(sec, nsec);
579  index_entry.offset = 0;
580 
581  CONSOLE_BRIDGE_logDebug(" - %d.%d: %llu", sec, nsec, (unsigned long long) index_entry.chunk_pos);
582 
583  if (index_entry.time < ros::TIME_MIN || index_entry.time > ros::TIME_MAX)
584  {
585  CONSOLE_BRIDGE_logError("Index entry for topic %s contains invalid time.", topic.c_str());
586  } else
587  {
588  connection_index.insert(connection_index.end(), index_entry);
589  }
590  }
591 }
592 
595  uint32_t data_size;
596  if (!readHeader(header) || !readDataLength(data_size))
597  throw BagFormatException("Error reading INDEX_DATA header");
598  M_string& fields = *header.getValues();
599 
600  if (!isOp(fields, OP_INDEX_DATA))
601  throw BagFormatException("Expected INDEX_DATA record");
602 
603  uint32_t index_version;
604  uint32_t connection_id;
605  uint32_t count = 0;
606  readField(fields, VER_FIELD_NAME, true, &index_version);
607  readField(fields, CONNECTION_FIELD_NAME, true, &connection_id);
608  readField(fields, COUNT_FIELD_NAME, true, &count);
609 
610  CONSOLE_BRIDGE_logDebug("Read INDEX_DATA: ver=%d connection=%d count=%d", index_version, connection_id, count);
611 
612  if (index_version != 1)
613  throw BagFormatException((format("Unsupported INDEX_DATA version: %1%") % index_version).str());
614 
615  uint64_t chunk_pos = curr_chunk_info_.pos;
616 
617  multiset<IndexEntry>& connection_index = connection_indexes_[connection_id];
618 
619  for (uint32_t i = 0; i < count; i++) {
620  IndexEntry index_entry;
621  index_entry.chunk_pos = chunk_pos;
622  uint32_t sec;
623  uint32_t nsec;
624  read((char*) &sec, 4);
625  read((char*) &nsec, 4);
626  read((char*) &index_entry.offset, 4);
627  index_entry.time = Time(sec, nsec);
628 
629  CONSOLE_BRIDGE_logDebug(" - %d.%d: %llu+%d", sec, nsec, (unsigned long long) index_entry.chunk_pos, index_entry.offset);
630 
631  if (index_entry.time < ros::TIME_MIN || index_entry.time > ros::TIME_MAX)
632  {
633  CONSOLE_BRIDGE_logError("Index entry for topic %s contains invalid time. This message will not be loaded.", connections_[connection_id]->topic.c_str());
634  } else
635  {
636  connection_index.insert(connection_index.end(), index_entry);
637  }
638  }
639 }
640 
641 // Connection records
642 
644  for (map<uint32_t, ConnectionInfo*>::const_iterator i = connections_.begin(); i != connections_.end(); i++) {
645  ConnectionInfo const* connection_info = i->second;
646  writeConnectionRecord(connection_info);
647  }
648 }
649 
650 void Bag::writeConnectionRecord(ConnectionInfo const* connection_info) {
651  CONSOLE_BRIDGE_logDebug("Writing CONNECTION [%llu:%d]: topic=%s id=%d",
652  (unsigned long long) file_.getOffset(), getChunkOffset(), connection_info->topic.c_str(), connection_info->id);
653 
656  header[TOPIC_FIELD_NAME] = connection_info->topic;
657  header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_info->id);
658  writeHeader(header);
659 
660  writeHeader(*connection_info->header);
661 }
662 
663 void Bag::appendConnectionRecordToBuffer(Buffer& buf, ConnectionInfo const* connection_info) {
666  header[TOPIC_FIELD_NAME] = connection_info->topic;
667  header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_info->id);
668  appendHeaderToBuffer(buf, header);
669 
670  appendHeaderToBuffer(buf, *connection_info->header);
671 }
672 
675  if (!readHeader(header))
676  throw BagFormatException("Error reading CONNECTION header");
677  M_string& fields = *header.getValues();
678 
679  if (!isOp(fields, OP_CONNECTION))
680  throw BagFormatException("Expected CONNECTION op not found");
681 
682  uint32_t id;
683  readField(fields, CONNECTION_FIELD_NAME, true, &id);
684  string topic;
685  readField(fields, TOPIC_FIELD_NAME, true, topic);
686 
687  ros::Header connection_header;
688  if (!readHeader(connection_header))
689  throw BagFormatException("Error reading connection header");
690 
691  // If this is a new connection, update connections
692  map<uint32_t, ConnectionInfo*>::iterator key = connections_.find(id);
693  if (key == connections_.end()) {
694  ConnectionInfo* connection_info = new ConnectionInfo();
695  connection_info->id = id;
696  connection_info->topic = topic;
697  connection_info->header = boost::make_shared<M_string>();
698  for (M_string::const_iterator i = connection_header.getValues()->begin(); i != connection_header.getValues()->end(); i++)
699  (*connection_info->header)[i->first] = i->second;
700  connection_info->msg_def = (*connection_info->header)["message_definition"];
701  connection_info->datatype = (*connection_info->header)["type"];
702  connection_info->md5sum = (*connection_info->header)["md5sum"];
703  connections_[id] = connection_info;
704 
705  CONSOLE_BRIDGE_logDebug("Read CONNECTION: topic=%s id=%d", topic.c_str(), id);
706  }
707 }
708 
711  uint32_t data_size;
712  if (!readHeader(header) || !readDataLength(data_size))
713  throw BagFormatException("Error reading message definition header");
714  M_string& fields = *header.getValues();
715 
716  if (!isOp(fields, OP_MSG_DEF))
717  throw BagFormatException("Expected MSG_DEF op not found");
718 
719  string topic, md5sum, datatype, message_definition;
720  readField(fields, TOPIC_FIELD_NAME, true, topic);
721  readField(fields, MD5_FIELD_NAME, 32, 32, true, md5sum);
722  readField(fields, TYPE_FIELD_NAME, true, datatype);
723  readField(fields, DEF_FIELD_NAME, 0, UINT_MAX, true, message_definition);
724 
725  ConnectionInfo* connection_info;
726 
727  map<string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
728  if (topic_conn_id_iter == topic_connection_ids_.end()) {
729  uint32_t id = connections_.size();
730 
731  CONSOLE_BRIDGE_logDebug("Creating connection: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str());
732  connection_info = new ConnectionInfo();
733  connection_info->id = id;
734  connection_info->topic = topic;
735 
736  connections_[id] = connection_info;
737  topic_connection_ids_[topic] = id;
738  }
739  else
740  connection_info = connections_[topic_conn_id_iter->second];
741 
742  connection_info->msg_def = message_definition;
743  connection_info->datatype = datatype;
744  connection_info->md5sum = md5sum;
745  connection_info->header = boost::make_shared<ros::M_string>();
746  (*connection_info->header)["type"] = connection_info->datatype;
747  (*connection_info->header)["md5sum"] = connection_info->md5sum;
748  (*connection_info->header)["message_definition"] = connection_info->msg_def;
749 
750  CONSOLE_BRIDGE_logDebug("Read MSG_DEF: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str());
751 }
752 
753 void Bag::decompressChunk(uint64_t chunk_pos) const {
754  if (curr_chunk_info_.pos == chunk_pos) {
756  return;
757  }
758 
760 
761  if (decompressed_chunk_ == chunk_pos)
762  return;
763 
764  // Seek to the start of the chunk
765  seek(chunk_pos);
766 
767  // Read the chunk header
768  ChunkHeader chunk_header;
769  readChunkHeader(chunk_header);
770 
771  // Read and decompress the chunk. These assume we are at the right place in the stream already
772  if (chunk_header.compression == COMPRESSION_NONE)
773  decompressRawChunk(chunk_header);
774  else if (chunk_header.compression == COMPRESSION_BZ2)
775  decompressBz2Chunk(chunk_header);
776  else if (chunk_header.compression == COMPRESSION_LZ4)
777  decompressLz4Chunk(chunk_header);
778  else
779  throw BagFormatException("Unknown compression: " + chunk_header.compression);
780 
781  decompressed_chunk_ = chunk_pos;
782 }
783 
784 void Bag::readMessageDataRecord102(uint64_t offset, ros::Header& header) const {
785  CONSOLE_BRIDGE_logDebug("readMessageDataRecord: offset=%llu", (unsigned long long) offset);
786 
787  seek(offset);
788 
789  uint32_t data_size;
790  uint8_t op;
791  do {
792  if (!readHeader(header) || !readDataLength(data_size))
793  throw BagFormatException("Error reading header");
794 
795  readField(*header.getValues(), OP_FIELD_NAME, true, &op);
796  }
797  while (op == OP_MSG_DEF);
798 
799  if (op != OP_MSG_DATA)
800  throw BagFormatException((format("Expected MSG_DATA op, got %d") % op).str());
801 
802  record_buffer_.setSize(data_size);
803  file_.read((char*) record_buffer_.getData(), data_size);
804 }
805 
806 // Reading this into a buffer isn't completely necessary, but we do it anyways for now
807 void Bag::decompressRawChunk(ChunkHeader const& chunk_header) const {
808  assert(chunk_header.compression == COMPRESSION_NONE);
809  assert(chunk_header.compressed_size == chunk_header.uncompressed_size);
810 
811  CONSOLE_BRIDGE_logDebug("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size);
812 
814  file_.read((char*) decompress_buffer_.getData(), chunk_header.compressed_size);
815 
816  // todo check read was successful
817 }
818 
819 void Bag::decompressBz2Chunk(ChunkHeader const& chunk_header) const {
820  assert(chunk_header.compression == COMPRESSION_BZ2);
821 
822  CompressionType compression = compression::BZ2;
823 
824  CONSOLE_BRIDGE_logDebug("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size);
825 
826  chunk_buffer_.setSize(chunk_header.compressed_size);
827  file_.read((char*) chunk_buffer_.getData(), chunk_header.compressed_size);
828 
831 
832  // todo check read was successful
833 }
834 
835 void Bag::decompressLz4Chunk(ChunkHeader const& chunk_header) const {
836  assert(chunk_header.compression == COMPRESSION_LZ4);
837 
838  CompressionType compression = compression::LZ4;
839 
840  CONSOLE_BRIDGE_logDebug("lz4 compressed_size: %d uncompressed_size: %d",
841  chunk_header.compressed_size, chunk_header.uncompressed_size);
842 
843  chunk_buffer_.setSize(chunk_header.compressed_size);
844  file_.read((char*) chunk_buffer_.getData(), chunk_header.compressed_size);
845 
848 
849  // todo check read was successful
850 }
851 
854  uint32_t data_size;
855  uint32_t bytes_read;
856  switch (version_)
857  {
858  case 200:
859  decompressChunk(index_entry.chunk_pos);
860  readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
861  return header;
862  case 102:
863  readMessageDataRecord102(index_entry.chunk_pos, header);
864  return header;
865  default:
866  throw BagFormatException((format("Unhandled version: %1%") % version_).str());
867  }
868 }
869 
870 // NOTE: this loads the header, which is unnecessary
871 uint32_t Bag::readMessageDataSize(IndexEntry const& index_entry) const {
873  uint32_t data_size;
874  uint32_t bytes_read;
875  switch (version_)
876  {
877  case 200:
878  decompressChunk(index_entry.chunk_pos);
879  readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
880  return data_size;
881  case 102:
882  readMessageDataRecord102(index_entry.chunk_pos, header);
883  return record_buffer_.getSize();
884  default:
885  throw BagFormatException((format("Unhandled version: %1%") % version_).str());
886  }
887 }
888 
890  foreach(ChunkInfo const& chunk_info, chunks_) {
891  // Write the chunk info header
893  uint32_t chunk_connection_count = chunk_info.connection_counts.size();
896  header[CHUNK_POS_FIELD_NAME] = toHeaderString(&chunk_info.pos);
897  header[START_TIME_FIELD_NAME] = toHeaderString(&chunk_info.start_time);
898  header[END_TIME_FIELD_NAME] = toHeaderString(&chunk_info.end_time);
899  header[COUNT_FIELD_NAME] = toHeaderString(&chunk_connection_count);
900 
901  CONSOLE_BRIDGE_logDebug("Writing CHUNK_INFO [%llu]: ver=%d pos=%llu start=%d.%d end=%d.%d",
902  (unsigned long long) file_.getOffset(), CHUNK_INFO_VERSION, (unsigned long long) chunk_info.pos,
903  chunk_info.start_time.sec, chunk_info.start_time.nsec,
904  chunk_info.end_time.sec, chunk_info.end_time.nsec);
905 
906  writeHeader(header);
907 
908  writeDataLength(8 * chunk_connection_count);
909 
910  // Write the topic names and counts
911  for (map<uint32_t, uint32_t>::const_iterator i = chunk_info.connection_counts.begin(); i != chunk_info.connection_counts.end(); i++) {
912  uint32_t connection_id = i->first;
913  uint32_t count = i->second;
914 
915  write((char*) &connection_id, 4);
916  write((char*) &count, 4);
917 
918  CONSOLE_BRIDGE_logDebug(" - %d: %d", connection_id, count);
919  }
920  }
921 }
922 
924  // Read a CHUNK_INFO header
926  uint32_t data_size;
927  if (!readHeader(header) || !readDataLength(data_size))
928  throw BagFormatException("Error reading CHUNK_INFO record header");
929  M_string& fields = *header.getValues();
930  if (!isOp(fields, OP_CHUNK_INFO))
931  throw BagFormatException("Expected CHUNK_INFO op not found");
932 
933  // Check that the chunk info version is current
934  uint32_t chunk_info_version;
935  readField(fields, VER_FIELD_NAME, true, &chunk_info_version);
936  if (chunk_info_version != CHUNK_INFO_VERSION)
937  throw BagFormatException((format("Expected CHUNK_INFO version %1%, read %2%") % CHUNK_INFO_VERSION % chunk_info_version).str());
938 
939  // Read the chunk position, timestamp, and topic count fields
940  ChunkInfo chunk_info;
941  readField(fields, CHUNK_POS_FIELD_NAME, true, &chunk_info.pos);
942  readField(fields, START_TIME_FIELD_NAME, true, chunk_info.start_time);
943  readField(fields, END_TIME_FIELD_NAME, true, chunk_info.end_time);
944  uint32_t chunk_connection_count = 0;
945  readField(fields, COUNT_FIELD_NAME, true, &chunk_connection_count);
946 
947  CONSOLE_BRIDGE_logDebug("Read CHUNK_INFO: chunk_pos=%llu connection_count=%d start=%d.%d end=%d.%d",
948  (unsigned long long) chunk_info.pos, chunk_connection_count,
949  chunk_info.start_time.sec, chunk_info.start_time.nsec,
950  chunk_info.end_time.sec, chunk_info.end_time.nsec);
951 
952  // Read the topic count entries
953  for (uint32_t i = 0; i < chunk_connection_count; i ++) {
954  uint32_t connection_id, connection_count;
955  read((char*) &connection_id, 4);
956  read((char*) &connection_count, 4);
957 
958  CONSOLE_BRIDGE_logDebug(" %d: %d messages", connection_id, connection_count);
959 
960  chunk_info.connection_counts[connection_id] = connection_count;
961  }
962 
963  chunks_.push_back(chunk_info);
964 }
965 
966 // Record I/O
967 
968 bool Bag::isOp(M_string& fields, uint8_t reqOp) const {
969  uint8_t op = 0xFF; // nonexistent op
970  readField(fields, OP_FIELD_NAME, true, &op);
971  return op == reqOp;
972 }
973 
974 void Bag::writeHeader(M_string const& fields) {
975  boost::shared_array<uint8_t> header_buffer;
976  uint32_t header_len;
977  ros::Header::write(fields, header_buffer, header_len);
978  write((char*) &header_len, 4);
979  write((char*) header_buffer.get(), header_len);
980 }
981 
982 void Bag::writeDataLength(uint32_t data_len) {
983  write((char*) &data_len, 4);
984 }
985 
986 void Bag::appendHeaderToBuffer(Buffer& buf, M_string const& fields) {
987  boost::shared_array<uint8_t> header_buffer;
988  uint32_t header_len;
989  ros::Header::write(fields, header_buffer, header_len);
990 
991  uint32_t offset = buf.getSize();
992 
993  buf.setSize(buf.getSize() + 4 + header_len);
994 
995  memcpy(buf.getData() + offset, &header_len, 4);
996  offset += 4;
997  memcpy(buf.getData() + offset, header_buffer.get(), header_len);
998 }
999 
1000 void Bag::appendDataLengthToBuffer(Buffer& buf, uint32_t data_len) {
1001  uint32_t offset = buf.getSize();
1002 
1003  buf.setSize(buf.getSize() + 4);
1004 
1005  memcpy(buf.getData() + offset, &data_len, 4);
1006 }
1007 
1009 void Bag::readHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read) const {
1010  assert(buffer.getSize() > 8);
1011 
1012  uint8_t* start = (uint8_t*) buffer.getData() + offset;
1013 
1014  uint8_t* ptr = start;
1015 
1016  // Read the header length
1017  uint32_t header_len;
1018  memcpy(&header_len, ptr, 4);
1019  ptr += 4;
1020 
1021  // Parse the header
1022  string error_msg;
1023  bool parsed = header.parse(ptr, header_len, error_msg);
1024  if (!parsed)
1025  throw BagFormatException("Error parsing header");
1026  ptr += header_len;
1027 
1028  // Read the data size
1029  memcpy(&data_size, ptr, 4);
1030  ptr += 4;
1031 
1032  bytes_read = ptr - start;
1033 }
1034 
1035 void Bag::readMessageDataHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& total_bytes_read) const {
1036  (void)buffer;
1037  total_bytes_read = 0;
1038  uint8_t op = 0xFF;
1039  do {
1040  CONSOLE_BRIDGE_logDebug("reading header from buffer: offset=%d", offset);
1041  uint32_t bytes_read;
1042  readHeaderFromBuffer(*current_buffer_, offset, header, data_size, bytes_read);
1043 
1044  offset += bytes_read;
1045  total_bytes_read += bytes_read;
1046 
1047  readField(*header.getValues(), OP_FIELD_NAME, true, &op);
1048  }
1049  while (op == OP_MSG_DEF || op == OP_CONNECTION);
1050 
1051  if (op != OP_MSG_DATA)
1052  throw BagFormatException("Expected MSG_DATA op not found");
1053 }
1054 
1055 bool Bag::readHeader(ros::Header& header) const {
1056  // Read the header length
1057  uint32_t header_len;
1058  read((char*) &header_len, 4);
1059 
1060  // Read the header
1061  header_buffer_.setSize(header_len);
1062  read((char*) header_buffer_.getData(), header_len);
1063 
1064  // Parse the header
1065  string error_msg;
1066  bool parsed = header.parse(header_buffer_.getData(), header_len, error_msg);
1067  if (!parsed)
1068  return false;
1069 
1070  return true;
1071 }
1072 
1073 bool Bag::readDataLength(uint32_t& data_size) const {
1074  read((char*) &data_size, 4);
1075  return true;
1076 }
1077 
1078 M_string::const_iterator Bag::checkField(M_string const& fields, string const& field, unsigned int min_len, unsigned int max_len, bool required) const {
1079  M_string::const_iterator fitr = fields.find(field);
1080  if (fitr == fields.end()) {
1081  if (required)
1082  throw BagFormatException("Required '" + field + "' field missing");
1083  }
1084  else if ((fitr->second.size() < min_len) || (fitr->second.size() > max_len))
1085  throw BagFormatException((format("Field '%1%' is wrong size (%2% bytes)") % field % (uint32_t) fitr->second.size()).str());
1086 
1087  return fitr;
1088 }
1089 
1090 bool Bag::readField(M_string const& fields, string const& field_name, bool required, string& data) const {
1091  return readField(fields, field_name, 1, UINT_MAX, required, data);
1092 }
1093 
1094 bool Bag::readField(M_string const& fields, string const& field_name, unsigned int min_len, unsigned int max_len, bool required, string& data) const {
1095  M_string::const_iterator fitr = checkField(fields, field_name, min_len, max_len, required);
1096  if (fitr == fields.end())
1097  return false;
1098 
1099  data = fitr->second;
1100  return true;
1101 }
1102 
1103 bool Bag::readField(M_string const& fields, string const& field_name, bool required, Time& data) const {
1104  uint64_t packed_time;
1105  if (!readField(fields, field_name, required, &packed_time))
1106  return false;
1107 
1108  uint64_t bitmask = (1LL << 33) - 1;
1109  data.sec = (uint32_t) (packed_time & bitmask);
1110  data.nsec = (uint32_t) (packed_time >> 32);
1111 
1112  return true;
1113 }
1114 
1115 std::string Bag::toHeaderString(Time const* field) const {
1116  uint64_t packed_time = (((uint64_t) field->nsec) << 32) + field->sec;
1117  return toHeaderString(&packed_time);
1118 }
1119 
1120 
1121 // Low-level I/O
1122 
1123 void Bag::write(string const& s) { write(s.c_str(), s.length()); }
1124 void Bag::write(char const* s, std::streamsize n) { file_.write((char*) s, n); }
1125 
1126 void Bag::read(char* b, std::streamsize n) const { file_.read(b, n); }
1127 void Bag::seek(uint64_t pos, int origin) const { file_.seek(pos, origin); }
1128 
1129 void Bag::swap(Bag& other) {
1130  using std::swap;
1131  swap(mode_, other.mode_);
1132  swap(file_, other.file_);
1133  swap(version_, other.version_);
1134  swap(compression_, other.compression_);
1137  swap(file_size_, other.file_size_);
1141  swap(chunk_count_, other.chunk_count_);
1142  swap(chunk_open_, other.chunk_open_);
1147  swap(connections_, other.connections_);
1148  swap(chunks_, other.chunks_);
1158 }
1159 
1160 bool Bag::isOpen() const { return file_.isOpen(); }
1161 
1162 } // namespace rosbag
1163 
BagMode getMode() const
Get the mode the bag is in.
Definition: bag.cpp:192
uint32_t readMessageDataSize(IndexEntry const &index_entry) const
Definition: bag.cpp:871
static const std::string COMPRESSION_BZ2
Definition: constants.h:96
void openRead(std::string const &filename)
open file for reading
static const std::string END_TIME_FIELD_NAME
Definition: constants.h:62
void startWritingChunk(ros::Time time)
Definition: bag.cpp:416
static const std::string CHUNK_COUNT_FIELD_NAME
Definition: constants.h:56
BagMode mode_
Definition: bag.h:302
void read(void *ptr, size_t size)
read size bytes from the file into ptr
static const std::string COMPRESSION_FIELD_NAME
Definition: constants.h:58
std::map< std::string, uint32_t > topic_connection_ids_
Definition: bag.h:320
Bag & operator=(Bag &&other)
Definition: bag.cpp:78
void decompressChunk(uint64_t chunk_pos) const
Definition: bag.cpp:753
uint64_t pos
latest timestamp of a message in the chunk
Definition: structures.h:64
void openRead(std::string const &filename)
Definition: bag.cpp:125
static const std::string VER_FIELD_NAME
Definition: constants.h:52
ros::M_string::const_iterator checkField(ros::M_string const &fields, std::string const &field, unsigned int min_len, unsigned int max_len, bool required) const
Definition: bag.cpp:1078
static const std::string COMPRESSION_LZ4
Definition: constants.h:97
bool isOpen() const
Definition: bag.cpp:1160
void open(std::string const &filename, uint32_t mode=bagmode::Read)
Open a bag file.
Definition: bag.cpp:106
uint32_t getChunkThreshold() const
Get the threshold for creating new chunks.
Definition: bag.cpp:195
std::string getFileName() const
Get the filename of the bag.
Definition: bag.cpp:191
std::map< uint32_t, ConnectionInfo * > connections_
Definition: bag.h:322
std::string compression
Definition: structures.h:71
int version_
Definition: bag.h:304
void swap(Bag &)
Definition: bag.cpp:1129
static const std::string CONNECTION_FIELD_NAME
Definition: constants.h:57
void startReadingVersion102()
Definition: bag.cpp:311
void write(std::string const &s)
uint8_t * getData()
Definition: buffer.cpp:51
uint64_t file_size_
Definition: bag.h:309
std::map< ros::M_string, uint32_t > header_connection_ids_
Definition: bag.h:321
static const std::string INDEX_POS_FIELD_NAME
Definition: constants.h:54
void setChunkThreshold(uint32_t chunk_threshold)
Set the threshold for creating new chunks.
Definition: bag.cpp:197
ros::Time time
Definition: structures.h:78
void readTopicIndexRecord102()
Definition: bag.cpp:531
uint64_t getOffset() const
return current offset from the beginning of the file
std::map< uint32_t, uint32_t > connection_counts
absolute byte offset of chunk record in bag file
Definition: structures.h:66
void readMessageDataHeaderFromBuffer(Buffer &buffer, uint32_t offset, ros::Header &header, uint32_t &data_size, uint32_t &bytes_read) const
Definition: bag.cpp:1035
CompressionType getCompression() const
Get the compression method to use for writing chunks.
Definition: bag.cpp:204
Buffer header_buffer_
reusable buffer in which to assemble the record header before writing to file
Definition: bag.h:329
static const unsigned char OP_CONNECTION
Definition: constants.h:80
void openWrite(std::string const &filename)
Definition: bag.cpp:138
void seek(uint64_t pos, int origin=std::ios_base::beg) const
Definition: bag.cpp:1127
Base class for rosbag exceptions.
Definition: exceptions.h:43
bool truncate(uint64_t length)
void seek(uint64_t offset, int origin=std::ios_base::beg)
seek to given offset from origin
void writeConnectionRecords()
Definition: bag.cpp:643
uint32_t getChunkOffset() const
Definition: bag.cpp:409
static const std::string DEF_FIELD_NAME
Definition: constants.h:68
uint32_t getMajorVersion() const
Get the major-version of the open bag file.
Definition: bag.cpp:251
void readChunkHeader(ChunkHeader &chunk_header) const
Definition: bag.cpp:484
void writeConnectionRecord(ConnectionInfo const *connection_info)
Definition: bag.cpp:650
std_msgs::Header * header(M &m)
void openAppend(std::string const &filename)
Definition: bag.cpp:144
void close()
close the file
void swap(Bag &a, Bag &b)
Definition: bag.h:653
void appendDataLengthToBuffer(Buffer &buf, uint32_t data_len)
Definition: bag.cpp:1000
Exception thrown on problems reading the bag format.
Definition: exceptions.h:57
uint32_t getCompressedBytesIn() const
return the number of bytes written to current compressed stream
BagMode
The possible modes to open a bag in.
Definition: bag.h:83
std::string md5sum
Definition: structures.h:54
void setWriteMode(CompressionType type)
static const unsigned char OP_MSG_DATA
Definition: constants.h:75
void close()
Close the bag file.
Definition: bag.cpp:166
static const unsigned char OP_CHUNK
Definition: constants.h:78
bool readField(ros::M_string const &fields, std::string const &field_name, bool required, T *data) const
Definition: bag.h:376
static const std::string VERSION
Definition: constants.h:44
Buffer decompress_buffer_
reusable buffer to decompress chunks into
Definition: bag.h:333
void appendConnectionRecordToBuffer(Buffer &buf, ConnectionInfo const *connection_info)
Definition: bag.cpp:663
uint32_t compressed_size
chunk compression type, e.g. "none" or "bz2" (see constants.h)
Definition: structures.h:72
void decompressRawChunk(ChunkHeader const &chunk_header) const
Definition: bag.cpp:807
std::map< std::string, std::string > M_string
void readMessageDefinitionRecord102()
Definition: bag.cpp:709
void writeFileHeaderRecord()
Definition: bag.cpp:347
void readVersion()
Definition: bag.cpp:232
std::string toHeaderString(T const *field) const
Definition: bag.h:371
static const std::string CHUNK_POS_FIELD_NAME
Definition: constants.h:63
uint64_t decompressed_chunk_
position of decompressed chunk
Definition: bag.h:339
bool readHeader(ros::Header &header) const
Definition: bag.cpp:1055
void readHeaderFromBuffer(Buffer &buffer, uint32_t offset, ros::Header &header, uint32_t &data_size, uint32_t &bytes_read) const
Definition: bag.cpp:1009
const char * datatype()
uint64_t index_data_pos_
Definition: bag.h:311
~Bag()
Definition: bag.cpp:85
void writeHeader(ros::M_string const &fields)
Definition: bag.cpp:974
static const std::string START_TIME_FIELD_NAME
Definition: constants.h:61
Exception thrown on problems reading the bag index.
Definition: exceptions.h:64
static const uint32_t CHUNK_INFO_VERSION
Definition: constants.h:92
void decompressBz2Chunk(ChunkHeader const &chunk_header) const
Definition: bag.cpp:819
Buffer * current_buffer_
Definition: bag.h:337
Exception thrown when on IO problems.
Definition: exceptions.h:50
bool parse(const boost::shared_array< uint8_t > &buffer, uint32_t size, std::string &error_msg)
void appendHeaderToBuffer(Buffer &buf, ros::M_string const &fields)
Definition: bag.cpp:986
bool isOp(ros::M_string &fields, uint8_t reqOp) const
Definition: bag.cpp:968
void setCompression(CompressionType compression)
Set the compression method to use for writing chunks.
Definition: bag.cpp:206
void decompressLz4Chunk(ChunkHeader const &chunk_header) const
Definition: bag.cpp:835
static void write(const M_string &key_vals, boost::shared_array< uint8_t > &buffer, uint32_t &size)
std::string msg_def
Definition: structures.h:55
void writeChunkHeader(CompressionType compression, uint32_t compressed_size, uint32_t uncompressed_size)
Definition: bag.cpp:461
static const std::string OP_FIELD_NAME
Definition: constants.h:50
std::string getline()
uint64_t file_header_pos_
Definition: bag.h:310
bool isOpen() const
return true if file is open for reading or writing
uint32_t bag_revision_
Definition: bag.h:307
Buffer record_buffer_
reusable buffer in which to assemble the record data before writing to file
Definition: bag.h:330
void writeVersion()
Definition: bag.cpp:222
void stopWritingChunk()
Definition: bag.cpp:434
static const std::string TYPE_FIELD_NAME
Definition: constants.h:67
ROSTIME_DECL const Time TIME_MAX
uint32_t getSize() const
Definition: buffer.cpp:53
Buffer chunk_buffer_
reusable buffer to read chunk into
Definition: bag.h:332
ChunkInfo curr_chunk_info_
Definition: bag.h:317
uint32_t getMinorVersion() const
Get the minor-version of the open bag file.
Definition: bag.cpp:252
static const std::string COUNT_FIELD_NAME
Definition: constants.h:53
std::map< uint32_t, std::multiset< IndexEntry > > connection_indexes_
Definition: bag.h:326
uint64_t chunk_pos
timestamp of the message
Definition: structures.h:79
static const unsigned char OP_FILE_HEADER
Definition: constants.h:76
static const unsigned char OP_INDEX_DATA
Definition: constants.h:77
uint32_t connection_count_
Definition: bag.h:312
void readConnectionIndexRecord200()
Definition: bag.cpp:593
void readChunkInfoRecord()
Definition: bag.cpp:923
void openWrite(std::string const &filename)
open file for writing
void stopWriting()
Definition: bag.cpp:262
M_stringPtr getValues()
uint32_t offset
absolute byte offset of the chunk record containing the message
Definition: structures.h:80
void init()
Definition: bag.cpp:89
bool readDataLength(uint32_t &data_size) const
Definition: bag.cpp:1073
uint32_t chunk_threshold_
Definition: bag.h:306
std::vector< ChunkInfo > chunks_
Definition: bag.h:324
std::string datatype
Definition: structures.h:53
ROSTIME_DECL const Time TIME_MIN
boost::shared_ptr< ros::M_string > header
Definition: structures.h:57
void startReadingVersion200()
Definition: bag.cpp:276
uint32_t chunk_count_
Definition: bag.h:313
bagmode::BagMode BagMode
Definition: bag.h:90
void closeWrite()
Definition: bag.cpp:187
Definition: bag.h:78
void writeDataLength(uint32_t data_len)
Definition: bag.cpp:982
CompressionType compression_
Definition: bag.h:305
void decompress(CompressionType compression, uint8_t *dest, unsigned int dest_len, uint8_t *source, unsigned int source_len)
std::map< uint32_t, std::multiset< IndexEntry > > curr_chunk_connection_indexes_
Definition: bag.h:327
void read(char *b, std::streamsize n) const
Definition: bag.cpp:1126
uint32_t uncompressed_size
compressed size of the chunk in bytes
Definition: structures.h:73
static const std::string CONNECTION_COUNT_FIELD_NAME
Definition: constants.h:55
static const uint32_t FILE_HEADER_LENGTH
Definition: constants.h:86
uint64_t curr_chunk_data_pos_
Definition: bag.h:318
#define CONSOLE_BRIDGE_logError(fmt,...)
static const std::string MD5_FIELD_NAME
Definition: constants.h:66
Buffer outgoing_chunk_buffer_
reusable buffer to read chunk into
Definition: bag.h:335
static const unsigned char OP_CHUNK_INFO
Definition: constants.h:79
void readMessageDataRecord102(uint64_t offset, ros::Header &header) const
Definition: bag.cpp:784
const char * md5sum()
static const std::string SIZE_FIELD_NAME
Definition: constants.h:59
void readConnectionRecord()
Definition: bag.cpp:673
static const std::string COMPRESSION_NONE
Definition: constants.h:95
ros::Time end_time
earliest timestamp of a message in the chunk
Definition: structures.h:63
uint64_t getSize() const
Get the current size of the bag file (a lower bound)
Definition: bag.cpp:193
void writeIndexRecords()
Definition: bag.cpp:502
void startWriting()
Definition: bag.cpp:256
void writeChunkInfoRecords()
Definition: bag.cpp:889
std::string getFileName() const
return path of currently open file
ChunkedFile file_
Definition: bag.h:303
static const std::string TOPIC_FIELD_NAME
Definition: constants.h:51
void openReadWrite(std::string const &filename)
open file for reading & writing
static const unsigned char OP_MSG_DEF
Definition: constants.h:83
void setSize(uint32_t size)
Definition: buffer.cpp:55
void write(std::string const &topic, ros::MessageEvent< T > const &event)
Write a message into the bag file.
Definition: bag.h:351
void readFileHeaderRecord()
Definition: bag.cpp:379
static const uint32_t INDEX_VERSION
Definition: constants.h:89
ros::Header readMessageDataHeader(IndexEntry const &index_entry)
Definition: bag.cpp:852
bool chunk_open_
Definition: bag.h:316
ros::Time start_time
Definition: structures.h:62


rosbag_storage
Author(s):
autogenerated on Sun Feb 3 2019 03:29:47