bag.cpp
Go to the documentation of this file.
1 // Copyright (c) 2009, Willow Garage, Inc.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are met:
6 //
7 // * Redistributions of source code must retain the above copyright
8 // notice, this list of conditions and the following disclaimer.
9 // * Redistributions in binary form must reproduce the above copyright
10 // notice, this list of conditions and the following disclaimer in the
11 // documentation and/or other materials provided with the distribution.
12 // * Neither the name of Willow Garage, Inc. nor the names of its
13 // contributors may be used to endorse or promote products derived from
14 // this software without specific prior written permission.
15 //
16 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 // POSSIBILITY OF SUCH DAMAGE.
27 
28 #include "rosbag/bag.h"
30 #include "rosbag/query.h"
31 #include "rosbag/view.h"
32 
33 #if defined(_MSC_VER)
34  #include <stdint.h> // only on v2010 and later -> is this enough for msvc and linux?
35 #else
36  #include <inttypes.h>
37 #endif
38 #include <signal.h>
39 #include <assert.h>
40 #include <iomanip>
41 #include <map>
42 #include <tuple>
43 #include <tuple>
44 
45 #include "console_bridge/console.h"
46 #include <memory.h>
47 
48 using std::map;
49 using std::string;
50 using std::vector;
51 using std::multiset;
52 using std::shared_ptr;
55 
56 namespace rosbag {
57 
59  mode_(bagmode::Write),
60  version_(0),
61  compression_(compression::Uncompressed),
62  chunk_threshold_(768 * 1024), // 768KB chunks
63  bag_revision_(0),
64  file_size_(0),
65  file_header_pos_(0),
66  index_data_pos_(0),
67  connection_count_(0),
68  chunk_count_(0),
69  chunk_open_(false),
70  curr_chunk_data_pos_(0),
71  current_buffer_(0),
72  decompressed_chunk_(0)
73 {
74 }
75 
76 Bag::Bag(string const& filename, uint32_t mode) :
77  compression_(compression::Uncompressed),
78  chunk_threshold_(768 * 1024), // 768KB chunks
79  bag_revision_(0),
80  file_size_(0),
81  file_header_pos_(0),
82  index_data_pos_(0),
83  connection_count_(0),
84  chunk_count_(0),
85  chunk_open_(false),
86  curr_chunk_data_pos_(0),
87  current_buffer_(0),
88  decompressed_chunk_(0)
89 {
90  open(filename, mode);
91 }
92 
94  close();
95 }
96 
97 void Bag::open(string const& filename, uint32_t mode) {
98  mode_ = (BagMode) mode;
99 
100  if (mode_ & bagmode::Append)
102  else if (mode_ & bagmode::Write)
104  else if (mode_ & bagmode::Read)
106  else
107  throw BagException( "Unknown mode: " + std::to_string( (int)mode ) );
108 
109  // Determine file size
111  seek(0, std::ios::end);
113  seek(offset);
114 }
115 
116 void Bag::openRead(string const& filename) {
118 
119  readVersion();
120 
121  switch (version_) {
122  case 102: startReadingVersion102(); break;
123  case 200: startReadingVersion200(); break;
124  default:
125  throw BagException( "Unsupported bag file version: " + std::to_string( getMajorVersion() ) + '.'
127  }
128 }
129 
130 void Bag::openWrite(string const& filename) {
132 
133  startWriting();
134 }
135 
136 void Bag::openAppend(string const& filename) {
138 
139  readVersion();
140 
141  if (version_ != 200)
142  throw BagException( "Bag file version " + std::to_string( getMajorVersion() ) + '.'
143  + std::to_string( getMinorVersion() ) + " is unsupported for appending" );
144 
146 
147  // Truncate the file to chop off the index
149  index_data_pos_ = 0;
150 
151  // Rewrite the file header, clearing the index position (so we know if the index is invalid)
154 
155  // Seek to the end of the file
156  seek(0, std::ios::end);
157 }
158 
159 void Bag::close() {
160  if (!file_.isOpen())
161  return;
162 
164  closeWrite();
165 
166  file_.close();
167 
168  topic_connection_ids_.clear();
169  header_connection_ids_.clear();
170  for (map<uint32_t, ConnectionInfo*>::iterator i = connections_.begin(); i != connections_.end(); i++)
171  delete i->second;
172  connections_.clear();
173  chunks_.clear();
174  connection_indexes_.clear();
176 }
177 
179  stopWriting();
180 }
181 
182 string Bag::getFileName() const { return file_.getFileName(); }
183 BagMode Bag::getMode() const { return mode_; }
184 uint64_t Bag::getSize() const { return file_size_; }
185 
187 
188 void Bag::setChunkThreshold(uint32_t chunk_threshold) {
189  if (file_.isOpen() && chunk_open_)
191 
192  chunk_threshold_ = chunk_threshold;
193 }
194 
196 
197 std::tuple<std::string, uint64_t, uint64_t> Bag::getCompressionInfo() const
198 {
199  std::map<std::string, uint64_t> compression_counts;
200  std::map<std::string, uint64_t> compression_uncompressed;
201  std::map<std::string, uint64_t> compression_compressed;
202  auto compression = compression_;
203  uint64_t uncompressed = 0;
204  uint64_t compressed = 0;
205  for( ChunkInfo const & curr_chunk_info : chunks_ )
206  {
207  seek(curr_chunk_info.pos);
208 
209  // Skip over the chunk data
210  ChunkHeader chunk_header;
211  readChunkHeader(chunk_header);
212  seek(chunk_header.compressed_size, std::ios::cur);
213 
214  compression_counts[chunk_header.compression] += 1;
215  compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size;
216  uncompressed += chunk_header.uncompressed_size;
217  compression_compressed[chunk_header.compression] += chunk_header.compressed_size;
218  compressed += chunk_header.compressed_size;
219  }
220 
221  auto chunk_count = chunks_.size();
222  uint64_t main_compression_count = 0;
223  std::string main_compression;
224  for (auto&& kvp : compression_counts)
225  {
226  if (kvp.second > main_compression_count)
227  {
228  main_compression = kvp.first;
229  main_compression_count = kvp.second;
230  }
231  }
232  //auto main_compression_str = compression_counts.begin()->first;
233  //CompressionType main_compression = (main_compression_str == "Uncompressed") ? CompressionType::Uncompressed : (main_compression_str == "LZ4") ? CompressionType::LZ4 : CompressionType::BZ2;
234  //auto main_compression_count = compression_counts.begin()->second;
235  return std::make_tuple(main_compression, compressed, uncompressed);
236 }
238  if (file_.isOpen() && chunk_open_)
240 
241  if (!(compression == compression::Uncompressed ||
242  compression == compression::BZ2 ||
243  compression == compression::LZ4)) {
244  throw BagException( "Unknown compression type: " + std::to_string( (int)compression ) );
245  }
246 
247  compression_ = compression;
248 }
249 
250 // Version
251 
253  string version = string("#ROSBAG V") + VERSION + string("\n");
254 
255  CONSOLE_BRIDGE_logDebug("Writing VERSION [%llu]: %s", (unsigned long long) file_.getOffset(), version.c_str());
256 
257  version_ = 200;
258 
259  write(version);
260 }
261 
263  string version_line = file_.getline();
264 
266 
267  char logtypename[100];
268  int version_major, version_minor;
269 #if defined(_MSC_VER)
270  if (sscanf_s(version_line.c_str(), "#ROS%s V%d.%d", logtypename, static_cast<unsigned>(sizeof(logtypename)), &version_major, &version_minor) != 3)
271 #else
272  if (sscanf(version_line.c_str(), "#ROS%s V%d.%d", logtypename, &version_major, &version_minor) != 3)
273 #endif
274  throw BagIOException("Error reading version line");
275 
276  version_ = version_major * 100 + version_minor;
277 
278  CONSOLE_BRIDGE_logDebug("Read VERSION: version=%d", version_);
279 }
280 
281 uint32_t Bag::getMajorVersion() const { return version_ / 100; }
282 uint32_t Bag::getMinorVersion() const { return version_ % 100; }
283 
284 //
285 
287  writeVersion();
290 }
291 
293  if (chunk_open_)
295 
296  seek(0, std::ios::end);
297 
301 
304 }
305 
307  // Read the file header record, which points to the end of the chunks
309 
310  // Seek to the end of the chunks
312 
313  // Read the connection records (one for each connection)
314  for (uint32_t i = 0; i < connection_count_; i++)
316 
317  // Read the chunk info records
318  for (uint32_t i = 0; i < chunk_count_; i++)
320 
321  // Read the connection indexes for each chunk
322  for( ChunkInfo const & chunk_info : chunks_ )
323  {
324  curr_chunk_info_ = chunk_info;
325 
327 
328  // Skip over the chunk data
329  ChunkHeader chunk_header;
330  readChunkHeader(chunk_header);
331  seek(chunk_header.compressed_size, std::ios::cur);
332 
333  // Read the index records after the chunk
334  for (unsigned int i = 0; i < chunk_info.connection_counts.size(); i++)
336  }
337 
338  // At this point we don't have a curr_chunk_info anymore so we reset it
340 }
341 
343  try
344  {
345  // Read the file header record, which points to the start of the topic indexes
347  }
348  catch (BagFormatException ex) {
349  throw BagUnindexedException();
350  }
351 
352  // Get the length of the file
353  seek(0, std::ios::end);
354  uint64_t filelength = file_.getOffset();
355 
356  // Seek to the beginning of the topic index records
358 
359  // Read the topic index records, which point to the offsets of each message in the file
360  while (file_.getOffset() < filelength)
362 
363  // Read the message definition records (which are the first entry in the topic indexes)
364  for (map<uint32_t, multiset<IndexEntry> >::const_iterator i = connection_indexes_.begin(); i != connection_indexes_.end(); i++) {
365  multiset<IndexEntry> const& index = i->second;
366  IndexEntry const& first_entry = *index.begin();
367 
368  CONSOLE_BRIDGE_logDebug("Reading message definition for connection %d at %llu", i->first, (unsigned long long) first_entry.chunk_pos);
369 
370  seek(first_entry.chunk_pos);
371 
373  }
374 }
375 
376 // File header record
377 
379  connection_count_ = static_cast<uint32_t>(connections_.size());
380  chunk_count_ = static_cast<uint32_t>(chunks_.size());
381 
382  CONSOLE_BRIDGE_logDebug("Writing FILE_HEADER [%llu]: index_pos=%llu connection_count=%d chunk_count=%d",
383  (unsigned long long) file_.getOffset(), (unsigned long long) index_data_pos_, connection_count_, chunk_count_);
384 
385  // Write file header record
391 
392  std::vector<uint8_t> header_buffer;
393  uint32_t header_len;
394  rs2rosinternal::Header::write(header, header_buffer, header_len);
395  uint32_t data_len = 0;
396  if (header_len < FILE_HEADER_LENGTH)
397  data_len = FILE_HEADER_LENGTH - header_len;
398  write((char*) &header_len, 4);
399  write((char*) header_buffer.data(), header_len);
400  write((char*) &data_len, 4);
401 
402  // Pad the file header record out
403  if (data_len > 0) {
404  string padding;
405  padding.resize(data_len, ' ');
406  write(padding);
407  }
408 }
409 
412  uint32_t data_size;
413  if (!readHeader(header) || !readDataLength(data_size))
414  throw BagFormatException("Error reading FILE_HEADER record");
415 
416  M_string& fields = *header.getValues();
417 
418  if (!isOp(fields, OP_FILE_HEADER))
419  throw BagFormatException("Expected FILE_HEADER op not found");
420 
421  // Read index position
423 
424  if (index_data_pos_ == 0)
425  throw BagUnindexedException();
426 
427  // Read topic and chunks count
428  if (version_ >= 200) {
431  }
432 
433  CONSOLE_BRIDGE_logDebug("Read FILE_HEADER: index_pos=%llu connection_count=%d chunk_count=%d",
434  (unsigned long long) index_data_pos_, connection_count_, chunk_count_);
435 
436  // Skip the data section (just padding)
437  seek(data_size, std::ios::cur);
438 }
439 
442  return static_cast<uint32_t>(file_.getOffset() - curr_chunk_data_pos_);
443  else
444  return file_.getCompressedBytesIn();
445 }
446 
448  // Initialize chunk info
451  curr_chunk_info_.end_time = time;
452 
453  // Write the chunk header, with a place-holder for the data sizes (we'll fill in when the chunk is finished)
455 
456  // Turn on compressed writing
458 
459  // Record where the data section of this chunk started
461 
462  chunk_open_ = true;
463 }
464 
466  // Add this chunk to the index
467  chunks_.push_back(curr_chunk_info_);
468 
469  // Get the uncompressed and compressed sizes
470  uint32_t uncompressed_size = getChunkOffset();
472  uint32_t compressed_size = static_cast<uint32_t>(file_.getOffset() - curr_chunk_data_pos_);
473 
474  // Rewrite the chunk header with the size of the chunk (remembering current offset)
475  uint64_t end_of_chunk_pos = file_.getOffset();
476 
478  writeChunkHeader(compression_, compressed_size, uncompressed_size);
479 
480  // Write out the indexes and clear them
481  seek(end_of_chunk_pos);
484 
485  // Clear the connection counts
487 
488  // Flag that we're starting a new chunk
489  chunk_open_ = false;
490 }
491 
492 void Bag::writeChunkHeader(CompressionType compression, uint32_t compressed_size, uint32_t uncompressed_size) {
493  ChunkHeader chunk_header;
494  switch (compression) {
495  case compression::Uncompressed: chunk_header.compression = COMPRESSION_NONE; break;
496  case compression::BZ2: chunk_header.compression = COMPRESSION_BZ2; break;
497  case compression::LZ4: chunk_header.compression = COMPRESSION_LZ4;
498  //case compression::ZLIB: chunk_header.compression = COMPRESSION_ZLIB; break;
499  }
500  chunk_header.compressed_size = compressed_size;
501  chunk_header.uncompressed_size = uncompressed_size;
502 
503  CONSOLE_BRIDGE_logDebug("Writing CHUNK [%llu]: compression=%s compressed=%d uncompressed=%d",
504  (unsigned long long) file_.getOffset(), chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size);
505 
508  header[COMPRESSION_FIELD_NAME] = chunk_header.compression;
511 
512  writeDataLength(chunk_header.compressed_size);
513 }
514 
515 void Bag::readChunkHeader(ChunkHeader& chunk_header) const {
517  if (!readHeader(header) || !readDataLength(chunk_header.compressed_size))
518  throw BagFormatException("Error reading CHUNK record");
519 
520  M_string& fields = *header.getValues();
521 
522  if (!isOp(fields, OP_CHUNK))
523  throw BagFormatException("Expected CHUNK op not found");
524 
525  readField(fields, COMPRESSION_FIELD_NAME, true, chunk_header.compression);
526  readField(fields, SIZE_FIELD_NAME, true, &chunk_header.uncompressed_size);
527 
528  CONSOLE_BRIDGE_logDebug("Read CHUNK: compression=%s size=%d uncompressed=%d (%f)", chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size, 100 * ((double) chunk_header.compressed_size) / chunk_header.uncompressed_size);
529 }
530 
531 // Index records
532 
534  for (map<uint32_t, multiset<IndexEntry> >::const_iterator i = curr_chunk_connection_indexes_.begin(); i != curr_chunk_connection_indexes_.end(); i++) {
535  uint32_t connection_id = i->first;
536  multiset<IndexEntry> const& index = i->second;
537 
538  // Write the index record header
539  uint32_t index_size = static_cast<uint32_t>(index.size());
542  header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_id);
544  header[COUNT_FIELD_NAME] = toHeaderString(&index_size);
546 
547  writeDataLength(index_size * 12);
548 
549  CONSOLE_BRIDGE_logDebug("Writing INDEX_DATA: connection=%d ver=%d count=%d", connection_id, INDEX_VERSION, index_size);
550 
551  // Write the index record data (pairs of timestamp and position in file)
552  for( IndexEntry const & e : index )
553  {
554  write((char*) &e.time.sec, 4);
555  write((char*) &e.time.nsec, 4);
556  write((char*) &e.offset, 4);
557 
558  CONSOLE_BRIDGE_logDebug(" - %d.%d: %d", e.time.sec, e.time.nsec, e.offset);
559  }
560  }
561 }
562 
565  uint32_t data_size;
566  if (!readHeader(header) || !readDataLength(data_size))
567  throw BagFormatException("Error reading INDEX_DATA header");
568  M_string& fields = *header.getValues();
569 
570  if (!isOp(fields, OP_INDEX_DATA))
571  throw BagFormatException("Expected INDEX_DATA record");
572 
573  uint32_t index_version;
574  string topic;
575  uint32_t count = 0;
576  readField(fields, VER_FIELD_NAME, true, &index_version);
577  readField(fields, TOPIC_FIELD_NAME, true, topic);
578  readField(fields, COUNT_FIELD_NAME, true, &count);
579 
580  CONSOLE_BRIDGE_logDebug("Read INDEX_DATA: ver=%d topic=%s count=%d", index_version, topic.c_str(), count);
581 
582  if (index_version != 0)
583  throw BagFormatException( "Unsupported INDEX_DATA version: " + std::to_string( index_version ) );
584 
585  uint32_t connection_id;
586  map<string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
587  if (topic_conn_id_iter == topic_connection_ids_.end()) {
588  connection_id = static_cast<uint32_t>(connections_.size());
589 
590  CONSOLE_BRIDGE_logDebug("Creating connection: id=%d topic=%s", connection_id, topic.c_str());
591  ConnectionInfo* connection_info = new ConnectionInfo();
592  connection_info->id = connection_id;
593  connection_info->topic = topic;
594  connections_[connection_id] = connection_info;
595 
596  topic_connection_ids_[topic] = connection_id;
597  }
598  else
599  connection_id = topic_conn_id_iter->second;
600 
601  multiset<IndexEntry>& connection_index = connection_indexes_[connection_id];
602 
603  for (uint32_t i = 0; i < count; i++) {
604  IndexEntry index_entry;
605  uint32_t sec;
606  uint32_t nsec;
607  read((char*) &sec, 4);
608  read((char*) &nsec, 4);
609  read((char*) &index_entry.chunk_pos, 8); //<! store position of the message in the chunk_pos field as it's 64 bits
610  index_entry.time = Time(sec, nsec);
611  index_entry.offset = 0;
612 
613  CONSOLE_BRIDGE_logDebug(" - %d.%d: %llu", sec, nsec, (unsigned long long) index_entry.chunk_pos);
614 
615  if (index_entry.time < rs2rosinternal::TIME_MIN || index_entry.time > rs2rosinternal::TIME_MAX)
616  {
617  CONSOLE_BRIDGE_logError("Index entry for topic %s contains invalid time.", topic.c_str());
618  } else
619  {
620  connection_index.insert(connection_index.end(), index_entry);
621  }
622  }
623 }
624 
627  uint32_t data_size;
628  if (!readHeader(header) || !readDataLength(data_size))
629  throw BagFormatException("Error reading INDEX_DATA header");
630  M_string& fields = *header.getValues();
631 
632  if (!isOp(fields, OP_INDEX_DATA))
633  throw BagFormatException("Expected INDEX_DATA record");
634 
635  uint32_t index_version;
636  uint32_t connection_id;
637  uint32_t count = 0;
638  readField(fields, VER_FIELD_NAME, true, &index_version);
639  readField(fields, CONNECTION_FIELD_NAME, true, &connection_id);
640  readField(fields, COUNT_FIELD_NAME, true, &count);
641 
642  CONSOLE_BRIDGE_logDebug("Read INDEX_DATA: ver=%d connection=%d count=%d", index_version, connection_id, count);
643 
644  if (index_version != 1)
645  throw BagFormatException( "Unsupported INDEX_DATA version: " + std::to_string( index_version ) );
646 
647  uint64_t chunk_pos = curr_chunk_info_.pos;
648 
649  multiset<IndexEntry>& connection_index = connection_indexes_[connection_id];
650 
651  for (uint32_t i = 0; i < count; i++) {
652  IndexEntry index_entry;
653  index_entry.chunk_pos = chunk_pos;
654  uint32_t sec;
655  uint32_t nsec;
656  read((char*) &sec, 4);
657  read((char*) &nsec, 4);
658  read((char*) &index_entry.offset, 4);
659  index_entry.time = Time(sec, nsec);
660 
661  CONSOLE_BRIDGE_logDebug(" - %d.%d: %llu+%d", sec, nsec, (unsigned long long) index_entry.chunk_pos, index_entry.offset);
662 
663  if (index_entry.time < rs2rosinternal::TIME_MIN || index_entry.time > rs2rosinternal::TIME_MAX)
664  {
665  CONSOLE_BRIDGE_logError("Index entry for topic %s contains invalid time. This message will not be loaded.", connections_[connection_id]->topic.c_str());
666  } else
667  {
668  connection_index.insert(connection_index.end(), index_entry);
669  }
670  }
671 }
672 
673 // Connection records
674 
676  for (map<uint32_t, ConnectionInfo*>::const_iterator i = connections_.begin(); i != connections_.end(); i++) {
677  ConnectionInfo const* connection_info = i->second;
678  writeConnectionRecord(connection_info);
679  }
680 }
681 
682 void Bag::writeConnectionRecord(ConnectionInfo const* connection_info) {
683  CONSOLE_BRIDGE_logDebug("Writing CONNECTION [%llu:%d]: topic=%s id=%d",
684  (unsigned long long) file_.getOffset(), getChunkOffset(), connection_info->topic.c_str(), connection_info->id);
685 
688  header[TOPIC_FIELD_NAME] = connection_info->topic;
689  header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_info->id);
691 
692  writeHeader(*connection_info->header);
693 }
694 
698  header[TOPIC_FIELD_NAME] = connection_info->topic;
699  header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_info->id);
701 
702  appendHeaderToBuffer(buf, *connection_info->header);
703 }
704 
707  if (!readHeader(header))
708  throw BagFormatException("Error reading CONNECTION header");
709  M_string& fields = *header.getValues();
710 
711  if (!isOp(fields, OP_CONNECTION))
712  throw BagFormatException("Expected CONNECTION op not found");
713 
714  uint32_t id;
715  readField(fields, CONNECTION_FIELD_NAME, true, &id);
716  string topic;
717  readField(fields, TOPIC_FIELD_NAME, true, topic);
718 
719  rs2rosinternal::Header connection_header;
720  if (!readHeader(connection_header))
721  throw BagFormatException("Error reading connection header");
722 
723  // If this is a new connection, update connections
724  map<uint32_t, ConnectionInfo*>::iterator key = connections_.find(id);
725  if (key == connections_.end()) {
726  ConnectionInfo* connection_info = new ConnectionInfo();
727  connection_info->id = id;
728  connection_info->topic = topic;
729  connection_info->header = std::make_shared<M_string>();
730  for (M_string::const_iterator i = connection_header.getValues()->begin(); i != connection_header.getValues()->end(); i++)
731  (*connection_info->header)[i->first] = i->second;
732  connection_info->msg_def = (*connection_info->header)["message_definition"];
733  connection_info->datatype = (*connection_info->header)["type"];
734  connection_info->md5sum = (*connection_info->header)["md5sum"];
735  connections_[id] = connection_info;
736 
737  CONSOLE_BRIDGE_logDebug("Read CONNECTION: topic=%s id=%d", topic.c_str(), id);
738  }
739 }
740 
743  uint32_t data_size;
744  if (!readHeader(header) || !readDataLength(data_size))
745  throw BagFormatException("Error reading message definition header");
746  M_string& fields = *header.getValues();
747 
748  if (!isOp(fields, OP_MSG_DEF))
749  throw BagFormatException("Expected MSG_DEF op not found");
750 
751  string topic, md5sum, datatype, message_definition;
752  readField(fields, TOPIC_FIELD_NAME, true, topic);
753  readField(fields, MD5_FIELD_NAME, 32, 32, true, md5sum);
754  readField(fields, TYPE_FIELD_NAME, true, datatype);
755  readField(fields, DEF_FIELD_NAME, 0, UINT_MAX, true, message_definition);
756 
757  ConnectionInfo* connection_info;
758 
759  map<string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
760  if (topic_conn_id_iter == topic_connection_ids_.end()) {
761  uint32_t id = static_cast<uint32_t>(connections_.size());
762 
763  CONSOLE_BRIDGE_logDebug("Creating connection: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str());
764  connection_info = new ConnectionInfo();
765  connection_info->id = id;
766  connection_info->topic = topic;
767 
768  connections_[id] = connection_info;
770  }
771  else
772  connection_info = connections_[topic_conn_id_iter->second];
773 
774  connection_info->msg_def = message_definition;
775  connection_info->datatype = datatype;
776  connection_info->md5sum = md5sum;
777  connection_info->header = std::make_shared<rs2rosinternal::M_string>();
778  (*connection_info->header)["type"] = connection_info->datatype;
779  (*connection_info->header)["md5sum"] = connection_info->md5sum;
780  (*connection_info->header)["message_definition"] = connection_info->msg_def;
781 
782  CONSOLE_BRIDGE_logDebug("Read MSG_DEF: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str());
783 }
784 
785 void Bag::decompressChunk(uint64_t chunk_pos) const {
786  if (curr_chunk_info_.pos == chunk_pos) {
788  return;
789  }
790 
792 
793  if (decompressed_chunk_ == chunk_pos)
794  return;
795 
796  // Seek to the start of the chunk
797  seek(chunk_pos);
798 
799  // Read the chunk header
800  ChunkHeader chunk_header;
801  readChunkHeader(chunk_header);
802 
803  // Read and decompress the chunk. These assume we are at the right place in the stream already
804  if (chunk_header.compression == COMPRESSION_NONE)
805  decompressRawChunk(chunk_header);
806  else if (chunk_header.compression == COMPRESSION_BZ2)
807  decompressBz2Chunk(chunk_header);
808  else if (chunk_header.compression == COMPRESSION_LZ4)
809  decompressLz4Chunk(chunk_header);
810  else
811  throw BagFormatException("Unknown compression: " + chunk_header.compression);
812 
813  decompressed_chunk_ = chunk_pos;
814 }
815 
817  CONSOLE_BRIDGE_logDebug("readMessageDataRecord: offset=%llu", (unsigned long long) offset);
818 
819  seek(offset);
820 
821  uint32_t data_size;
822  uint8_t op;
823  do {
824  if (!readHeader(header) || !readDataLength(data_size))
825  throw BagFormatException("Error reading header");
826 
827  readField(*header.getValues(), OP_FIELD_NAME, true, &op);
828  }
829  while (op == OP_MSG_DEF);
830 
831  if (op != OP_MSG_DATA)
832  throw BagFormatException( "Expected MSG_DATA op, got " + std::to_string( op ) );
833 
834  record_buffer_.setSize(data_size);
835  file_.read((char*) record_buffer_.getData(), data_size);
836 }
837 
838 // Reading this into a buffer isn't completely necessary, but we do it anyways for now
839 void Bag::decompressRawChunk(ChunkHeader const& chunk_header) const {
840  assert(chunk_header.compression == COMPRESSION_NONE);
841  assert(chunk_header.compressed_size == chunk_header.uncompressed_size);
842 
843  CONSOLE_BRIDGE_logDebug("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size);
844 
846  file_.read((char*) decompress_buffer_.getData(), chunk_header.compressed_size);
847 
848  // todo check read was successful
849 }
850 
851 void Bag::decompressBz2Chunk(ChunkHeader const& chunk_header) const {
852  assert(chunk_header.compression == COMPRESSION_BZ2);
853 
854  CompressionType compression = compression::BZ2;
855 
856  CONSOLE_BRIDGE_logDebug("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size);
857 
858  chunk_buffer_.setSize(chunk_header.compressed_size);
859  file_.read((char*) chunk_buffer_.getData(), chunk_header.compressed_size);
860 
863 
864  // todo check read was successful
865 }
866 
867 void Bag::decompressLz4Chunk(ChunkHeader const& chunk_header) const {
868  assert(chunk_header.compression == COMPRESSION_LZ4);
869 
870  CompressionType compression = compression::LZ4;
871 
872  CONSOLE_BRIDGE_logDebug("lz4 compressed_size: %d uncompressed_size: %d",
873  chunk_header.compressed_size, chunk_header.uncompressed_size);
874 
875  chunk_buffer_.setSize(chunk_header.compressed_size);
876  file_.read((char*) chunk_buffer_.getData(), chunk_header.compressed_size);
877 
880 
881  // todo check read was successful
882 }
883 
886  uint32_t data_size;
887  uint32_t bytes_read;
888  switch (version_)
889  {
890  case 200:
891  decompressChunk(index_entry.chunk_pos);
892  readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
893  return header;
894  case 102:
896  return header;
897  default:
898  throw BagFormatException( "Unhandled version: " + std::to_string( version_ ) );
899  }
900 }
901 
902 // NOTE: this loads the header, which is unnecessary
903 uint32_t Bag::readMessageDataSize(IndexEntry const& index_entry) const {
905  uint32_t data_size;
906  uint32_t bytes_read;
907  switch (version_)
908  {
909  case 200:
910  decompressChunk(index_entry.chunk_pos);
911  readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
912  return data_size;
913  case 102:
915  return record_buffer_.getSize();
916  default:
917  throw BagFormatException( "Unhandled version: " + std::to_string( version_ ) );
918  }
919 }
920 
922  for( ChunkInfo const & chunk_info : chunks_ )
923  {
924  // Write the chunk info header
926  uint32_t chunk_connection_count = static_cast<uint32_t>(chunk_info.connection_counts.size());
929  header[CHUNK_POS_FIELD_NAME] = toHeaderString(&chunk_info.pos);
930  header[START_TIME_FIELD_NAME] = toHeaderString(&chunk_info.start_time);
931  header[END_TIME_FIELD_NAME] = toHeaderString(&chunk_info.end_time);
932  header[COUNT_FIELD_NAME] = toHeaderString(&chunk_connection_count);
933 
934  CONSOLE_BRIDGE_logDebug("Writing CHUNK_INFO [%llu]: ver=%d pos=%llu start=%d.%d end=%d.%d",
935  (unsigned long long) file_.getOffset(), CHUNK_INFO_VERSION, (unsigned long long) chunk_info.pos,
936  chunk_info.start_time.sec, chunk_info.start_time.nsec,
937  chunk_info.end_time.sec, chunk_info.end_time.nsec);
938 
940 
941  writeDataLength(8 * chunk_connection_count);
942  // Write the topic names and counts
943  for (map<uint32_t, uint32_t>::const_iterator i = chunk_info.connection_counts.begin(); i != chunk_info.connection_counts.end(); i++) {
944  uint32_t connection_id = i->first;
945  uint32_t count = i->second;
946 
947  write((char*) &connection_id, 4);
948  write((char*) &count, 4);
949 
950  CONSOLE_BRIDGE_logDebug(" - %d: %d", connection_id, count);
951  }
952  }
953 }
954 
956  // Read a CHUNK_INFO header
958  uint32_t data_size;
959  if (!readHeader(header) || !readDataLength(data_size))
960  throw BagFormatException("Error reading CHUNK_INFO record header");
961  M_string& fields = *header.getValues();
962  if (!isOp(fields, OP_CHUNK_INFO))
963  throw BagFormatException("Expected CHUNK_INFO op not found");
964 
965  // Check that the chunk info version is current
966  uint32_t chunk_info_version;
967  readField(fields, VER_FIELD_NAME, true, &chunk_info_version);
968  if (chunk_info_version != CHUNK_INFO_VERSION)
969  throw BagFormatException( "Expected CHUNK_INFO version " + std::to_string( CHUNK_INFO_VERSION ) + ", read "
970  + std::to_string( chunk_info_version ) );
971 
972  // Read the chunk position, timestamp, and topic count fieldsstd_msgs::Float32
973  ChunkInfo chunk_info;
974  readField(fields, CHUNK_POS_FIELD_NAME, true, &chunk_info.pos);
975  readField(fields, START_TIME_FIELD_NAME, true, chunk_info.start_time);
976  readField(fields, END_TIME_FIELD_NAME, true, chunk_info.end_time);
977  uint32_t chunk_connection_count = 0;
978  readField(fields, COUNT_FIELD_NAME, true, &chunk_connection_count);
979 
980  CONSOLE_BRIDGE_logDebug("Read CHUNK_INFO: chunk_pos=%llu connection_count=%d start=%d.%d end=%d.%d",
981  (unsigned long long) chunk_info.pos, chunk_connection_count,
982  chunk_info.start_time.sec, chunk_info.start_time.nsec,
983  chunk_info.end_time.sec, chunk_info.end_time.nsec);
984 
985  // Read the topic count entries
986  for (uint32_t i = 0; i < chunk_connection_count; i ++) {
987  uint32_t connection_id, connection_count;
988  read((char*) &connection_id, 4);
989  read((char*) &connection_count, 4);
990 
991  CONSOLE_BRIDGE_logDebug(" %d: %d messages", connection_id, connection_count);
992 
993  chunk_info.connection_counts[connection_id] = connection_count;
994  }
995 
996  chunks_.push_back(chunk_info);
997 }
998 
999 // Record I/O
1000 
1001 bool Bag::isOp(M_string& fields, uint8_t reqOp) const {
1002  uint8_t op = 0xFF; // nonexistent op
1003  readField(fields, OP_FIELD_NAME, true, &op);
1004  return op == reqOp;
1005 }
1006 
1007 void Bag::writeHeader(M_string const& fields) {
1008  std::vector<uint8_t> header_buffer;
1009  uint32_t header_len;
1010  rs2rosinternal::Header::write(fields, header_buffer, header_len);
1011  write((char*) &header_len, 4);
1012  write((char*) header_buffer.data(), header_len);
1013 }
1014 
1016  write((char*) &data_len, 4);
1017 }
1018 
1020  std::vector<uint8_t> header_buffer;
1021  uint32_t header_len;
1022  rs2rosinternal::Header::write(fields, header_buffer, header_len);
1023 
1024  uint32_t offset = buf.getSize();
1025 
1026  buf.setSize(buf.getSize() + 4 + header_len);
1027 
1028  memcpy(buf.getData() + offset, &header_len, 4);
1029  offset += 4;
1030  memcpy(buf.getData() + offset, header_buffer.data(), header_len);
1031 }
1032 
1034  uint32_t offset = buf.getSize();
1035 
1036  buf.setSize(buf.getSize() + 4);
1037 
1038  memcpy(buf.getData() + offset, &data_len, 4);
1039 }
1040 
1043  assert(buffer.getSize() > 8);
1044 
1045  uint8_t* start = (uint8_t*) buffer.getData() + offset;
1046 
1047  uint8_t* ptr = start;
1048 
1049  // Read the header length
1050  uint32_t header_len;
1051  memcpy(&header_len, ptr, 4);
1052  ptr += 4;
1053 
1054  // Parse the header
1055  string error_msg;
1056  bool parsed = header.parse(ptr, header_len, error_msg);
1057  if (!parsed)
1058  throw BagFormatException("Error parsing header");
1059  ptr += header_len;
1060 
1061  // Read the data size
1062  memcpy(&data_size, ptr, 4);
1063  ptr += 4;
1064 
1065  bytes_read = static_cast<uint32_t>(ptr - start);
1066 }
1067 
1069  (void)buffer;
1070  total_bytes_read = 0;
1071  uint8_t op = 0xFF;
1072  do {
1073  CONSOLE_BRIDGE_logDebug("reading header from buffer: offset=%d", offset);
1074  uint32_t bytes_read;
1075  readHeaderFromBuffer(*current_buffer_, offset, header, data_size, bytes_read);
1076 
1077  offset += bytes_read;
1078  total_bytes_read += bytes_read;
1079 
1080  readField(*header.getValues(), OP_FIELD_NAME, true, &op);
1081  }
1082  while (op == OP_MSG_DEF || op == OP_CONNECTION);
1083 
1084  if (op != OP_MSG_DATA)
1085  throw BagFormatException("Expected MSG_DATA op not found");
1086 }
1087 
1089  // Read the header length
1090  uint32_t header_len;
1091  read((char*) &header_len, 4);
1092 
1093  // Read the header
1094  header_buffer_.setSize(header_len);
1095  read((char*) header_buffer_.getData(), header_len);
1096 
1097  // Parse the header
1098  string error_msg;
1099  bool parsed = header.parse(header_buffer_.getData(), header_len, error_msg);
1100  if (!parsed)
1101  return false;
1102 
1103  return true;
1104 }
1105 
1106 bool Bag::readDataLength(uint32_t& data_size) const {
1107  read((char*) &data_size, 4);
1108  return true;
1109 }
1110 
1111 M_string::const_iterator Bag::checkField(M_string const& fields, string const& field, unsigned int min_len, unsigned int max_len, bool required) const {
1112  M_string::const_iterator fitr = fields.find(field);
1113  if (fitr == fields.end()) {
1114  if (required)
1115  throw BagFormatException("Required '" + field + "' field missing");
1116  }
1117  else if ((fitr->second.size() < min_len) || (fitr->second.size() > max_len))
1118  throw BagFormatException( "Field '" + field + "' is wrong size ("
1119  + std::to_string( (uint32_t)fitr->second.size() ) + " bytes)" );
1120 
1121  return fitr;
1122 }
1123 
1124 bool Bag::readField(M_string const& fields, string const& field_name, bool required, string& data) const {
1125  return readField(fields, field_name, 1, UINT_MAX, required, data);
1126 }
1127 
1128 bool Bag::readField(M_string const& fields, string const& field_name, unsigned int min_len, unsigned int max_len, bool required, string& data) const {
1129  M_string::const_iterator fitr = checkField(fields, field_name, min_len, max_len, required);
1130  if (fitr == fields.end())
1131  return false;
1132 
1133  data = fitr->second;
1134  return true;
1135 }
1136 
1137 bool Bag::readField(M_string const& fields, string const& field_name, bool required, Time& data) const {
1138  uint64_t packed_time;
1139  if (!readField(fields, field_name, required, &packed_time))
1140  return false;
1141 
1142  uint64_t bitmask = (1LL << 33) - 1;
1143  data.sec = (uint32_t) (packed_time & bitmask);
1144  data.nsec = (uint32_t) (packed_time >> 32);
1145 
1146  return true;
1147 }
1148 
1149 std::string Bag::toHeaderString(Time const* field) const {
1150  uint64_t packed_time = (((uint64_t) field->nsec) << 32) + field->sec;
1151  return toHeaderString(&packed_time);
1152 }
1153 
1154 
1155 // Low-level I/O
1156 
1157 void Bag::write(string const& s) { write(s.c_str(), s.length()); }
1158 void Bag::write(char const* s, std::streamsize n) { file_.write((char*) s, n); }
1159 
1160 void Bag::read(char* b, std::streamsize n) const { file_.read(b, n); }
1161 void Bag::seek(uint64_t pos, int origin) const { file_.seek(pos, origin); }
1162 
1163 } // namespace rosbag
realdds::topics::metadata::key::header
const std::string header
rosbag::Bag::chunks_
std::vector< ChunkInfo > chunks_
Definition: bag.h:324
rosbag::ChunkedFile::getFileName
std::string getFileName() const
return path of currently open file
Definition: chunked_file.cpp:131
rs2rosinternal::TIME_MIN
const ROSTIME_DECL Time TIME_MIN
rs2rosinternal::Header
Provides functionality to parse a connection header and retrieve values from it.
Definition: header.h:50
rosbag::BagFormatException
Exception thrown on problems reading the bag format.
Definition: exceptions.h:89
rosbag::Bag::getMode
BagMode getMode() const
Get the mode the bag is in.
Definition: bag.cpp:183
rosbag::Buffer::getData
uint8_t * getData()
Definition: buffer.cpp:82
uint8_t
unsigned char uint8_t
Definition: stdint.h:78
count
GLint GLsizei count
Definition: glad/glad/glad.h:2301
rosbag::ChunkedFile::setWriteMode
void setWriteMode(CompressionType type)
Definition: chunked_file.cpp:153
rosbag::Bag::topic_connection_ids_
std::map< std::string, uint32_t > topic_connection_ids_
Definition: bag.h:320
op
#define op
rosbag::Bag::file_
ChunkedFile file_
Definition: bag.h:303
rs2rosinternal::Time
Time representation. May either represent wall clock time or ROS clock time.
Definition: time.h:206
rosbag::END_TIME_FIELD_NAME
static const std::string END_TIME_FIELD_NAME
Definition: constants.h:94
rosbag::Bag::getCompression
CompressionType getCompression() const
Get the compression method to use for writing chunks.
Definition: bag.cpp:195
rosbag::Bag::connection_count_
uint32_t connection_count_
Definition: bag.h:312
rosbag::Bag::read
void read(char *b, std::streamsize n) const
Definition: bag.cpp:1160
test-librs-connections.n
n
Definition: test-librs-connections.py:38
fps.required
required
Definition: fps.py:8
id
GLenum GLuint id
Definition: glad/glad/glad.h:135
std::to_string
std::string to_string(T value)
Definition: android_helpers.h:16
rosbag::COMPRESSION_NONE
static const std::string COMPRESSION_NONE
Definition: constants.h:127
rosbag::OP_CHUNK_INFO
static const unsigned char OP_CHUNK_INFO
Definition: constants.h:111
rosbag::Bag::readMessageDataSize
uint32_t readMessageDataSize(IndexEntry const &index_entry) const
Definition: bag.cpp:903
rosbag::Bag::decompressed_chunk_
uint64_t decompressed_chunk_
position of decompressed chunk
Definition: bag.h:339
rosbag::Bag::decompressBz2Chunk
void decompressBz2Chunk(ChunkHeader const &chunk_header) const
Definition: bag.cpp:851
rosbag::Bag::getMajorVersion
uint32_t getMajorVersion() const
Get the major-version of the open bag file.
Definition: bag.cpp:281
rosbag::ChunkedFile::seek
void seek(uint64_t offset, int origin=std::ios_base::beg)
seek to given offset from origin
Definition: chunked_file.cpp:177
rosbag::Buffer::setSize
void setSize(uint32_t size)
Definition: buffer.cpp:86
rs2::sw_update::version
rsutils::version version
Definition: versions-db-manager.h:30
rosbag::ChunkInfo::start_time
rs2rosinternal::Time start_time
Definition: structures.h:95
rosbag::FILE_HEADER_LENGTH
static const uint32_t FILE_HEADER_LENGTH
Definition: constants.h:118
rosbag::Bag::appendHeaderToBuffer
void appendHeaderToBuffer(Buffer &buf, rs2rosinternal::M_string const &fields)
Definition: bag.cpp:1019
b
GLboolean GLboolean GLboolean b
Definition: glad/glad/glad.h:3064
rosbag::Bag::readDataLength
bool readDataLength(uint32_t &data_size) const
Definition: bag.cpp:1106
data
Definition: parser.hpp:153
offset
GLintptr offset
Definition: glad/glad/glad.h:2737
rosbag::Bag::mode_
BagMode mode_
Definition: bag.h:302
rosbag::Bag::header_buffer_
Buffer header_buffer_
reusable buffer in which to assemble the record header before writing to file
Definition: bag.h:329
index
GLuint index
Definition: glad/glad/glad.h:2777
rosbag::ChunkedFile::openReadWrite
void openReadWrite(std::string const &filename)
open file for reading & writing
Definition: chunked_file.cpp:76
rosbag::Bag::readMessageDataHeaderFromBuffer
void readMessageDataHeaderFromBuffer(Buffer &buffer, uint32_t offset, rs2rosinternal::Header &header, uint32_t &data_size, uint32_t &bytes_read) const
Definition: bag.cpp:1068
string
GLsizei const GLchar *const * string
Definition: glad/glad/glad.h:2861
mode
GLenum mode
Definition: glad/glad/glad.h:1385
rosbag::OP_MSG_DATA
static const unsigned char OP_MSG_DATA
Definition: constants.h:107
rosbag::Bag::curr_chunk_info_
ChunkInfo curr_chunk_info_
Definition: bag.h:317
rosbag::INDEX_VERSION
static const uint32_t INDEX_VERSION
Definition: constants.h:121
rosbag::Bag::decompressChunk
void decompressChunk(uint64_t chunk_pos) const
Definition: bag.cpp:785
rosbag::ChunkHeader::compressed_size
uint32_t compressed_size
chunk compression type, e.g. "none" or "bz2" (see constants.h)
Definition: structures.h:105
rosbag::ChunkHeader::uncompressed_size
uint32_t uncompressed_size
compressed size of the chunk in bytes
Definition: structures.h:106
void
typedef void(APIENTRY *GLDEBUGPROC)(GLenum source
rosbag::Bag::toHeaderString
std::string toHeaderString(T const *field) const
Definition: bag.h:339
rosbag::Buffer
Definition: buffer.h:75
rosbag::compression::LZ4
@ LZ4
Definition: third-party/realsense-file/rosbag/rosbag_storage/include/rosbag/stream.h:152
rosbag::CHUNK_INFO_VERSION
static const uint32_t CHUNK_INFO_VERSION
Definition: constants.h:124
rosbag::Bag::seek
void seek(uint64_t pos, int origin=std::ios_base::beg) const
Definition: bag.cpp:1161
rosbag::Bag::getChunkThreshold
uint32_t getChunkThreshold() const
Get the threshold for creating new chunks.
Definition: bag.cpp:186
rosbag::ConnectionInfo::topic
std::string topic
Definition: structures.h:117
console.h
rosbag::ConnectionInfo::md5sum
std::string md5sum
Definition: structures.h:119
rosbag::Bag::write
void write(std::string const &topic, rs2rosinternal::MessageEvent< T > const &event)
Write a message into the bag file.
Definition: bag.h:319
rosbag::ConnectionInfo
Definition: structures.h:80
rosbag::Bag::header_connection_ids_
std::map< rs2rosinternal::M_string, uint32_t > header_connection_ids_
Definition: bag.h:321
query.h
rosbag::Bag::close
void close()
Close the bag file.
Definition: bag.cpp:159
message_instance.h
rosbag::Bag::writeChunkInfoRecords
void writeChunkInfoRecords()
Definition: bag.cpp:921
rosbag::compression::BZ2
@ BZ2
Definition: third-party/realsense-file/rosbag/rosbag_storage/include/rosbag/stream.h:151
rosbag::Bag::connections_
std::map< uint32_t, ConnectionInfo * > connections_
Definition: bag.h:322
rosbag::Bag::readChunkInfoRecord
void readChunkInfoRecord()
Definition: bag.cpp:955
rosbag::Bag::decompressLz4Chunk
void decompressLz4Chunk(ChunkHeader const &chunk_header) const
Definition: bag.cpp:867
rosbag::Bag::stopWritingChunk
void stopWritingChunk()
Definition: bag.cpp:465
rosbag::Bag::checkField
rs2rosinternal::M_string::const_iterator checkField(rs2rosinternal::M_string const &fields, std::string const &field, unsigned int min_len, unsigned int max_len, bool required) const
Definition: bag.cpp:1111
rosbag::IndexEntry::offset
uint32_t offset
absolute byte offset of the chunk record containing the message
Definition: structures.h:113
rosbag::bagmode::Read
@ Read
Definition: bag.h:165
rosbag::Bag::~Bag
~Bag()
Definition: bag.cpp:93
rs2rosinternal::Header::write
static void write(const M_string &key_vals, std::vector< uint8_t > &buffer, uint32_t &size)
Definition: header.cpp:125
rosbag::bagmode::BagMode
BagMode
The possible modes to open a bag in.
Definition: bag.h:130
rosbag::ChunkedFile::getOffset
uint64_t getOffset() const
return current offset from the beginning of the file
Definition: chunked_file.cpp:190
rosbag::BagException
Base class for rosbag exceptions.
Definition: exceptions.h:75
rosbag::Bag::chunk_buffer_
Buffer chunk_buffer_
reusable buffer to read chunk into
Definition: bag.h:332
rosbag::OP_FILE_HEADER
static const unsigned char OP_FILE_HEADER
Definition: constants.h:108
rosbag::OP_CHUNK
static const unsigned char OP_CHUNK
Definition: constants.h:110
rosbag::Bag::writeConnectionRecords
void writeConnectionRecords()
Definition: bag.cpp:675
rosbag::Bag::startReadingVersion200
void startReadingVersion200()
Definition: bag.cpp:306
rosbag::Bag::index_data_pos_
uint64_t index_data_pos_
Definition: bag.h:311
rosbag::Bag::getMinorVersion
uint32_t getMinorVersion() const
Get the minor-version of the open bag file.
Definition: bag.cpp:282
rosbag::IndexEntry::chunk_pos
uint64_t chunk_pos
timestamp of the message
Definition: structures.h:112
rosbag::bagmode::Append
@ Append
Definition: bag.h:166
rs2rosinternal::M_string
std::map< std::string, std::string > M_string
Definition: datatypes.h:44
rosbag::ConnectionInfo::header
std::shared_ptr< rs2rosinternal::M_string > header
Definition: structures.h:122
rosbag::Bag::connection_indexes_
std::map< uint32_t, std::multiset< IndexEntry > > connection_indexes_
Definition: bag.h:326
rosbag::ChunkedFile::isOpen
bool isOpen() const
return true if file is open for reading or writing
Definition: chunked_file.cpp:130
uint32_t
unsigned int uint32_t
Definition: stdint.h:80
rosbag::Bag::readHeader
bool readHeader(rs2rosinternal::Header &header) const
Definition: bag.cpp:1088
rosbag::ChunkedFile::getline
std::string getline()
Definition: chunked_file.cpp:203
rosbag::Bag::startWritingChunk
void startWritingChunk(rs2rosinternal::Time time)
Definition: bag.cpp:447
rosbag::Bag::appendConnectionRecordToBuffer
void appendConnectionRecordToBuffer(Buffer &buf, ConnectionInfo const *connection_info)
Definition: bag.cpp:695
rosbag::BagIOException
Exception thrown when on IO problems.
Definition: exceptions.h:82
rosbag::ChunkInfo::connection_counts
std::map< uint32_t, uint32_t > connection_counts
absolute byte offset of chunk record in bag file
Definition: structures.h:99
rosbag::Bag::getSize
uint64_t getSize() const
Get the current size of the bag file (a lower bound)
Definition: bag.cpp:184
rosbag::CONNECTION_FIELD_NAME
static const std::string CONNECTION_FIELD_NAME
Definition: constants.h:89
rosbag::ChunkHeader
Definition: structures.h:102
rosbag::Bag::appendDataLengthToBuffer
void appendDataLengthToBuffer(Buffer &buf, uint32_t data_len)
Definition: bag.cpp:1033
rosbag::Bag::stopWriting
void stopWriting()
Definition: bag.cpp:292
buffer
GLenum GLfloat * buffer
Definition: glad/glad/glad.h:2066
rosbag::Bag::getChunkOffset
uint32_t getChunkOffset() const
Definition: bag.cpp:440
rosbag::MD5_FIELD_NAME
static const std::string MD5_FIELD_NAME
Definition: constants.h:98
rosbag::INDEX_POS_FIELD_NAME
static const std::string INDEX_POS_FIELD_NAME
Definition: constants.h:86
rosbag::Bag::setCompression
void setCompression(CompressionType compression)
Set the compression method to use for writing chunks.
Definition: bag.cpp:237
uint64_t
unsigned __int64 uint64_t
Definition: stdint.h:90
rosbag::ChunkInfo::end_time
rs2rosinternal::Time end_time
earliest timestamp of a message in the chunk
Definition: structures.h:96
i
int i
Definition: rs-pcl-color.cpp:54
rosbag::Bag::file_header_pos_
uint64_t file_header_pos_
Definition: bag.h:310
rosbag::Bag::writeConnectionRecord
void writeConnectionRecord(ConnectionInfo const *connection_info)
Definition: bag.cpp:682
rosbag::START_TIME_FIELD_NAME
static const std::string START_TIME_FIELD_NAME
Definition: constants.h:93
rosbag::Bag::getFileName
std::string getFileName() const
Get the filename of the bag.
Definition: bag.cpp:182
rosbag::ChunkedFile::getCompressedBytesIn
uint32_t getCompressedBytesIn() const
return the number of bytes written to current compressed stream
Definition: chunked_file.cpp:191
rs2rosinternal::message_traits::datatype
const char * datatype()
returns DataType<M>::value();
Definition: message_traits.h:214
rosbag::Bag::readMessageDataRecord102
void readMessageDataRecord102(uint64_t offset, rs2rosinternal::Header &header) const
Definition: bag.cpp:816
rosbag::Bag::readField
bool readField(rs2rosinternal::M_string const &fields, std::string const &field_name, bool required, T *data) const
Definition: bag.h:344
rosbag::ConnectionInfo::datatype
std::string datatype
Definition: structures.h:118
rosbag::Bag::writeHeader
void writeHeader(rs2rosinternal::M_string const &fields)
Definition: bag.cpp:1007
rosbag::TOPIC_FIELD_NAME
static const std::string TOPIC_FIELD_NAME
Definition: constants.h:83
rs2rosinternal::TIME_MAX
const ROSTIME_DECL Time TIME_MAX
rosbag::bagmode::Write
@ Write
Definition: bag.h:164
rosbag::ChunkedFile::close
void close()
close the file
Definition: chunked_file.cpp:133
CONSOLE_BRIDGE_logDebug
#define CONSOLE_BRIDGE_logDebug(fmt,...)
Definition: console.h:87
rosbag::Bag::isOp
bool isOp(rs2rosinternal::M_string &fields, uint8_t reqOp) const
Definition: bag.cpp:1001
rosbag::ChunkedFile::openWrite
void openWrite(std::string const &filename)
open file for writing
Definition: chunked_file.cpp:77
rosbag::Buffer::getSize
uint32_t getSize() const
Definition: buffer.cpp:84
stdint.h
rosbag::Bag::closeWrite
void closeWrite()
Definition: bag.cpp:178
rosbag::Bag::file_size_
uint64_t file_size_
Definition: bag.h:309
signal.h
rosbag::BagMode
bagmode::BagMode BagMode
Definition: bag.h:105
std_msgs::Time
::std_msgs::Time_< std::allocator< void > > Time
Definition: Time.h:47
start
GLuint start
Definition: glad/glad/glad.h:2395
rosbag::IndexEntry::time
rs2rosinternal::Time time
Definition: structures.h:111
bag.h
rosbag::Bag::getCompressionInfo
std::tuple< std::string, uint64_t, uint64_t > getCompressionInfo() const
Definition: bag.cpp:197
rosbag::ChunkedFile::decompress
void decompress(CompressionType compression, uint8_t *dest, unsigned int dest_len, uint8_t *source, unsigned int source_len)
Definition: chunked_file.cpp:215
rosbag::DEF_FIELD_NAME
static const std::string DEF_FIELD_NAME
Definition: constants.h:100
rosbag::Bag::readHeaderFromBuffer
void readHeaderFromBuffer(Buffer &buffer, uint32_t offset, rs2rosinternal::Header &header, uint32_t &data_size, uint32_t &bytes_read) const
Definition: bag.cpp:1042
rosbag::IndexEntry
Definition: structures.h:109
rosbag::Bag::current_buffer_
Buffer * current_buffer_
Definition: bag.h:337
rosbag::Bag::writeDataLength
void writeDataLength(uint32_t data_len)
Definition: bag.cpp:1015
rosbag::Bag::startWriting
void startWriting()
Definition: bag.cpp:286
rosbag::Bag::readMessageDefinitionRecord102
void readMessageDefinitionRecord102()
Definition: bag.cpp:741
rosbag::Bag::writeChunkHeader
void writeChunkHeader(CompressionType compression, uint32_t compressed_size, uint32_t uncompressed_size)
Definition: bag.cpp:492
rosbag::CONNECTION_COUNT_FIELD_NAME
static const std::string CONNECTION_COUNT_FIELD_NAME
Definition: constants.h:87
rosbag::ConnectionInfo::msg_def
std::string msg_def
Definition: structures.h:120
rosbag::compression::CompressionType
CompressionType
Definition: third-party/realsense-file/rosbag/rosbag_storage/include/rosbag/stream.h:116
test-projection-from-recording.filename
filename
Definition: test-projection-from-recording.py:15
kvp
Definition: parser.hpp:157
rosbag::Bag::writeIndexRecords
void writeIndexRecords()
Definition: bag.cpp:533
rosbag::Bag::curr_chunk_connection_indexes_
std::map< uint32_t, std::multiset< IndexEntry > > curr_chunk_connection_indexes_
Definition: bag.h:327
assert
#define assert(condition)
Definition: lz4.c:245
rosbag::Bag::readTopicIndexRecord102
void readTopicIndexRecord102()
Definition: bag.cpp:563
rs2rosinternal::TimeBase::sec
uint32_t sec
Definition: time.h:164
key
void key(GLFWwindow *window, int k, int s, int action, int mods)
Definition: gears.c:213
rosbag::ChunkInfo
Definition: structures.h:93
rosbag::Bag::chunk_open_
bool chunk_open_
Definition: bag.h:316
rosbag::VERSION
static const std::string VERSION
Definition: constants.h:76
rosbag::Bag::version_
int version_
Definition: bag.h:304
rosbag::Bag::startReadingVersion102
void startReadingVersion102()
Definition: bag.cpp:342
rosbag::Bag::openAppend
void openAppend(std::string const &filename)
Definition: bag.cpp:136
rosbag::Bag::openRead
void openRead(std::string const &filename)
Definition: bag.cpp:116
rs2rosinternal::TimeBase::nsec
uint32_t nsec
Definition: time.h:164
rosbag::compression::Uncompressed
@ Uncompressed
Definition: third-party/realsense-file/rosbag/rosbag_storage/include/rosbag/stream.h:150
rosbag::CHUNK_POS_FIELD_NAME
static const std::string CHUNK_POS_FIELD_NAME
Definition: constants.h:95
rosbag::CHUNK_COUNT_FIELD_NAME
static const std::string CHUNK_COUNT_FIELD_NAME
Definition: constants.h:88
view.h
rosbag::Bag::readFileHeaderRecord
void readFileHeaderRecord()
Definition: bag.cpp:410
rosbag::Bag::decompressRawChunk
void decompressRawChunk(ChunkHeader const &chunk_header) const
Definition: bag.cpp:839
rmse.e
e
Definition: rmse.py:177
rosbag::ConnectionInfo::id
uint32_t id
Definition: structures.h:116
rosbag::ChunkedFile::openRead
void openRead(std::string const &filename)
open file for reading
Definition: chunked_file.cpp:78
rosbag::TYPE_FIELD_NAME
static const std::string TYPE_FIELD_NAME
Definition: constants.h:99
rosbag
Definition: bag.h:61
rosbag::VER_FIELD_NAME
static const std::string VER_FIELD_NAME
Definition: constants.h:84
rosbag::COMPRESSION_LZ4
static const std::string COMPRESSION_LZ4
Definition: constants.h:129
rs2rosinternal::message_traits::md5sum
const char * md5sum()
returns MD5Sum<M>::value();
Definition: message_traits.h:205
rosbag::Bag::outgoing_chunk_buffer_
Buffer outgoing_chunk_buffer_
reusable buffer to read chunk into
Definition: bag.h:335
rosbag::Bag::readChunkHeader
void readChunkHeader(ChunkHeader &chunk_header) const
Definition: bag.cpp:515
rs2rosinternal::Header::getValues
M_stringPtr getValues()
Returns a shared pointer to the internal map used.
Definition: header.h:66
rosbag::COUNT_FIELD_NAME
static const std::string COUNT_FIELD_NAME
Definition: constants.h:85
rosbag::Bag::open
void open(std::string const &filename, uint32_t mode=bagmode::Read)
Open a bag file.
Definition: bag.cpp:97
end
GLuint GLuint end
Definition: glad/glad/glad.h:2395
rosbag::OP_MSG_DEF
static const unsigned char OP_MSG_DEF
Definition: constants.h:115
test-streaming.topic
topic
Definition: test-streaming.py:45
rosbag::Bag::chunk_count_
uint32_t chunk_count_
Definition: bag.h:313
rosbag::ChunkedFile::read
void read(void *ptr, size_t size)
read size bytes from the file into ptr
Definition: chunked_file.cpp:195
rosbag::Bag::compression_
CompressionType compression_
Definition: bag.h:305
rosbag::Bag::setChunkThreshold
void setChunkThreshold(uint32_t chunk_threshold)
Set the threshold for creating new chunks.
Definition: bag.cpp:188
rosbag::Bag::readVersion
void readVersion()
Definition: bag.cpp:262
rosbag::OP_FIELD_NAME
static const std::string OP_FIELD_NAME
Definition: constants.h:82
rosbag::ChunkedFile::write
void write(std::string const &s)
Definition: chunked_file.cpp:193
s
GLdouble s
Definition: glad/glad/glad.h:2441
rosbag::Bag::curr_chunk_data_pos_
uint64_t curr_chunk_data_pos_
Definition: bag.h:318
rosbag::BagUnindexedException
Exception thrown on problems reading the bag index.
Definition: exceptions.h:96
CONSOLE_BRIDGE_logError
#define CONSOLE_BRIDGE_logError(fmt,...)
Definition: console.h:78
rosbag::COMPRESSION_FIELD_NAME
static const std::string COMPRESSION_FIELD_NAME
Definition: constants.h:90
rosbag::SIZE_FIELD_NAME
static const std::string SIZE_FIELD_NAME
Definition: constants.h:91
rosbag::Bag::readConnectionIndexRecord200
void readConnectionIndexRecord200()
Definition: bag.cpp:625
rosbag::ChunkInfo::pos
uint64_t pos
latest timestamp of a message in the chunk
Definition: structures.h:97
rosbag::ChunkedFile::truncate
bool truncate(uint64_t length)
Definition: chunked_file.cpp:197
rosbag::Bag::decompress_buffer_
Buffer decompress_buffer_
reusable buffer to decompress chunks into
Definition: bag.h:333
rosbag::Bag::record_buffer_
Buffer record_buffer_
reusable buffer in which to assemble the record data before writing to file
Definition: bag.h:330
rosbag::Bag::writeVersion
void writeVersion()
Definition: bag.cpp:252
buf
GLenum GLuint GLenum GLsizei const GLchar * buf
Definition: glad/glad/glad.h:3610
rosbag::OP_CONNECTION
static const unsigned char OP_CONNECTION
Definition: constants.h:112
rosbag::Bag::Bag
Bag()
Definition: bag.cpp:58
rosbag::COMPRESSION_BZ2
static const std::string COMPRESSION_BZ2
Definition: constants.h:128
rosbag::Bag::chunk_threshold_
uint32_t chunk_threshold_
Definition: bag.h:306
rosbag::OP_INDEX_DATA
static const unsigned char OP_INDEX_DATA
Definition: constants.h:109
rosbag::Bag::writeFileHeaderRecord
void writeFileHeaderRecord()
Definition: bag.cpp:378
rosbag::Bag::openWrite
void openWrite(std::string const &filename)
Definition: bag.cpp:130
rosbag::Bag::readConnectionRecord
void readConnectionRecord()
Definition: bag.cpp:705
rosbag::Bag::readMessageDataHeader
rs2rosinternal::Header readMessageDataHeader(IndexEntry const &index_entry)
Definition: bag.cpp:884
rosbag::ChunkHeader::compression
std::string compression
Definition: structures.h:104


librealsense2
Author(s): LibRealSense ROS Team
autogenerated on Fri Aug 2 2024 08:30:01