bag.cpp
Go to the documentation of this file.
1 // Copyright (c) 2009, Willow Garage, Inc.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are met:
6 //
7 // * Redistributions of source code must retain the above copyright
8 // notice, this list of conditions and the following disclaimer.
9 // * Redistributions in binary form must reproduce the above copyright
10 // notice, this list of conditions and the following disclaimer in the
11 // documentation and/or other materials provided with the distribution.
12 // * Neither the name of Willow Garage, Inc. nor the names of its
13 // contributors may be used to endorse or promote products derived from
14 // this software without specific prior written permission.
15 //
16 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 // POSSIBILITY OF SUCH DAMAGE.
27 
28 #include "rosbag/bag.h"
30 #include "rosbag/query.h"
31 #include "rosbag/view.h"
32 
33 #if defined(_MSC_VER)
34  #include <stdint.h> // only on v2010 and later -> is this enough for msvc and linux?
35 #else
36  #include <inttypes.h>
37 #endif
38 #include <signal.h>
39 #include <assert.h>
40 #include <iomanip>
41 #include <map>
42 #include <tuple>
43 #include <boost/foreach.hpp>
44 
45 #include "console_bridge/console.h"
46 #include <memory.h>
47 
48 #define foreach BOOST_FOREACH
49 
50 using std::map;
51 using std::priority_queue;
52 using std::string;
53 using std::vector;
54 using std::multiset;
55 using boost::format;
56 using std::shared_ptr;
59 
60 namespace rosbag {
61 
63  mode_(bagmode::Write),
64  version_(0),
65  compression_(compression::Uncompressed),
66  chunk_threshold_(768 * 1024), // 768KB chunks
67  bag_revision_(0),
68  file_size_(0),
69  file_header_pos_(0),
70  index_data_pos_(0),
71  connection_count_(0),
72  chunk_count_(0),
73  chunk_open_(false),
74  curr_chunk_data_pos_(0),
75  current_buffer_(0),
76  decompressed_chunk_(0)
77 {
78 }
79 
80 Bag::Bag(string const& filename, uint32_t mode) :
81  compression_(compression::Uncompressed),
82  chunk_threshold_(768 * 1024), // 768KB chunks
83  bag_revision_(0),
84  file_size_(0),
86  index_data_pos_(0),
88  chunk_count_(0),
89  chunk_open_(false),
91  current_buffer_(0),
93 {
94  open(filename, mode);
95 }
96 
98  close();
99 }
100 
101 void Bag::open(string const& filename, uint32_t mode) {
102  mode_ = (BagMode) mode;
103 
104  if (mode_ & bagmode::Append)
105  openAppend(filename);
106  else if (mode_ & bagmode::Write)
107  openWrite(filename);
108  else if (mode_ & bagmode::Read)
109  openRead(filename);
110  else
111  throw BagException((format("Unknown mode: %1%") % (int) mode).str());
112 
113  // Determine file size
115  seek(0, std::ios::end);
117  seek(offset);
118 }
119 
120 void Bag::openRead(string const& filename) {
121  file_.openRead(filename);
122 
123  readVersion();
124 
125  switch (version_) {
126  case 102: startReadingVersion102(); break;
127  case 200: startReadingVersion200(); break;
128  default:
129  throw BagException((format("Unsupported bag file version: %1%.%2%") % getMajorVersion() % getMinorVersion()).str());
130  }
131 }
132 
133 void Bag::openWrite(string const& filename) {
134  file_.openWrite(filename);
135 
136  startWriting();
137 }
138 
139 void Bag::openAppend(string const& filename) {
140  file_.openReadWrite(filename);
141 
142  readVersion();
143 
144  if (version_ != 200)
145  throw BagException((format("Bag file version %1%.%2% is unsupported for appending") % getMajorVersion() % getMinorVersion()).str());
146 
148 
149  // Truncate the file to chop off the index
151  index_data_pos_ = 0;
152 
153  // Rewrite the file header, clearing the index position (so we know if the index is invalid)
156 
157  // Seek to the end of the file
158  seek(0, std::ios::end);
159 }
160 
161 void Bag::close() {
162  if (!file_.isOpen())
163  return;
164 
166  closeWrite();
167 
168  file_.close();
169 
170  topic_connection_ids_.clear();
171  header_connection_ids_.clear();
172  for (map<uint32_t, ConnectionInfo*>::iterator i = connections_.begin(); i != connections_.end(); i++)
173  delete i->second;
174  connections_.clear();
175  chunks_.clear();
176  connection_indexes_.clear();
178 }
179 
181  stopWriting();
182 }
183 
184 string Bag::getFileName() const { return file_.getFileName(); }
185 BagMode Bag::getMode() const { return mode_; }
186 uint64_t Bag::getSize() const { return file_size_; }
187 
189 
190 void Bag::setChunkThreshold(uint32_t chunk_threshold) {
191  if (file_.isOpen() && chunk_open_)
193 
194  chunk_threshold_ = chunk_threshold;
195 }
196 
198 
199 std::tuple<std::string, uint64_t, uint64_t> Bag::getCompressionInfo() const
200 {
201  std::map<std::string, uint64_t> compression_counts;
202  std::map<std::string, uint64_t> compression_uncompressed;
203  std::map<std::string, uint64_t> compression_compressed;
204  auto compression = compression_;
205  uint64_t uncompressed = 0;
206  uint64_t compressed = 0;
207  ChunkInfo curr_chunk_info;
208  foreach(ChunkInfo const& chunk_info, chunks_) {
209  curr_chunk_info = chunk_info;
210 
211  seek(curr_chunk_info.pos);
212 
213  // Skip over the chunk data
214  ChunkHeader chunk_header;
215  readChunkHeader(chunk_header);
216  seek(chunk_header.compressed_size, std::ios::cur);
217 
218  compression_counts[chunk_header.compression] += 1;
219  compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size;
220  uncompressed += chunk_header.uncompressed_size;
221  compression_compressed[chunk_header.compression] += chunk_header.compressed_size;
222  compressed += chunk_header.compressed_size;
223  }
224 
225  auto chunk_count = chunks_.size();
226  uint64_t main_compression_count = 0;
227  std::string main_compression;
228  for (auto&& kvp : compression_counts)
229  {
230  if (kvp.second > main_compression_count)
231  {
232  main_compression = kvp.first;
233  main_compression_count = kvp.second;
234  }
235  }
236  //auto main_compression_str = compression_counts.begin()->first;
237  //CompressionType main_compression = (main_compression_str == "Uncompressed") ? CompressionType::Uncompressed : (main_compression_str == "LZ4") ? CompressionType::LZ4 : CompressionType::BZ2;
238  //auto main_compression_count = compression_counts.begin()->second;
239  return std::make_tuple(main_compression, compressed, uncompressed);
240 }
242  if (file_.isOpen() && chunk_open_)
244 
245  if (!(compression == compression::Uncompressed ||
246  compression == compression::BZ2 ||
247  compression == compression::LZ4)) {
248  throw BagException(
249  (format("Unknown compression type: %i") % compression).str());
250  }
251 
252  compression_ = compression;
253 }
254 
255 // Version
256 
258  string version = string("#ROSBAG V") + VERSION + string("\n");
259 
260  CONSOLE_BRIDGE_logDebug("Writing VERSION [%llu]: %s", (unsigned long long) file_.getOffset(), version.c_str());
261 
262  version_ = 200;
263 
264  write(version);
265 }
266 
268  string version_line = file_.getline();
269 
271 
272  char logtypename[100];
273  int version_major, version_minor;
274 #if defined(_MSC_VER)
275  if (sscanf_s(version_line.c_str(), "#ROS%s V%d.%d", logtypename, static_cast<unsigned>(sizeof(logtypename)), &version_major, &version_minor) != 3)
276 #else
277  if (sscanf(version_line.c_str(), "#ROS%s V%d.%d", logtypename, &version_major, &version_minor) != 3)
278 #endif
279  throw BagIOException("Error reading version line");
280 
281  version_ = version_major * 100 + version_minor;
282 
283  CONSOLE_BRIDGE_logDebug("Read VERSION: version=%d", version_);
284 }
285 
286 uint32_t Bag::getMajorVersion() const { return version_ / 100; }
287 uint32_t Bag::getMinorVersion() const { return version_ % 100; }
288 
289 //
290 
292  writeVersion();
295 }
296 
298  if (chunk_open_)
300 
301  seek(0, std::ios::end);
302 
306 
309 }
310 
312  // Read the file header record, which points to the end of the chunks
314 
315  // Seek to the end of the chunks
317 
318  // Read the connection records (one for each connection)
319  for (uint32_t i = 0; i < connection_count_; i++)
321 
322  // Read the chunk info records
323  for (uint32_t i = 0; i < chunk_count_; i++)
325 
326  // Read the connection indexes for each chunk
327  foreach(ChunkInfo const& chunk_info, chunks_) {
328  curr_chunk_info_ = chunk_info;
329 
331 
332  // Skip over the chunk data
333  ChunkHeader chunk_header;
334  readChunkHeader(chunk_header);
335  seek(chunk_header.compressed_size, std::ios::cur);
336 
337  // Read the index records after the chunk
338  for (unsigned int i = 0; i < chunk_info.connection_counts.size(); i++)
340  }
341 
342  // At this point we don't have a curr_chunk_info anymore so we reset it
344 }
345 
347  try
348  {
349  // Read the file header record, which points to the start of the topic indexes
351  }
352  catch (BagFormatException ex) {
353  throw BagUnindexedException();
354  }
355 
356  // Get the length of the file
357  seek(0, std::ios::end);
358  uint64_t filelength = file_.getOffset();
359 
360  // Seek to the beginning of the topic index records
362 
363  // Read the topic index records, which point to the offsets of each message in the file
364  while (file_.getOffset() < filelength)
366 
367  // Read the message definition records (which are the first entry in the topic indexes)
368  for (map<uint32_t, multiset<IndexEntry> >::const_iterator i = connection_indexes_.begin(); i != connection_indexes_.end(); i++) {
369  multiset<IndexEntry> const& index = i->second;
370  IndexEntry const& first_entry = *index.begin();
371 
372  CONSOLE_BRIDGE_logDebug("Reading message definition for connection %d at %llu", i->first, (unsigned long long) first_entry.chunk_pos);
373 
374  seek(first_entry.chunk_pos);
375 
377  }
378 }
379 
380 // File header record
381 
383  connection_count_ = static_cast<uint32_t>(connections_.size());
384  chunk_count_ = static_cast<uint32_t>(chunks_.size());
385 
386  CONSOLE_BRIDGE_logDebug("Writing FILE_HEADER [%llu]: index_pos=%llu connection_count=%d chunk_count=%d",
387  (unsigned long long) file_.getOffset(), (unsigned long long) index_data_pos_, connection_count_, chunk_count_);
388 
389  // Write file header record
395 
396  std::vector<uint8_t> header_buffer;
397  uint32_t header_len;
398  rs2rosinternal::Header::write(header, header_buffer, header_len);
399  uint32_t data_len = 0;
400  if (header_len < FILE_HEADER_LENGTH)
401  data_len = FILE_HEADER_LENGTH - header_len;
402  write((char*) &header_len, 4);
403  write((char*) header_buffer.data(), header_len);
404  write((char*) &data_len, 4);
405 
406  // Pad the file header record out
407  if (data_len > 0) {
408  string padding;
409  padding.resize(data_len, ' ');
410  write(padding);
411  }
412 }
413 
416  uint32_t data_size;
417  if (!readHeader(header) || !readDataLength(data_size))
418  throw BagFormatException("Error reading FILE_HEADER record");
419 
420  M_string& fields = *header.getValues();
421 
422  if (!isOp(fields, OP_FILE_HEADER))
423  throw BagFormatException("Expected FILE_HEADER op not found");
424 
425  // Read index position
427 
428  if (index_data_pos_ == 0)
429  throw BagUnindexedException();
430 
431  // Read topic and chunks count
432  if (version_ >= 200) {
435  }
436 
437  CONSOLE_BRIDGE_logDebug("Read FILE_HEADER: index_pos=%llu connection_count=%d chunk_count=%d",
438  (unsigned long long) index_data_pos_, connection_count_, chunk_count_);
439 
440  // Skip the data section (just padding)
441  seek(data_size, std::ios::cur);
442 }
443 
446  return static_cast<uint32_t>(file_.getOffset() - curr_chunk_data_pos_);
447  else
448  return file_.getCompressedBytesIn();
449 }
450 
452  // Initialize chunk info
455  curr_chunk_info_.end_time = time;
456 
457  // Write the chunk header, with a place-holder for the data sizes (we'll fill in when the chunk is finished)
459 
460  // Turn on compressed writing
462 
463  // Record where the data section of this chunk started
465 
466  chunk_open_ = true;
467 }
468 
470  // Add this chunk to the index
471  chunks_.push_back(curr_chunk_info_);
472 
473  // Get the uncompressed and compressed sizes
474  uint32_t uncompressed_size = getChunkOffset();
476  uint32_t compressed_size = static_cast<uint32_t>(file_.getOffset() - curr_chunk_data_pos_);
477 
478  // Rewrite the chunk header with the size of the chunk (remembering current offset)
479  uint64_t end_of_chunk_pos = file_.getOffset();
480 
482  writeChunkHeader(compression_, compressed_size, uncompressed_size);
483 
484  // Write out the indexes and clear them
485  seek(end_of_chunk_pos);
488 
489  // Clear the connection counts
491 
492  // Flag that we're starting a new chunk
493  chunk_open_ = false;
494 }
495 
496 void Bag::writeChunkHeader(CompressionType compression, uint32_t compressed_size, uint32_t uncompressed_size) {
497  ChunkHeader chunk_header;
498  switch (compression) {
499  case compression::Uncompressed: chunk_header.compression = COMPRESSION_NONE; break;
500  case compression::BZ2: chunk_header.compression = COMPRESSION_BZ2; break;
501  case compression::LZ4: chunk_header.compression = COMPRESSION_LZ4;
502  //case compression::ZLIB: chunk_header.compression = COMPRESSION_ZLIB; break;
503  }
504  chunk_header.compressed_size = compressed_size;
505  chunk_header.uncompressed_size = uncompressed_size;
506 
507  CONSOLE_BRIDGE_logDebug("Writing CHUNK [%llu]: compression=%s compressed=%d uncompressed=%d",
508  (unsigned long long) file_.getOffset(), chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size);
509 
512  header[COMPRESSION_FIELD_NAME] = chunk_header.compression;
513  header[SIZE_FIELD_NAME] = toHeaderString(&chunk_header.uncompressed_size);
514  writeHeader(header);
515 
516  writeDataLength(chunk_header.compressed_size);
517 }
518 
519 void Bag::readChunkHeader(ChunkHeader& chunk_header) const {
521  if (!readHeader(header) || !readDataLength(chunk_header.compressed_size))
522  throw BagFormatException("Error reading CHUNK record");
523 
524  M_string& fields = *header.getValues();
525 
526  if (!isOp(fields, OP_CHUNK))
527  throw BagFormatException("Expected CHUNK op not found");
528 
529  readField(fields, COMPRESSION_FIELD_NAME, true, chunk_header.compression);
530  readField(fields, SIZE_FIELD_NAME, true, &chunk_header.uncompressed_size);
531 
532  CONSOLE_BRIDGE_logDebug("Read CHUNK: compression=%s size=%d uncompressed=%d (%f)", chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size, 100 * ((double) chunk_header.compressed_size) / chunk_header.uncompressed_size);
533 }
534 
535 // Index records
536 
538  for (map<uint32_t, multiset<IndexEntry> >::const_iterator i = curr_chunk_connection_indexes_.begin(); i != curr_chunk_connection_indexes_.end(); i++) {
539  uint32_t connection_id = i->first;
540  multiset<IndexEntry> const& index = i->second;
541 
542  // Write the index record header
543  uint32_t index_size = static_cast<uint32_t>(index.size());
546  header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_id);
548  header[COUNT_FIELD_NAME] = toHeaderString(&index_size);
549  writeHeader(header);
550 
551  writeDataLength(index_size * 12);
552 
553  CONSOLE_BRIDGE_logDebug("Writing INDEX_DATA: connection=%d ver=%d count=%d", connection_id, INDEX_VERSION, index_size);
554 
555  // Write the index record data (pairs of timestamp and position in file)
556  foreach(IndexEntry const& e, index) {
557  write((char*) &e.time.sec, 4);
558  write((char*) &e.time.nsec, 4);
559  write((char*) &e.offset, 4);
560 
561  CONSOLE_BRIDGE_logDebug(" - %d.%d: %d", e.time.sec, e.time.nsec, e.offset);
562  }
563  }
564 }
565 
568  uint32_t data_size;
569  if (!readHeader(header) || !readDataLength(data_size))
570  throw BagFormatException("Error reading INDEX_DATA header");
571  M_string& fields = *header.getValues();
572 
573  if (!isOp(fields, OP_INDEX_DATA))
574  throw BagFormatException("Expected INDEX_DATA record");
575 
576  uint32_t index_version;
577  string topic;
578  uint32_t count = 0;
579  readField(fields, VER_FIELD_NAME, true, &index_version);
580  readField(fields, TOPIC_FIELD_NAME, true, topic);
581  readField(fields, COUNT_FIELD_NAME, true, &count);
582 
583  CONSOLE_BRIDGE_logDebug("Read INDEX_DATA: ver=%d topic=%s count=%d", index_version, topic.c_str(), count);
584 
585  if (index_version != 0)
586  throw BagFormatException((format("Unsupported INDEX_DATA version: %1%") % index_version).str());
587 
588  uint32_t connection_id;
589  map<string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
590  if (topic_conn_id_iter == topic_connection_ids_.end()) {
591  connection_id = static_cast<uint32_t>(connections_.size());
592 
593  CONSOLE_BRIDGE_logDebug("Creating connection: id=%d topic=%s", connection_id, topic.c_str());
594  ConnectionInfo* connection_info = new ConnectionInfo();
595  connection_info->id = connection_id;
596  connection_info->topic = topic;
597  connections_[connection_id] = connection_info;
598 
599  topic_connection_ids_[topic] = connection_id;
600  }
601  else
602  connection_id = topic_conn_id_iter->second;
603 
604  multiset<IndexEntry>& connection_index = connection_indexes_[connection_id];
605 
606  for (uint32_t i = 0; i < count; i++) {
607  IndexEntry index_entry;
608  uint32_t sec;
609  uint32_t nsec;
610  read((char*) &sec, 4);
611  read((char*) &nsec, 4);
612  read((char*) &index_entry.chunk_pos, 8); //<! store position of the message in the chunk_pos field as it's 64 bits
613  index_entry.time = Time(sec, nsec);
614  index_entry.offset = 0;
615 
616  CONSOLE_BRIDGE_logDebug(" - %d.%d: %llu", sec, nsec, (unsigned long long) index_entry.chunk_pos);
617 
618  if (index_entry.time < rs2rosinternal::TIME_MIN || index_entry.time > rs2rosinternal::TIME_MAX)
619  {
620  CONSOLE_BRIDGE_logError("Index entry for topic %s contains invalid time.", topic.c_str());
621  } else
622  {
623  connection_index.insert(connection_index.end(), index_entry);
624  }
625  }
626 }
627 
630  uint32_t data_size;
631  if (!readHeader(header) || !readDataLength(data_size))
632  throw BagFormatException("Error reading INDEX_DATA header");
633  M_string& fields = *header.getValues();
634 
635  if (!isOp(fields, OP_INDEX_DATA))
636  throw BagFormatException("Expected INDEX_DATA record");
637 
638  uint32_t index_version;
639  uint32_t connection_id;
640  uint32_t count = 0;
641  readField(fields, VER_FIELD_NAME, true, &index_version);
642  readField(fields, CONNECTION_FIELD_NAME, true, &connection_id);
643  readField(fields, COUNT_FIELD_NAME, true, &count);
644 
645  CONSOLE_BRIDGE_logDebug("Read INDEX_DATA: ver=%d connection=%d count=%d", index_version, connection_id, count);
646 
647  if (index_version != 1)
648  throw BagFormatException((format("Unsupported INDEX_DATA version: %1%") % index_version).str());
649 
650  uint64_t chunk_pos = curr_chunk_info_.pos;
651 
652  multiset<IndexEntry>& connection_index = connection_indexes_[connection_id];
653 
654  for (uint32_t i = 0; i < count; i++) {
655  IndexEntry index_entry;
656  index_entry.chunk_pos = chunk_pos;
657  uint32_t sec;
658  uint32_t nsec;
659  read((char*) &sec, 4);
660  read((char*) &nsec, 4);
661  read((char*) &index_entry.offset, 4);
662  index_entry.time = Time(sec, nsec);
663 
664  CONSOLE_BRIDGE_logDebug(" - %d.%d: %llu+%d", sec, nsec, (unsigned long long) index_entry.chunk_pos, index_entry.offset);
665 
666  if (index_entry.time < rs2rosinternal::TIME_MIN || index_entry.time > rs2rosinternal::TIME_MAX)
667  {
668  CONSOLE_BRIDGE_logError("Index entry for topic %s contains invalid time. This message will not be loaded.", connections_[connection_id]->topic.c_str());
669  } else
670  {
671  connection_index.insert(connection_index.end(), index_entry);
672  }
673  }
674 }
675 
676 // Connection records
677 
679  for (map<uint32_t, ConnectionInfo*>::const_iterator i = connections_.begin(); i != connections_.end(); i++) {
680  ConnectionInfo const* connection_info = i->second;
681  writeConnectionRecord(connection_info);
682  }
683 }
684 
685 void Bag::writeConnectionRecord(ConnectionInfo const* connection_info) {
686  CONSOLE_BRIDGE_logDebug("Writing CONNECTION [%llu:%d]: topic=%s id=%d",
687  (unsigned long long) file_.getOffset(), getChunkOffset(), connection_info->topic.c_str(), connection_info->id);
688 
691  header[TOPIC_FIELD_NAME] = connection_info->topic;
692  header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_info->id);
693  writeHeader(header);
694 
695  writeHeader(*connection_info->header);
696 }
697 
701  header[TOPIC_FIELD_NAME] = connection_info->topic;
702  header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_info->id);
703  appendHeaderToBuffer(buf, header);
704 
705  appendHeaderToBuffer(buf, *connection_info->header);
706 }
707 
710  if (!readHeader(header))
711  throw BagFormatException("Error reading CONNECTION header");
712  M_string& fields = *header.getValues();
713 
714  if (!isOp(fields, OP_CONNECTION))
715  throw BagFormatException("Expected CONNECTION op not found");
716 
717  uint32_t id;
718  readField(fields, CONNECTION_FIELD_NAME, true, &id);
719  string topic;
720  readField(fields, TOPIC_FIELD_NAME, true, topic);
721 
722  rs2rosinternal::Header connection_header;
723  if (!readHeader(connection_header))
724  throw BagFormatException("Error reading connection header");
725 
726  // If this is a new connection, update connections
727  map<uint32_t, ConnectionInfo*>::iterator key = connections_.find(id);
728  if (key == connections_.end()) {
729  ConnectionInfo* connection_info = new ConnectionInfo();
730  connection_info->id = id;
731  connection_info->topic = topic;
732  connection_info->header = std::make_shared<M_string>();
733  for (M_string::const_iterator i = connection_header.getValues()->begin(); i != connection_header.getValues()->end(); i++)
734  (*connection_info->header)[i->first] = i->second;
735  connection_info->msg_def = (*connection_info->header)["message_definition"];
736  connection_info->datatype = (*connection_info->header)["type"];
737  connection_info->md5sum = (*connection_info->header)["md5sum"];
738  connections_[id] = connection_info;
739 
740  CONSOLE_BRIDGE_logDebug("Read CONNECTION: topic=%s id=%d", topic.c_str(), id);
741  }
742 }
743 
746  uint32_t data_size;
747  if (!readHeader(header) || !readDataLength(data_size))
748  throw BagFormatException("Error reading message definition header");
749  M_string& fields = *header.getValues();
750 
751  if (!isOp(fields, OP_MSG_DEF))
752  throw BagFormatException("Expected MSG_DEF op not found");
753 
754  string topic, md5sum, datatype, message_definition;
755  readField(fields, TOPIC_FIELD_NAME, true, topic);
756  readField(fields, MD5_FIELD_NAME, 32, 32, true, md5sum);
757  readField(fields, TYPE_FIELD_NAME, true, datatype);
758  readField(fields, DEF_FIELD_NAME, 0, UINT_MAX, true, message_definition);
759 
760  ConnectionInfo* connection_info;
761 
762  map<string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
763  if (topic_conn_id_iter == topic_connection_ids_.end()) {
764  uint32_t id = static_cast<uint32_t>(connections_.size());
765 
766  CONSOLE_BRIDGE_logDebug("Creating connection: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str());
767  connection_info = new ConnectionInfo();
768  connection_info->id = id;
769  connection_info->topic = topic;
770 
771  connections_[id] = connection_info;
772  topic_connection_ids_[topic] = id;
773  }
774  else
775  connection_info = connections_[topic_conn_id_iter->second];
776 
777  connection_info->msg_def = message_definition;
778  connection_info->datatype = datatype;
779  connection_info->md5sum = md5sum;
780  connection_info->header = std::make_shared<rs2rosinternal::M_string>();
781  (*connection_info->header)["type"] = connection_info->datatype;
782  (*connection_info->header)["md5sum"] = connection_info->md5sum;
783  (*connection_info->header)["message_definition"] = connection_info->msg_def;
784 
785  CONSOLE_BRIDGE_logDebug("Read MSG_DEF: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str());
786 }
787 
788 void Bag::decompressChunk(uint64_t chunk_pos) const {
789  if (curr_chunk_info_.pos == chunk_pos) {
791  return;
792  }
793 
795 
796  if (decompressed_chunk_ == chunk_pos)
797  return;
798 
799  // Seek to the start of the chunk
800  seek(chunk_pos);
801 
802  // Read the chunk header
803  ChunkHeader chunk_header;
804  readChunkHeader(chunk_header);
805 
806  // Read and decompress the chunk. These assume we are at the right place in the stream already
807  if (chunk_header.compression == COMPRESSION_NONE)
808  decompressRawChunk(chunk_header);
809  else if (chunk_header.compression == COMPRESSION_BZ2)
810  decompressBz2Chunk(chunk_header);
811  else if (chunk_header.compression == COMPRESSION_LZ4)
812  decompressLz4Chunk(chunk_header);
813  else
814  throw BagFormatException("Unknown compression: " + chunk_header.compression);
815 
816  decompressed_chunk_ = chunk_pos;
817 }
818 
820  CONSOLE_BRIDGE_logDebug("readMessageDataRecord: offset=%llu", (unsigned long long) offset);
821 
822  seek(offset);
823 
824  uint32_t data_size;
825  uint8_t op;
826  do {
827  if (!readHeader(header) || !readDataLength(data_size))
828  throw BagFormatException("Error reading header");
829 
830  readField(*header.getValues(), OP_FIELD_NAME, true, &op);
831  }
832  while (op == OP_MSG_DEF);
833 
834  if (op != OP_MSG_DATA)
835  throw BagFormatException((format("Expected MSG_DATA op, got %d") % op).str());
836 
837  record_buffer_.setSize(data_size);
838  file_.read((char*) record_buffer_.getData(), data_size);
839 }
840 
841 // Reading this into a buffer isn't completely necessary, but we do it anyways for now
842 void Bag::decompressRawChunk(ChunkHeader const& chunk_header) const {
843  assert(chunk_header.compression == COMPRESSION_NONE);
844  assert(chunk_header.compressed_size == chunk_header.uncompressed_size);
845 
846  CONSOLE_BRIDGE_logDebug("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size);
847 
849  file_.read((char*) decompress_buffer_.getData(), chunk_header.compressed_size);
850 
851  // todo check read was successful
852 }
853 
854 void Bag::decompressBz2Chunk(ChunkHeader const& chunk_header) const {
855  assert(chunk_header.compression == COMPRESSION_BZ2);
856 
857  CompressionType compression = compression::BZ2;
858 
859  CONSOLE_BRIDGE_logDebug("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size);
860 
861  chunk_buffer_.setSize(chunk_header.compressed_size);
862  file_.read((char*) chunk_buffer_.getData(), chunk_header.compressed_size);
863 
866 
867  // todo check read was successful
868 }
869 
870 void Bag::decompressLz4Chunk(ChunkHeader const& chunk_header) const {
871  assert(chunk_header.compression == COMPRESSION_LZ4);
872 
873  CompressionType compression = compression::LZ4;
874 
875  CONSOLE_BRIDGE_logDebug("lz4 compressed_size: %d uncompressed_size: %d",
876  chunk_header.compressed_size, chunk_header.uncompressed_size);
877 
878  chunk_buffer_.setSize(chunk_header.compressed_size);
879  file_.read((char*) chunk_buffer_.getData(), chunk_header.compressed_size);
880 
883 
884  // todo check read was successful
885 }
886 
889  uint32_t data_size;
890  uint32_t bytes_read;
891  switch (version_)
892  {
893  case 200:
894  decompressChunk(index_entry.chunk_pos);
895  readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
896  return header;
897  case 102:
898  readMessageDataRecord102(index_entry.chunk_pos, header);
899  return header;
900  default:
901  throw BagFormatException((format("Unhandled version: %1%") % version_).str());
902  }
903 }
904 
905 // NOTE: this loads the header, which is unnecessary
906 uint32_t Bag::readMessageDataSize(IndexEntry const& index_entry) const {
908  uint32_t data_size;
909  uint32_t bytes_read;
910  switch (version_)
911  {
912  case 200:
913  decompressChunk(index_entry.chunk_pos);
914  readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
915  return data_size;
916  case 102:
917  readMessageDataRecord102(index_entry.chunk_pos, header);
918  return record_buffer_.getSize();
919  default:
920  throw BagFormatException((format("Unhandled version: %1%") % version_).str());
921  }
922 }
923 
925  foreach(ChunkInfo const& chunk_info, chunks_) {
926  // Write the chunk info header
928  uint32_t chunk_connection_count = static_cast<uint32_t>(chunk_info.connection_counts.size());
931  header[CHUNK_POS_FIELD_NAME] = toHeaderString(&chunk_info.pos);
932  header[START_TIME_FIELD_NAME] = toHeaderString(&chunk_info.start_time);
933  header[END_TIME_FIELD_NAME] = toHeaderString(&chunk_info.end_time);
934  header[COUNT_FIELD_NAME] = toHeaderString(&chunk_connection_count);
935 
936  CONSOLE_BRIDGE_logDebug("Writing CHUNK_INFO [%llu]: ver=%d pos=%llu start=%d.%d end=%d.%d",
937  (unsigned long long) file_.getOffset(), CHUNK_INFO_VERSION, (unsigned long long) chunk_info.pos,
938  chunk_info.start_time.sec, chunk_info.start_time.nsec,
939  chunk_info.end_time.sec, chunk_info.end_time.nsec);
940 
941  writeHeader(header);
942 
943  writeDataLength(8 * chunk_connection_count);
944  // Write the topic names and counts
945  for (map<uint32_t, uint32_t>::const_iterator i = chunk_info.connection_counts.begin(); i != chunk_info.connection_counts.end(); i++) {
946  uint32_t connection_id = i->first;
947  uint32_t count = i->second;
948 
949  write((char*) &connection_id, 4);
950  write((char*) &count, 4);
951 
952  CONSOLE_BRIDGE_logDebug(" - %d: %d", connection_id, count);
953  }
954  }
955 }
956 
958  // Read a CHUNK_INFO header
960  uint32_t data_size;
961  if (!readHeader(header) || !readDataLength(data_size))
962  throw BagFormatException("Error reading CHUNK_INFO record header");
963  M_string& fields = *header.getValues();
964  if (!isOp(fields, OP_CHUNK_INFO))
965  throw BagFormatException("Expected CHUNK_INFO op not found");
966 
967  // Check that the chunk info version is current
968  uint32_t chunk_info_version;
969  readField(fields, VER_FIELD_NAME, true, &chunk_info_version);
970  if (chunk_info_version != CHUNK_INFO_VERSION)
971  throw BagFormatException((format("Expected CHUNK_INFO version %1%, read %2%") % CHUNK_INFO_VERSION % chunk_info_version).str());
972 
973  // Read the chunk position, timestamp, and topic count fieldsstd_msgs::Float32
974  ChunkInfo chunk_info;
975  readField(fields, CHUNK_POS_FIELD_NAME, true, &chunk_info.pos);
976  readField(fields, START_TIME_FIELD_NAME, true, chunk_info.start_time);
977  readField(fields, END_TIME_FIELD_NAME, true, chunk_info.end_time);
978  uint32_t chunk_connection_count = 0;
979  readField(fields, COUNT_FIELD_NAME, true, &chunk_connection_count);
980 
981  CONSOLE_BRIDGE_logDebug("Read CHUNK_INFO: chunk_pos=%llu connection_count=%d start=%d.%d end=%d.%d",
982  (unsigned long long) chunk_info.pos, chunk_connection_count,
983  chunk_info.start_time.sec, chunk_info.start_time.nsec,
984  chunk_info.end_time.sec, chunk_info.end_time.nsec);
985 
986  // Read the topic count entries
987  for (uint32_t i = 0; i < chunk_connection_count; i ++) {
988  uint32_t connection_id, connection_count;
989  read((char*) &connection_id, 4);
990  read((char*) &connection_count, 4);
991 
992  CONSOLE_BRIDGE_logDebug(" %d: %d messages", connection_id, connection_count);
993 
994  chunk_info.connection_counts[connection_id] = connection_count;
995  }
996 
997  chunks_.push_back(chunk_info);
998 }
999 
1000 // Record I/O
1001 
1002 bool Bag::isOp(M_string& fields, uint8_t reqOp) const {
1003  uint8_t op = 0xFF; // nonexistent op
1004  readField(fields, OP_FIELD_NAME, true, &op);
1005  return op == reqOp;
1006 }
1007 
1008 void Bag::writeHeader(M_string const& fields) {
1009  std::vector<uint8_t> header_buffer;
1010  uint32_t header_len;
1011  rs2rosinternal::Header::write(fields, header_buffer, header_len);
1012  write((char*) &header_len, 4);
1013  write((char*) header_buffer.data(), header_len);
1014 }
1015 
1017  write((char*) &data_len, 4);
1018 }
1019 
1021  std::vector<uint8_t> header_buffer;
1022  uint32_t header_len;
1023  rs2rosinternal::Header::write(fields, header_buffer, header_len);
1024 
1025  uint32_t offset = buf.getSize();
1026 
1027  buf.setSize(buf.getSize() + 4 + header_len);
1028 
1029  memcpy(buf.getData() + offset, &header_len, 4);
1030  offset += 4;
1031  memcpy(buf.getData() + offset, header_buffer.data(), header_len);
1032 }
1033 
1035  uint32_t offset = buf.getSize();
1036 
1037  buf.setSize(buf.getSize() + 4);
1038 
1039  memcpy(buf.getData() + offset, &data_len, 4);
1040 }
1041 
1044  assert(buffer.getSize() > 8);
1045 
1046  uint8_t* start = (uint8_t*) buffer.getData() + offset;
1047 
1048  uint8_t* ptr = start;
1049 
1050  // Read the header length
1051  uint32_t header_len;
1052  memcpy(&header_len, ptr, 4);
1053  ptr += 4;
1054 
1055  // Parse the header
1056  string error_msg;
1057  bool parsed = header.parse(ptr, header_len, error_msg);
1058  if (!parsed)
1059  throw BagFormatException("Error parsing header");
1060  ptr += header_len;
1061 
1062  // Read the data size
1063  memcpy(&data_size, ptr, 4);
1064  ptr += 4;
1065 
1066  bytes_read = static_cast<uint32_t>(ptr - start);
1067 }
1068 
1070  (void)buffer;
1071  total_bytes_read = 0;
1072  uint8_t op = 0xFF;
1073  do {
1074  CONSOLE_BRIDGE_logDebug("reading header from buffer: offset=%d", offset);
1075  uint32_t bytes_read;
1076  readHeaderFromBuffer(*current_buffer_, offset, header, data_size, bytes_read);
1077 
1078  offset += bytes_read;
1079  total_bytes_read += bytes_read;
1080 
1081  readField(*header.getValues(), OP_FIELD_NAME, true, &op);
1082  }
1083  while (op == OP_MSG_DEF || op == OP_CONNECTION);
1084 
1085  if (op != OP_MSG_DATA)
1086  throw BagFormatException("Expected MSG_DATA op not found");
1087 }
1088 
1090  // Read the header length
1091  uint32_t header_len;
1092  read((char*) &header_len, 4);
1093 
1094  // Read the header
1095  header_buffer_.setSize(header_len);
1096  read((char*) header_buffer_.getData(), header_len);
1097 
1098  // Parse the header
1099  string error_msg;
1100  bool parsed = header.parse(header_buffer_.getData(), header_len, error_msg);
1101  if (!parsed)
1102  return false;
1103 
1104  return true;
1105 }
1106 
1107 bool Bag::readDataLength(uint32_t& data_size) const {
1108  read((char*) &data_size, 4);
1109  return true;
1110 }
1111 
1112 M_string::const_iterator Bag::checkField(M_string const& fields, string const& field, unsigned int min_len, unsigned int max_len, bool required) const {
1113  M_string::const_iterator fitr = fields.find(field);
1114  if (fitr == fields.end()) {
1115  if (required)
1116  throw BagFormatException("Required '" + field + "' field missing");
1117  }
1118  else if ((fitr->second.size() < min_len) || (fitr->second.size() > max_len))
1119  throw BagFormatException((format("Field '%1%' is wrong size (%2% bytes)") % field % (uint32_t) fitr->second.size()).str());
1120 
1121  return fitr;
1122 }
1123 
1124 bool Bag::readField(M_string const& fields, string const& field_name, bool required, string& data) const {
1125  return readField(fields, field_name, 1, UINT_MAX, required, data);
1126 }
1127 
1128 bool Bag::readField(M_string const& fields, string const& field_name, unsigned int min_len, unsigned int max_len, bool required, string& data) const {
1129  M_string::const_iterator fitr = checkField(fields, field_name, min_len, max_len, required);
1130  if (fitr == fields.end())
1131  return false;
1132 
1133  data = fitr->second;
1134  return true;
1135 }
1136 
1137 bool Bag::readField(M_string const& fields, string const& field_name, bool required, Time& data) const {
1138  uint64_t packed_time;
1139  if (!readField(fields, field_name, required, &packed_time))
1140  return false;
1141 
1142  uint64_t bitmask = (1LL << 33) - 1;
1143  data.sec = (uint32_t) (packed_time & bitmask);
1144  data.nsec = (uint32_t) (packed_time >> 32);
1145 
1146  return true;
1147 }
1148 
1149 std::string Bag::toHeaderString(Time const* field) const {
1150  uint64_t packed_time = (((uint64_t) field->nsec) << 32) + field->sec;
1151  return toHeaderString(&packed_time);
1152 }
1153 
1154 
1155 // Low-level I/O
1156 
1157 void Bag::write(string const& s) { write(s.c_str(), s.length()); }
1158 void Bag::write(char const* s, std::streamsize n) { file_.write((char*) s, n); }
1159 
1160 void Bag::read(char* b, std::streamsize n) const { file_.read(b, n); }
1161 void Bag::seek(uint64_t pos, int origin) const { file_.seek(pos, origin); }
1162 
1163 } // namespace rosbag
static const std::string COMPRESSION_BZ2
Definition: constants.h:96
void openRead(std::string const &filename)
open file for reading
static const std::string END_TIME_FIELD_NAME
Definition: constants.h:62
const char * md5sum()
returns MD5Sum<M>::value();
Provides functionality to parse a connection header and retrieve values from it.
Definition: header.h:52
static const std::string CHUNK_COUNT_FIELD_NAME
Definition: constants.h:56
GLuint GLuint end
BagMode mode_
Definition: bag.h:275
void read(void *ptr, size_t size)
read size bytes from the file into ptr
static const std::string COMPRESSION_FIELD_NAME
Definition: constants.h:58
std::map< std::string, uint32_t > topic_connection_ids_
Definition: bag.h:293
uint32_t getMajorVersion() const
Get the major-version of the open bag file.
Definition: bag.cpp:286
BagMode getMode() const
Get the mode the bag is in.
Definition: bag.cpp:185
typedef void(APIENTRY *GLDEBUGPROC)(GLenum source
void decompressLz4Chunk(ChunkHeader const &chunk_header) const
Definition: bag.cpp:870
uint64_t pos
latest timestamp of a message in the chunk
Definition: structures.h:65
void openRead(std::string const &filename)
Definition: bag.cpp:120
static const std::string VER_FIELD_NAME
Definition: constants.h:52
CompressionType getCompression() const
Get the compression method to use for writing chunks.
Definition: bag.cpp:197
static const std::string COMPRESSION_LZ4
Definition: constants.h:97
GLdouble s
bool isOpen() const
return true if file is open for reading or writing
rs2rosinternal::Time start_time
Definition: structures.h:63
void open(std::string const &filename, uint32_t mode=bagmode::Read)
Open a bag file.
Definition: bag.cpp:101
GLsizei const GLchar *const * string
rs2rosinternal::Time end_time
earliest timestamp of a message in the chunk
Definition: structures.h:64
std::map< uint32_t, ConnectionInfo * > connections_
Definition: bag.h:295
std::string compression
Definition: structures.h:72
ROSTIME_DECL const Time TIME_MAX
int version_
Definition: bag.h:277
std::string toHeaderString(T const *field) const
Definition: bag.h:344
static const std::string CONNECTION_FIELD_NAME
Definition: constants.h:57
void startReadingVersion102()
Definition: bag.cpp:346
void write(std::string const &s)
uint8_t * getData()
Definition: buffer.cpp:50
uint64_t file_size_
Definition: bag.h:282
static const std::string INDEX_POS_FIELD_NAME
Definition: constants.h:54
void setChunkThreshold(uint32_t chunk_threshold)
Set the threshold for creating new chunks.
Definition: bag.cpp:190
void readTopicIndexRecord102()
Definition: bag.cpp:566
std::map< uint32_t, uint32_t > connection_counts
absolute byte offset of chunk record in bag file
Definition: structures.h:67
rs2rosinternal::Header readMessageDataHeader(IndexEntry const &index_entry)
Definition: bag.cpp:887
Buffer header_buffer_
reusable buffer in which to assemble the record header before writing to file
Definition: bag.h:302
static const unsigned char OP_CONNECTION
Definition: constants.h:80
std::map< rs2rosinternal::M_string, uint32_t > header_connection_ids_
Definition: bag.h:294
void openWrite(std::string const &filename)
Definition: bag.cpp:133
uint32_t getMinorVersion() const
Get the minor-version of the open bag file.
Definition: bag.cpp:287
Base class for rosbag exceptions.
Definition: exceptions.h:43
bool truncate(uint64_t length)
void seek(uint64_t offset, int origin=std::ios_base::beg)
seek to given offset from origin
std_msgs::Header * header(M &m)
returns Header<M>::pointer(m);
rs2rosinternal::M_string::const_iterator checkField(rs2rosinternal::M_string const &fields, std::string const &field, unsigned int min_len, unsigned int max_len, bool required) const
Definition: bag.cpp:1112
void writeConnectionRecords()
Definition: bag.cpp:678
static const std::string DEF_FIELD_NAME
Definition: constants.h:68
void writeConnectionRecord(ConnectionInfo const *connection_info)
Definition: bag.cpp:685
void openAppend(std::string const &filename)
Definition: bag.cpp:139
unsigned char uint8_t
Definition: stdint.h:78
void close()
close the file
void seek(uint64_t pos, int origin=std::ios_base::beg) const
Definition: bag.cpp:1161
e
Definition: rmse.py:177
void appendDataLengthToBuffer(Buffer &buf, uint32_t data_len)
Definition: bag.cpp:1034
GLenum GLfloat * buffer
std::string getFileName() const
return path of currently open file
Exception thrown on problems reading the bag format.
Definition: exceptions.h:57
BagMode
The possible modes to open a bag in.
Definition: bag.h:71
GLenum GLuint id
uint64_t getOffset() const
return current offset from the beginning of the file
std::string md5sum
Definition: structures.h:55
void setWriteMode(CompressionType type)
static const unsigned char OP_MSG_DATA
Definition: constants.h:75
GLuint index
void close()
Close the bag file.
Definition: bag.cpp:161
const char * datatype()
returns DataType<M>::value();
uint32_t getCompressedBytesIn() const
return the number of bytes written to current compressed stream
GLenum GLuint GLenum GLsizei const GLchar * buf
static const unsigned char OP_CHUNK
Definition: constants.h:78
Definition: parser.hpp:157
void readChunkHeader(ChunkHeader &chunk_header) const
Definition: bag.cpp:519
static const std::string VERSION
Definition: constants.h:44
Buffer decompress_buffer_
reusable buffer to decompress chunks into
Definition: bag.h:306
uint64_t getSize() const
Get the current size of the bag file (a lower bound)
Definition: bag.cpp:186
void appendConnectionRecordToBuffer(Buffer &buf, ConnectionInfo const *connection_info)
Definition: bag.cpp:698
uint32_t compressed_size
chunk compression type, e.g. "none" or "bz2" (see constants.h)
Definition: structures.h:73
GLenum mode
void readMessageDefinitionRecord102()
Definition: bag.cpp:744
void writeFileHeaderRecord()
Definition: bag.cpp:382
void readVersion()
Definition: bag.cpp:267
static const std::string CHUNK_POS_FIELD_NAME
Definition: constants.h:63
uint64_t decompressed_chunk_
position of decompressed chunk
Definition: bag.h:312
uint64_t index_data_pos_
Definition: bag.h:284
~Bag()
Definition: bag.cpp:97
#define assert(condition)
Definition: lz4.c:245
std::shared_ptr< rs2rosinternal::M_string > header
Definition: structures.h:58
unsigned int uint32_t
Definition: stdint.h:80
static const std::string START_TIME_FIELD_NAME
Definition: constants.h:61
uint32_t getChunkOffset() const
Definition: bag.cpp:444
Exception thrown on problems reading the bag index.
Definition: exceptions.h:64
static const uint32_t CHUNK_INFO_VERSION
Definition: constants.h:92
void readMessageDataRecord102(uint64_t offset, rs2rosinternal::Header &header) const
Definition: bag.cpp:819
static void write(const M_string &key_vals, std::vector< uint8_t > &buffer, uint32_t &size)
Definition: header.cpp:125
Buffer * current_buffer_
Definition: bag.h:310
Exception thrown when on IO problems.
Definition: exceptions.h:50
GLint GLint GLsizei GLint GLenum format
void readMessageDataHeaderFromBuffer(Buffer &buffer, uint32_t offset, rs2rosinternal::Header &header, uint32_t &data_size, uint32_t &bytes_read) const
Definition: bag.cpp:1069
void write(std::string const &topic, rs2rosinternal::MessageEvent< T > const &event)
Write a message into the bag file.
Definition: bag.h:324
Time representation. May either represent wall clock time or ROS clock time.
Definition: time.h:177
std::map< std::string, std::string > M_string
Definition: datatypes.h:45
void setCompression(CompressionType compression)
Set the compression method to use for writing chunks.
Definition: bag.cpp:241
unsigned __int64 uint64_t
Definition: stdint.h:90
std::string msg_def
Definition: structures.h:56
void writeChunkHeader(CompressionType compression, uint32_t compressed_size, uint32_t uncompressed_size)
Definition: bag.cpp:496
static const std::string OP_FIELD_NAME
Definition: constants.h:50
std::string getline()
uint64_t file_header_pos_
Definition: bag.h:283
uint32_t bag_revision_
Definition: bag.h:280
GLuint start
Buffer record_buffer_
reusable buffer in which to assemble the record data before writing to file
Definition: bag.h:303
void writeVersion()
Definition: bag.cpp:257
void stopWritingChunk()
Definition: bag.cpp:469
static const std::string TYPE_FIELD_NAME
Definition: constants.h:67
uint32_t readMessageDataSize(IndexEntry const &index_entry) const
Definition: bag.cpp:906
void startWritingChunk(rs2rosinternal::Time time)
Definition: bag.cpp:451
#define op
Buffer chunk_buffer_
reusable buffer to read chunk into
Definition: bag.h:305
ChunkInfo curr_chunk_info_
Definition: bag.h:290
basic_format< char > format
Definition: format_fwd.hpp:25
static const std::string COUNT_FIELD_NAME
Definition: constants.h:53
void decompressChunk(uint64_t chunk_pos) const
Definition: bag.cpp:788
std::map< uint32_t, std::multiset< IndexEntry > > connection_indexes_
Definition: bag.h:299
#define CONSOLE_BRIDGE_logDebug(fmt,...)
Definition: console.h:87
uint64_t chunk_pos
timestamp of the message
Definition: structures.h:80
static const unsigned char OP_FILE_HEADER
Definition: constants.h:76
static const unsigned char OP_INDEX_DATA
Definition: constants.h:77
uint32_t connection_count_
Definition: bag.h:285
void readConnectionIndexRecord200()
Definition: bag.cpp:628
bool isOp(rs2rosinternal::M_string &fields, uint8_t reqOp) const
Definition: bag.cpp:1002
void readChunkInfoRecord()
Definition: bag.cpp:957
void decompressRawChunk(ChunkHeader const &chunk_header) const
Definition: bag.cpp:842
void openWrite(std::string const &filename)
open file for writing
void stopWriting()
Definition: bag.cpp:297
GLboolean GLboolean GLboolean b
uint32_t offset
absolute byte offset of the chunk record containing the message
Definition: structures.h:81
uint32_t chunk_threshold_
Definition: bag.h:279
std::vector< ChunkInfo > chunks_
Definition: bag.h:297
std::string datatype
Definition: structures.h:54
M_stringPtr getValues()
Returns a shared pointer to the internal map used.
Definition: header.h:68
void startReadingVersion200()
Definition: bag.cpp:311
uint32_t chunk_count_
Definition: bag.h:286
bagmode::BagMode BagMode
Definition: bag.h:78
void read(char *b, std::streamsize n) const
Definition: bag.cpp:1160
ROSTIME_DECL const Time TIME_MIN
void closeWrite()
Definition: bag.cpp:180
std::tuple< std::string, uint64_t, uint64_t > getCompressionInfo() const
Definition: bag.cpp:199
Definition: bag.h:66
void writeDataLength(uint32_t data_len)
Definition: bag.cpp:1016
uint32_t getChunkThreshold() const
Get the threshold for creating new chunks.
Definition: bag.cpp:188
bool readHeader(rs2rosinternal::Header &header) const
Definition: bag.cpp:1089
uint32_t getSize() const
Definition: buffer.cpp:52
void readHeaderFromBuffer(Buffer &buffer, uint32_t offset, rs2rosinternal::Header &header, uint32_t &data_size, uint32_t &bytes_read) const
Definition: bag.cpp:1043
CompressionType compression_
Definition: bag.h:278
void decompress(CompressionType compression, uint8_t *dest, unsigned int dest_len, uint8_t *source, unsigned int source_len)
std::map< uint32_t, std::multiset< IndexEntry > > curr_chunk_connection_indexes_
Definition: bag.h:300
GLint GLsizei count
uint32_t uncompressed_size
compressed size of the chunk in bytes
Definition: structures.h:74
static const std::string CONNECTION_COUNT_FIELD_NAME
Definition: constants.h:55
static const uint32_t FILE_HEADER_LENGTH
Definition: constants.h:86
uint64_t curr_chunk_data_pos_
Definition: bag.h:291
static const std::string MD5_FIELD_NAME
Definition: constants.h:66
Buffer outgoing_chunk_buffer_
reusable buffer to read chunk into
Definition: bag.h:308
std::string getFileName() const
Get the filename of the bag.
Definition: bag.cpp:184
float rs2_vector::* pos
static const unsigned char OP_CHUNK_INFO
Definition: constants.h:79
bool readDataLength(uint32_t &data_size) const
Definition: bag.cpp:1107
static const std::string SIZE_FIELD_NAME
Definition: constants.h:59
void readConnectionRecord()
Definition: bag.cpp:708
static const std::string COMPRESSION_NONE
Definition: constants.h:95
int i
void writeIndexRecords()
Definition: bag.cpp:537
void appendHeaderToBuffer(Buffer &buf, rs2rosinternal::M_string const &fields)
Definition: bag.cpp:1020
rs2rosinternal::Time time
Definition: structures.h:79
void startWriting()
Definition: bag.cpp:291
void writeChunkInfoRecords()
Definition: bag.cpp:924
void writeHeader(rs2rosinternal::M_string const &fields)
Definition: bag.cpp:1008
ChunkedFile file_
Definition: bag.h:276
static const std::string TOPIC_FIELD_NAME
Definition: constants.h:51
::std_msgs::Time_< std::allocator< void > > Time
Definition: Time.h:47
void openReadWrite(std::string const &filename)
open file for reading & writing
static const unsigned char OP_MSG_DEF
Definition: constants.h:83
void setSize(uint32_t size)
Definition: buffer.cpp:54
bool readField(rs2rosinternal::M_string const &fields, std::string const &field_name, bool required, T *data) const
Definition: bag.h:349
GeneratorWrapper< T > map(Func &&function, GeneratorWrapper< U > &&generator)
Definition: catch.hpp:4271
void readFileHeaderRecord()
Definition: bag.cpp:414
static const uint32_t INDEX_VERSION
Definition: constants.h:89
Definition: parser.hpp:153
bool chunk_open_
Definition: bag.h:289
void decompressBz2Chunk(ChunkHeader const &chunk_header) const
Definition: bag.cpp:854
GLintptr offset
#define CONSOLE_BRIDGE_logError(fmt,...)
Definition: console.h:78
bool parse(const std::vector< uint8_t > &buffer, uint32_t size, std::string &error_msg)
Parse a header out of a buffer of data.
Definition: header.cpp:69


librealsense2
Author(s): LibRealSense ROS Team
autogenerated on Thu Dec 22 2022 03:41:42