00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include "rosbag/bag.h"
00029 #include "rosbag/message_instance.h"
00030 #include "rosbag/query.h"
00031 #include "rosbag/view.h"
00032
00033 #if defined(_MSC_VER)
00034 #include <stdint.h>
00035 #else
00036 #include <inttypes.h>
00037 #endif
00038 #include <signal.h>
00039 #include <assert.h>
00040 #include <iomanip>
00041
00042 #include <boost/foreach.hpp>
00043
00044 #include "console_bridge/console.h"
00045
00046 #define foreach BOOST_FOREACH
00047
00048 using std::map;
00049 using std::priority_queue;
00050 using std::string;
00051 using std::vector;
00052 using std::multiset;
00053 using boost::format;
00054 using boost::shared_ptr;
00055 using ros::M_string;
00056 using ros::Time;
00057
00058 namespace rosbag {
00059
00060 Bag::Bag() :
00061 mode_(bagmode::Write),
00062 version_(0),
00063 compression_(compression::Uncompressed),
00064 chunk_threshold_(768 * 1024),
00065 bag_revision_(0),
00066 file_size_(0),
00067 file_header_pos_(0),
00068 index_data_pos_(0),
00069 connection_count_(0),
00070 chunk_count_(0),
00071 chunk_open_(false),
00072 curr_chunk_data_pos_(0),
00073 current_buffer_(0),
00074 decompressed_chunk_(0)
00075 {
00076 }
00077
00078 Bag::Bag(string const& filename, uint32_t mode) :
00079 compression_(compression::Uncompressed),
00080 chunk_threshold_(768 * 1024),
00081 bag_revision_(0),
00082 file_size_(0),
00083 file_header_pos_(0),
00084 index_data_pos_(0),
00085 connection_count_(0),
00086 chunk_count_(0),
00087 chunk_open_(false),
00088 curr_chunk_data_pos_(0),
00089 current_buffer_(0),
00090 decompressed_chunk_(0)
00091 {
00092 open(filename, mode);
00093 }
00094
00095 Bag::~Bag() {
00096 close();
00097 }
00098
00099 void Bag::open(string const& filename, uint32_t mode) {
00100 mode_ = (BagMode) mode;
00101
00102 if (mode_ & bagmode::Append)
00103 openAppend(filename);
00104 else if (mode_ & bagmode::Write)
00105 openWrite(filename);
00106 else if (mode_ & bagmode::Read)
00107 openRead(filename);
00108 else
00109 throw BagException((format("Unknown mode: %1%") % (int) mode).str());
00110
00111
00112 uint64_t offset = file_.getOffset();
00113 seek(0, std::ios::end);
00114 file_size_ = file_.getOffset();
00115 seek(offset);
00116 }
00117
00118 void Bag::openRead(string const& filename) {
00119 file_.openRead(filename);
00120
00121 readVersion();
00122
00123 switch (version_) {
00124 case 102: startReadingVersion102(); break;
00125 case 200: startReadingVersion200(); break;
00126 default:
00127 throw BagException((format("Unsupported bag file version: %1%.%2%") % getMajorVersion() % getMinorVersion()).str());
00128 }
00129 }
00130
00131 void Bag::openWrite(string const& filename) {
00132 file_.openWrite(filename);
00133
00134 startWriting();
00135 }
00136
00137 void Bag::openAppend(string const& filename) {
00138 file_.openReadWrite(filename);
00139
00140 readVersion();
00141
00142 if (version_ != 200)
00143 throw BagException((format("Bag file version %1%.%2% is unsupported for appending") % getMajorVersion() % getMinorVersion()).str());
00144
00145 startReadingVersion200();
00146
00147
00148 file_.truncate(index_data_pos_);
00149 index_data_pos_ = 0;
00150
00151
00152 seek(file_header_pos_);
00153 writeFileHeaderRecord();
00154
00155
00156 seek(0, std::ios::end);
00157 }
00158
00159 void Bag::close() {
00160 if (!file_.isOpen())
00161 return;
00162
00163 if (mode_ & bagmode::Write || mode_ & bagmode::Append)
00164 closeWrite();
00165
00166 file_.close();
00167
00168 topic_connection_ids_.clear();
00169 header_connection_ids_.clear();
00170 for (map<uint32_t, ConnectionInfo*>::iterator i = connections_.begin(); i != connections_.end(); i++)
00171 delete i->second;
00172 connections_.clear();
00173 chunks_.clear();
00174 connection_indexes_.clear();
00175 curr_chunk_connection_indexes_.clear();
00176 }
00177
00178 void Bag::closeWrite() {
00179 stopWriting();
00180 }
00181
00182 string Bag::getFileName() const { return file_.getFileName(); }
00183 BagMode Bag::getMode() const { return mode_; }
00184 uint64_t Bag::getSize() const { return file_size_; }
00185
00186 uint32_t Bag::getChunkThreshold() const { return chunk_threshold_; }
00187
00188 void Bag::setChunkThreshold(uint32_t chunk_threshold) {
00189 if (file_.isOpen() && chunk_open_)
00190 stopWritingChunk();
00191
00192 chunk_threshold_ = chunk_threshold;
00193 }
00194
00195 CompressionType Bag::getCompression() const { return compression_; }
00196
00197 void Bag::setCompression(CompressionType compression) {
00198 if (file_.isOpen() && chunk_open_)
00199 stopWritingChunk();
00200
00201 if (!(compression == compression::Uncompressed ||
00202 compression == compression::BZ2 ||
00203 compression == compression::LZ4)) {
00204 throw BagException(
00205 (format("Unknown compression type: %i") % compression).str());
00206 }
00207
00208 compression_ = compression;
00209 }
00210
00211
00212
00213 void Bag::writeVersion() {
00214 string version = string("#ROSBAG V") + VERSION + string("\n");
00215
00216 logDebug("Writing VERSION [%llu]: %s", (unsigned long long) file_.getOffset(), version.c_str());
00217
00218 version_ = 200;
00219
00220 write(version);
00221 }
00222
00223 void Bag::readVersion() {
00224 string version_line = file_.getline();
00225
00226 file_header_pos_ = file_.getOffset();
00227
00228 char logtypename[100];
00229 int version_major, version_minor;
00230 #if defined(_MSC_VER)
00231 if (sscanf_s(version_line.c_str(), "#ROS%s V%d.%d", logtypename, sizeof(logtypename), &version_major, &version_minor) != 3)
00232 #else
00233 if (sscanf(version_line.c_str(), "#ROS%s V%d.%d", logtypename, &version_major, &version_minor) != 3)
00234 #endif
00235 throw BagIOException("Error reading version line");
00236
00237 version_ = version_major * 100 + version_minor;
00238
00239 logDebug("Read VERSION: version=%d", version_);
00240 }
00241
00242 uint32_t Bag::getMajorVersion() const { return version_ / 100; }
00243 uint32_t Bag::getMinorVersion() const { return version_ % 100; }
00244
00245
00246
00247 void Bag::startWriting() {
00248 writeVersion();
00249 file_header_pos_ = file_.getOffset();
00250 writeFileHeaderRecord();
00251 }
00252
00253 void Bag::stopWriting() {
00254 if (chunk_open_)
00255 stopWritingChunk();
00256
00257 seek(0, std::ios::end);
00258
00259 index_data_pos_ = file_.getOffset();
00260 writeConnectionRecords();
00261 writeChunkInfoRecords();
00262
00263 seek(file_header_pos_);
00264 writeFileHeaderRecord();
00265 }
00266
00267 void Bag::startReadingVersion200() {
00268
00269 readFileHeaderRecord();
00270
00271
00272 seek(index_data_pos_);
00273
00274
00275 for (uint32_t i = 0; i < connection_count_; i++)
00276 readConnectionRecord();
00277
00278
00279 for (uint32_t i = 0; i < chunk_count_; i++)
00280 readChunkInfoRecord();
00281
00282
00283 foreach(ChunkInfo const& chunk_info, chunks_) {
00284 curr_chunk_info_ = chunk_info;
00285
00286 seek(curr_chunk_info_.pos);
00287
00288
00289 ChunkHeader chunk_header;
00290 readChunkHeader(chunk_header);
00291 seek(chunk_header.compressed_size, std::ios::cur);
00292
00293
00294 for (unsigned int i = 0; i < chunk_info.connection_counts.size(); i++)
00295 readConnectionIndexRecord200();
00296 }
00297
00298
00299 curr_chunk_info_ = ChunkInfo();
00300 }
00301
00302 void Bag::startReadingVersion102() {
00303 try
00304 {
00305
00306 readFileHeaderRecord();
00307 }
00308 catch (BagFormatException ex) {
00309 throw BagUnindexedException();
00310 }
00311
00312
00313 seek(0, std::ios::end);
00314 uint64_t filelength = file_.getOffset();
00315
00316
00317 seek(index_data_pos_);
00318
00319
00320 while (file_.getOffset() < filelength)
00321 readTopicIndexRecord102();
00322
00323
00324 for (map<uint32_t, multiset<IndexEntry> >::const_iterator i = connection_indexes_.begin(); i != connection_indexes_.end(); i++) {
00325 multiset<IndexEntry> const& index = i->second;
00326 IndexEntry const& first_entry = *index.begin();
00327
00328 logDebug("Reading message definition for connection %d at %llu", i->first, (unsigned long long) first_entry.chunk_pos);
00329
00330 seek(first_entry.chunk_pos);
00331
00332 readMessageDefinitionRecord102();
00333 }
00334 }
00335
00336
00337
00338 void Bag::writeFileHeaderRecord() {
00339 connection_count_ = connections_.size();
00340 chunk_count_ = chunks_.size();
00341
00342 logDebug("Writing FILE_HEADER [%llu]: index_pos=%llu connection_count=%d chunk_count=%d",
00343 (unsigned long long) file_.getOffset(), (unsigned long long) index_data_pos_, connection_count_, chunk_count_);
00344
00345
00346 M_string header;
00347 header[OP_FIELD_NAME] = toHeaderString(&OP_FILE_HEADER);
00348 header[INDEX_POS_FIELD_NAME] = toHeaderString(&index_data_pos_);
00349 header[CONNECTION_COUNT_FIELD_NAME] = toHeaderString(&connection_count_);
00350 header[CHUNK_COUNT_FIELD_NAME] = toHeaderString(&chunk_count_);
00351
00352 boost::shared_array<uint8_t> header_buffer;
00353 uint32_t header_len;
00354 ros::Header::write(header, header_buffer, header_len);
00355 uint32_t data_len = 0;
00356 if (header_len < FILE_HEADER_LENGTH)
00357 data_len = FILE_HEADER_LENGTH - header_len;
00358 write((char*) &header_len, 4);
00359 write((char*) header_buffer.get(), header_len);
00360 write((char*) &data_len, 4);
00361
00362
00363 if (data_len > 0) {
00364 string padding;
00365 padding.resize(data_len, ' ');
00366 write(padding);
00367 }
00368 }
00369
00370 void Bag::readFileHeaderRecord() {
00371 ros::Header header;
00372 uint32_t data_size;
00373 if (!readHeader(header) || !readDataLength(data_size))
00374 throw BagFormatException("Error reading FILE_HEADER record");
00375
00376 M_string& fields = *header.getValues();
00377
00378 if (!isOp(fields, OP_FILE_HEADER))
00379 throw BagFormatException("Expected FILE_HEADER op not found");
00380
00381
00382 readField(fields, INDEX_POS_FIELD_NAME, true, (uint64_t*) &index_data_pos_);
00383
00384 if (index_data_pos_ == 0)
00385 throw BagUnindexedException();
00386
00387
00388 if (version_ >= 200) {
00389 readField(fields, CONNECTION_COUNT_FIELD_NAME, true, &connection_count_);
00390 readField(fields, CHUNK_COUNT_FIELD_NAME, true, &chunk_count_);
00391 }
00392
00393 logDebug("Read FILE_HEADER: index_pos=%llu connection_count=%d chunk_count=%d",
00394 (unsigned long long) index_data_pos_, connection_count_, chunk_count_);
00395
00396
00397 seek(data_size, std::ios::cur);
00398 }
00399
00400 uint32_t Bag::getChunkOffset() const {
00401 if (compression_ == compression::Uncompressed)
00402 return file_.getOffset() - curr_chunk_data_pos_;
00403 else
00404 return file_.getCompressedBytesIn();
00405 }
00406
00407 void Bag::startWritingChunk(Time time) {
00408
00409 curr_chunk_info_.pos = file_.getOffset();
00410 curr_chunk_info_.start_time = time;
00411 curr_chunk_info_.end_time = time;
00412
00413
00414 writeChunkHeader(compression_, 0, 0);
00415
00416
00417 file_.setWriteMode(compression_);
00418
00419
00420 curr_chunk_data_pos_ = file_.getOffset();
00421
00422 chunk_open_ = true;
00423 }
00424
00425 void Bag::stopWritingChunk() {
00426
00427 chunks_.push_back(curr_chunk_info_);
00428
00429
00430 uint32_t uncompressed_size = getChunkOffset();
00431 file_.setWriteMode(compression::Uncompressed);
00432 uint32_t compressed_size = file_.getOffset() - curr_chunk_data_pos_;
00433
00434
00435 uint64_t end_of_chunk_pos = file_.getOffset();
00436
00437 seek(curr_chunk_info_.pos);
00438 writeChunkHeader(compression_, compressed_size, uncompressed_size);
00439
00440
00441 seek(end_of_chunk_pos);
00442 writeIndexRecords();
00443 curr_chunk_connection_indexes_.clear();
00444
00445
00446 curr_chunk_info_.connection_counts.clear();
00447
00448
00449 chunk_open_ = false;
00450 }
00451
00452 void Bag::writeChunkHeader(CompressionType compression, uint32_t compressed_size, uint32_t uncompressed_size) {
00453 ChunkHeader chunk_header;
00454 switch (compression) {
00455 case compression::Uncompressed: chunk_header.compression = COMPRESSION_NONE; break;
00456 case compression::BZ2: chunk_header.compression = COMPRESSION_BZ2; break;
00457 case compression::LZ4: chunk_header.compression = COMPRESSION_LZ4;
00458
00459 }
00460 chunk_header.compressed_size = compressed_size;
00461 chunk_header.uncompressed_size = uncompressed_size;
00462
00463 logDebug("Writing CHUNK [%llu]: compression=%s compressed=%d uncompressed=%d",
00464 (unsigned long long) file_.getOffset(), chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size);
00465
00466 M_string header;
00467 header[OP_FIELD_NAME] = toHeaderString(&OP_CHUNK);
00468 header[COMPRESSION_FIELD_NAME] = chunk_header.compression;
00469 header[SIZE_FIELD_NAME] = toHeaderString(&chunk_header.uncompressed_size);
00470 writeHeader(header);
00471
00472 writeDataLength(chunk_header.compressed_size);
00473 }
00474
00475 void Bag::readChunkHeader(ChunkHeader& chunk_header) const {
00476 ros::Header header;
00477 if (!readHeader(header) || !readDataLength(chunk_header.compressed_size))
00478 throw BagFormatException("Error reading CHUNK record");
00479
00480 M_string& fields = *header.getValues();
00481
00482 if (!isOp(fields, OP_CHUNK))
00483 throw BagFormatException("Expected CHUNK op not found");
00484
00485 readField(fields, COMPRESSION_FIELD_NAME, true, chunk_header.compression);
00486 readField(fields, SIZE_FIELD_NAME, true, &chunk_header.uncompressed_size);
00487
00488 logDebug("Read CHUNK: compression=%s size=%d uncompressed=%d (%f)", chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size, 100 * ((double) chunk_header.compressed_size) / chunk_header.uncompressed_size);
00489 }
00490
00491
00492
00493 void Bag::writeIndexRecords() {
00494 for (map<uint32_t, multiset<IndexEntry> >::const_iterator i = curr_chunk_connection_indexes_.begin(); i != curr_chunk_connection_indexes_.end(); i++) {
00495 uint32_t connection_id = i->first;
00496 multiset<IndexEntry> const& index = i->second;
00497
00498
00499 uint32_t index_size = index.size();
00500 M_string header;
00501 header[OP_FIELD_NAME] = toHeaderString(&OP_INDEX_DATA);
00502 header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_id);
00503 header[VER_FIELD_NAME] = toHeaderString(&INDEX_VERSION);
00504 header[COUNT_FIELD_NAME] = toHeaderString(&index_size);
00505 writeHeader(header);
00506
00507 writeDataLength(index_size * 12);
00508
00509 logDebug("Writing INDEX_DATA: connection=%d ver=%d count=%d", connection_id, INDEX_VERSION, index_size);
00510
00511
00512 foreach(IndexEntry const& e, index) {
00513 write((char*) &e.time.sec, 4);
00514 write((char*) &e.time.nsec, 4);
00515 write((char*) &e.offset, 4);
00516
00517 logDebug(" - %d.%d: %d", e.time.sec, e.time.nsec, e.offset);
00518 }
00519 }
00520 }
00521
00522 void Bag::readTopicIndexRecord102() {
00523 ros::Header header;
00524 uint32_t data_size;
00525 if (!readHeader(header) || !readDataLength(data_size))
00526 throw BagFormatException("Error reading INDEX_DATA header");
00527 M_string& fields = *header.getValues();
00528
00529 if (!isOp(fields, OP_INDEX_DATA))
00530 throw BagFormatException("Expected INDEX_DATA record");
00531
00532 uint32_t index_version;
00533 string topic;
00534 uint32_t count = 0;
00535 readField(fields, VER_FIELD_NAME, true, &index_version);
00536 readField(fields, TOPIC_FIELD_NAME, true, topic);
00537 readField(fields, COUNT_FIELD_NAME, true, &count);
00538
00539 logDebug("Read INDEX_DATA: ver=%d topic=%s count=%d", index_version, topic.c_str(), count);
00540
00541 if (index_version != 0)
00542 throw BagFormatException((format("Unsupported INDEX_DATA version: %1%") % index_version).str());
00543
00544 uint32_t connection_id;
00545 map<string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
00546 if (topic_conn_id_iter == topic_connection_ids_.end()) {
00547 connection_id = connections_.size();
00548
00549 logDebug("Creating connection: id=%d topic=%s", connection_id, topic.c_str());
00550 ConnectionInfo* connection_info = new ConnectionInfo();
00551 connection_info->id = connection_id;
00552 connection_info->topic = topic;
00553 connections_[connection_id] = connection_info;
00554
00555 topic_connection_ids_[topic] = connection_id;
00556 }
00557 else
00558 connection_id = topic_conn_id_iter->second;
00559
00560 multiset<IndexEntry>& connection_index = connection_indexes_[connection_id];
00561
00562 for (uint32_t i = 0; i < count; i++) {
00563 IndexEntry index_entry;
00564 uint32_t sec;
00565 uint32_t nsec;
00566 read((char*) &sec, 4);
00567 read((char*) &nsec, 4);
00568 read((char*) &index_entry.chunk_pos, 8);
00569 index_entry.time = Time(sec, nsec);
00570 index_entry.offset = 0;
00571
00572 logDebug(" - %d.%d: %llu", sec, nsec, (unsigned long long) index_entry.chunk_pos);
00573
00574 if (index_entry.time < ros::TIME_MIN || index_entry.time > ros::TIME_MAX)
00575 {
00576 logError("Index entry for topic %s contains invalid time.", topic.c_str());
00577 } else
00578 {
00579 connection_index.insert(connection_index.end(), index_entry);
00580 }
00581 }
00582 }
00583
00584 void Bag::readConnectionIndexRecord200() {
00585 ros::Header header;
00586 uint32_t data_size;
00587 if (!readHeader(header) || !readDataLength(data_size))
00588 throw BagFormatException("Error reading INDEX_DATA header");
00589 M_string& fields = *header.getValues();
00590
00591 if (!isOp(fields, OP_INDEX_DATA))
00592 throw BagFormatException("Expected INDEX_DATA record");
00593
00594 uint32_t index_version;
00595 uint32_t connection_id;
00596 uint32_t count = 0;
00597 readField(fields, VER_FIELD_NAME, true, &index_version);
00598 readField(fields, CONNECTION_FIELD_NAME, true, &connection_id);
00599 readField(fields, COUNT_FIELD_NAME, true, &count);
00600
00601 logDebug("Read INDEX_DATA: ver=%d connection=%d count=%d", index_version, connection_id, count);
00602
00603 if (index_version != 1)
00604 throw BagFormatException((format("Unsupported INDEX_DATA version: %1%") % index_version).str());
00605
00606 uint64_t chunk_pos = curr_chunk_info_.pos;
00607
00608 multiset<IndexEntry>& connection_index = connection_indexes_[connection_id];
00609
00610 for (uint32_t i = 0; i < count; i++) {
00611 IndexEntry index_entry;
00612 index_entry.chunk_pos = chunk_pos;
00613 uint32_t sec;
00614 uint32_t nsec;
00615 read((char*) &sec, 4);
00616 read((char*) &nsec, 4);
00617 read((char*) &index_entry.offset, 4);
00618 index_entry.time = Time(sec, nsec);
00619
00620 logDebug(" - %d.%d: %llu+%d", sec, nsec, (unsigned long long) index_entry.chunk_pos, index_entry.offset);
00621
00622 if (index_entry.time < ros::TIME_MIN || index_entry.time > ros::TIME_MAX)
00623 {
00624 logError("Index entry for topic %s contains invalid time. This message will not be loaded.", connections_[connection_id]->topic.c_str());
00625 } else
00626 {
00627 connection_index.insert(connection_index.end(), index_entry);
00628 }
00629 }
00630 }
00631
00632
00633
00634 void Bag::writeConnectionRecords() {
00635 for (map<uint32_t, ConnectionInfo*>::const_iterator i = connections_.begin(); i != connections_.end(); i++) {
00636 ConnectionInfo const* connection_info = i->second;
00637 writeConnectionRecord(connection_info);
00638 }
00639 }
00640
00641 void Bag::writeConnectionRecord(ConnectionInfo const* connection_info) {
00642 logDebug("Writing CONNECTION [%llu:%d]: topic=%s id=%d",
00643 (unsigned long long) file_.getOffset(), getChunkOffset(), connection_info->topic.c_str(), connection_info->id);
00644
00645 M_string header;
00646 header[OP_FIELD_NAME] = toHeaderString(&OP_CONNECTION);
00647 header[TOPIC_FIELD_NAME] = connection_info->topic;
00648 header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_info->id);
00649 writeHeader(header);
00650
00651 writeHeader(*connection_info->header);
00652 }
00653
00654 void Bag::appendConnectionRecordToBuffer(Buffer& buf, ConnectionInfo const* connection_info) {
00655 M_string header;
00656 header[OP_FIELD_NAME] = toHeaderString(&OP_CONNECTION);
00657 header[TOPIC_FIELD_NAME] = connection_info->topic;
00658 header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_info->id);
00659 appendHeaderToBuffer(buf, header);
00660
00661 appendHeaderToBuffer(buf, *connection_info->header);
00662 }
00663
00664 void Bag::readConnectionRecord() {
00665 ros::Header header;
00666 if (!readHeader(header))
00667 throw BagFormatException("Error reading CONNECTION header");
00668 M_string& fields = *header.getValues();
00669
00670 if (!isOp(fields, OP_CONNECTION))
00671 throw BagFormatException("Expected CONNECTION op not found");
00672
00673 uint32_t id;
00674 readField(fields, CONNECTION_FIELD_NAME, true, &id);
00675 string topic;
00676 readField(fields, TOPIC_FIELD_NAME, true, topic);
00677
00678 ros::Header connection_header;
00679 if (!readHeader(connection_header))
00680 throw BagFormatException("Error reading connection header");
00681
00682
00683 map<uint32_t, ConnectionInfo*>::iterator key = connections_.find(id);
00684 if (key == connections_.end()) {
00685 ConnectionInfo* connection_info = new ConnectionInfo();
00686 connection_info->id = id;
00687 connection_info->topic = topic;
00688 connection_info->header = boost::make_shared<M_string>();
00689 for (M_string::const_iterator i = connection_header.getValues()->begin(); i != connection_header.getValues()->end(); i++)
00690 (*connection_info->header)[i->first] = i->second;
00691 connection_info->msg_def = (*connection_info->header)["message_definition"];
00692 connection_info->datatype = (*connection_info->header)["type"];
00693 connection_info->md5sum = (*connection_info->header)["md5sum"];
00694 connections_[id] = connection_info;
00695
00696 logDebug("Read CONNECTION: topic=%s id=%d", topic.c_str(), id);
00697 }
00698 }
00699
00700 void Bag::readMessageDefinitionRecord102() {
00701 ros::Header header;
00702 uint32_t data_size;
00703 if (!readHeader(header) || !readDataLength(data_size))
00704 throw BagFormatException("Error reading message definition header");
00705 M_string& fields = *header.getValues();
00706
00707 if (!isOp(fields, OP_MSG_DEF))
00708 throw BagFormatException("Expected MSG_DEF op not found");
00709
00710 string topic, md5sum, datatype, message_definition;
00711 readField(fields, TOPIC_FIELD_NAME, true, topic);
00712 readField(fields, MD5_FIELD_NAME, 32, 32, true, md5sum);
00713 readField(fields, TYPE_FIELD_NAME, true, datatype);
00714 readField(fields, DEF_FIELD_NAME, 0, UINT_MAX, true, message_definition);
00715
00716 ConnectionInfo* connection_info;
00717
00718 map<string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
00719 if (topic_conn_id_iter == topic_connection_ids_.end()) {
00720 uint32_t id = connections_.size();
00721
00722 logDebug("Creating connection: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str());
00723 connection_info = new ConnectionInfo();
00724 connection_info->id = id;
00725 connection_info->topic = topic;
00726
00727 connections_[id] = connection_info;
00728 topic_connection_ids_[topic] = id;
00729 }
00730 else
00731 connection_info = connections_[topic_conn_id_iter->second];
00732
00733 connection_info->msg_def = message_definition;
00734 connection_info->datatype = datatype;
00735 connection_info->md5sum = md5sum;
00736 connection_info->header = boost::make_shared<ros::M_string>();
00737 (*connection_info->header)["type"] = connection_info->datatype;
00738 (*connection_info->header)["md5sum"] = connection_info->md5sum;
00739 (*connection_info->header)["message_definition"] = connection_info->msg_def;
00740
00741 logDebug("Read MSG_DEF: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str());
00742 }
00743
00744 void Bag::decompressChunk(uint64_t chunk_pos) const {
00745 if (curr_chunk_info_.pos == chunk_pos) {
00746 current_buffer_ = &outgoing_chunk_buffer_;
00747 return;
00748 }
00749
00750 current_buffer_ = &decompress_buffer_;
00751
00752 if (decompressed_chunk_ == chunk_pos)
00753 return;
00754
00755
00756 seek(chunk_pos);
00757
00758
00759 ChunkHeader chunk_header;
00760 readChunkHeader(chunk_header);
00761
00762
00763 if (chunk_header.compression == COMPRESSION_NONE)
00764 decompressRawChunk(chunk_header);
00765 else if (chunk_header.compression == COMPRESSION_BZ2)
00766 decompressBz2Chunk(chunk_header);
00767 else if (chunk_header.compression == COMPRESSION_LZ4)
00768 decompressLz4Chunk(chunk_header);
00769 else
00770 throw BagFormatException("Unknown compression: " + chunk_header.compression);
00771
00772 decompressed_chunk_ = chunk_pos;
00773 }
00774
00775 void Bag::readMessageDataRecord102(uint64_t offset, ros::Header& header) const {
00776 logDebug("readMessageDataRecord: offset=%llu", (unsigned long long) offset);
00777
00778 seek(offset);
00779
00780 uint32_t data_size;
00781 uint8_t op;
00782 do {
00783 if (!readHeader(header) || !readDataLength(data_size))
00784 throw BagFormatException("Error reading header");
00785
00786 readField(*header.getValues(), OP_FIELD_NAME, true, &op);
00787 }
00788 while (op == OP_MSG_DEF);
00789
00790 if (op != OP_MSG_DATA)
00791 throw BagFormatException((format("Expected MSG_DATA op, got %d") % op).str());
00792
00793 record_buffer_.setSize(data_size);
00794 file_.read((char*) record_buffer_.getData(), data_size);
00795 }
00796
00797
00798 void Bag::decompressRawChunk(ChunkHeader const& chunk_header) const {
00799 assert(chunk_header.compression == COMPRESSION_NONE);
00800 assert(chunk_header.compressed_size == chunk_header.uncompressed_size);
00801
00802 logDebug("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size);
00803
00804 decompress_buffer_.setSize(chunk_header.compressed_size);
00805 file_.read((char*) decompress_buffer_.getData(), chunk_header.compressed_size);
00806
00807
00808 }
00809
00810 void Bag::decompressBz2Chunk(ChunkHeader const& chunk_header) const {
00811 assert(chunk_header.compression == COMPRESSION_BZ2);
00812
00813 CompressionType compression = compression::BZ2;
00814
00815 logDebug("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size);
00816
00817 chunk_buffer_.setSize(chunk_header.compressed_size);
00818 file_.read((char*) chunk_buffer_.getData(), chunk_header.compressed_size);
00819
00820 decompress_buffer_.setSize(chunk_header.uncompressed_size);
00821 file_.decompress(compression, decompress_buffer_.getData(), decompress_buffer_.getSize(), chunk_buffer_.getData(), chunk_buffer_.getSize());
00822
00823
00824 }
00825
00826 void Bag::decompressLz4Chunk(ChunkHeader const& chunk_header) const {
00827 assert(chunk_header.compression == COMPRESSION_LZ4);
00828
00829 CompressionType compression = compression::LZ4;
00830
00831 logDebug("lz4 compressed_size: %d uncompressed_size: %d",
00832 chunk_header.compressed_size, chunk_header.uncompressed_size);
00833
00834 chunk_buffer_.setSize(chunk_header.compressed_size);
00835 file_.read((char*) chunk_buffer_.getData(), chunk_header.compressed_size);
00836
00837 decompress_buffer_.setSize(chunk_header.uncompressed_size);
00838 file_.decompress(compression, decompress_buffer_.getData(), decompress_buffer_.getSize(), chunk_buffer_.getData(), chunk_buffer_.getSize());
00839
00840
00841 }
00842
00843 ros::Header Bag::readMessageDataHeader(IndexEntry const& index_entry) {
00844 ros::Header header;
00845 uint32_t data_size;
00846 uint32_t bytes_read;
00847 switch (version_)
00848 {
00849 case 200:
00850 decompressChunk(index_entry.chunk_pos);
00851 readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
00852 return header;
00853 case 102:
00854 readMessageDataRecord102(index_entry.chunk_pos, header);
00855 return header;
00856 default:
00857 throw BagFormatException((format("Unhandled version: %1%") % version_).str());
00858 }
00859 }
00860
00861
00862 uint32_t Bag::readMessageDataSize(IndexEntry const& index_entry) const {
00863 ros::Header header;
00864 uint32_t data_size;
00865 uint32_t bytes_read;
00866 switch (version_)
00867 {
00868 case 200:
00869 decompressChunk(index_entry.chunk_pos);
00870 readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
00871 return data_size;
00872 case 102:
00873 readMessageDataRecord102(index_entry.chunk_pos, header);
00874 return record_buffer_.getSize();
00875 default:
00876 throw BagFormatException((format("Unhandled version: %1%") % version_).str());
00877 }
00878 }
00879
00880 void Bag::writeChunkInfoRecords() {
00881 foreach(ChunkInfo const& chunk_info, chunks_) {
00882
00883 M_string header;
00884 uint32_t chunk_connection_count = chunk_info.connection_counts.size();
00885 header[OP_FIELD_NAME] = toHeaderString(&OP_CHUNK_INFO);
00886 header[VER_FIELD_NAME] = toHeaderString(&CHUNK_INFO_VERSION);
00887 header[CHUNK_POS_FIELD_NAME] = toHeaderString(&chunk_info.pos);
00888 header[START_TIME_FIELD_NAME] = toHeaderString(&chunk_info.start_time);
00889 header[END_TIME_FIELD_NAME] = toHeaderString(&chunk_info.end_time);
00890 header[COUNT_FIELD_NAME] = toHeaderString(&chunk_connection_count);
00891
00892 logDebug("Writing CHUNK_INFO [%llu]: ver=%d pos=%llu start=%d.%d end=%d.%d",
00893 (unsigned long long) file_.getOffset(), CHUNK_INFO_VERSION, (unsigned long long) chunk_info.pos,
00894 chunk_info.start_time.sec, chunk_info.start_time.nsec,
00895 chunk_info.end_time.sec, chunk_info.end_time.nsec);
00896
00897 writeHeader(header);
00898
00899 writeDataLength(8 * chunk_connection_count);
00900
00901
00902 for (map<uint32_t, uint32_t>::const_iterator i = chunk_info.connection_counts.begin(); i != chunk_info.connection_counts.end(); i++) {
00903 uint32_t connection_id = i->first;
00904 uint32_t count = i->second;
00905
00906 write((char*) &connection_id, 4);
00907 write((char*) &count, 4);
00908
00909 logDebug(" - %d: %d", connection_id, count);
00910 }
00911 }
00912 }
00913
00914 void Bag::readChunkInfoRecord() {
00915
00916 ros::Header header;
00917 uint32_t data_size;
00918 if (!readHeader(header) || !readDataLength(data_size))
00919 throw BagFormatException("Error reading CHUNK_INFO record header");
00920 M_string& fields = *header.getValues();
00921 if (!isOp(fields, OP_CHUNK_INFO))
00922 throw BagFormatException("Expected CHUNK_INFO op not found");
00923
00924
00925 uint32_t chunk_info_version;
00926 readField(fields, VER_FIELD_NAME, true, &chunk_info_version);
00927 if (chunk_info_version != CHUNK_INFO_VERSION)
00928 throw BagFormatException((format("Expected CHUNK_INFO version %1%, read %2%") % CHUNK_INFO_VERSION % chunk_info_version).str());
00929
00930
00931 ChunkInfo chunk_info;
00932 readField(fields, CHUNK_POS_FIELD_NAME, true, &chunk_info.pos);
00933 readField(fields, START_TIME_FIELD_NAME, true, chunk_info.start_time);
00934 readField(fields, END_TIME_FIELD_NAME, true, chunk_info.end_time);
00935 uint32_t chunk_connection_count = 0;
00936 readField(fields, COUNT_FIELD_NAME, true, &chunk_connection_count);
00937
00938 logDebug("Read CHUNK_INFO: chunk_pos=%llu connection_count=%d start=%d.%d end=%d.%d",
00939 (unsigned long long) chunk_info.pos, chunk_connection_count,
00940 chunk_info.start_time.sec, chunk_info.start_time.nsec,
00941 chunk_info.end_time.sec, chunk_info.end_time.nsec);
00942
00943
00944 for (uint32_t i = 0; i < chunk_connection_count; i ++) {
00945 uint32_t connection_id, connection_count;
00946 read((char*) &connection_id, 4);
00947 read((char*) &connection_count, 4);
00948
00949 logDebug(" %d: %d messages", connection_id, connection_count);
00950
00951 chunk_info.connection_counts[connection_id] = connection_count;
00952 }
00953
00954 chunks_.push_back(chunk_info);
00955 }
00956
00957
00958
00959 bool Bag::isOp(M_string& fields, uint8_t reqOp) const {
00960 uint8_t op = 0xFF;
00961 readField(fields, OP_FIELD_NAME, true, &op);
00962 return op == reqOp;
00963 }
00964
00965 void Bag::writeHeader(M_string const& fields) {
00966 boost::shared_array<uint8_t> header_buffer;
00967 uint32_t header_len;
00968 ros::Header::write(fields, header_buffer, header_len);
00969 write((char*) &header_len, 4);
00970 write((char*) header_buffer.get(), header_len);
00971 }
00972
00973 void Bag::writeDataLength(uint32_t data_len) {
00974 write((char*) &data_len, 4);
00975 }
00976
00977 void Bag::appendHeaderToBuffer(Buffer& buf, M_string const& fields) {
00978 boost::shared_array<uint8_t> header_buffer;
00979 uint32_t header_len;
00980 ros::Header::write(fields, header_buffer, header_len);
00981
00982 uint32_t offset = buf.getSize();
00983
00984 buf.setSize(buf.getSize() + 4 + header_len);
00985
00986 memcpy(buf.getData() + offset, &header_len, 4);
00987 offset += 4;
00988 memcpy(buf.getData() + offset, header_buffer.get(), header_len);
00989 }
00990
00991 void Bag::appendDataLengthToBuffer(Buffer& buf, uint32_t data_len) {
00992 uint32_t offset = buf.getSize();
00993
00994 buf.setSize(buf.getSize() + 4);
00995
00996 memcpy(buf.getData() + offset, &data_len, 4);
00997 }
00998
01000 void Bag::readHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read) const {
01001 assert(buffer.getSize() > 8);
01002
01003 uint8_t* start = (uint8_t*) buffer.getData() + offset;
01004
01005 uint8_t* ptr = start;
01006
01007
01008 uint32_t header_len;
01009 memcpy(&header_len, ptr, 4);
01010 ptr += 4;
01011
01012
01013 string error_msg;
01014 bool parsed = header.parse(ptr, header_len, error_msg);
01015 if (!parsed)
01016 throw BagFormatException("Error parsing header");
01017 ptr += header_len;
01018
01019
01020 memcpy(&data_size, ptr, 4);
01021 ptr += 4;
01022
01023 bytes_read = ptr - start;
01024 }
01025
01026 void Bag::readMessageDataHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& total_bytes_read) const {
01027 (void)buffer;
01028 total_bytes_read = 0;
01029 uint8_t op = 0xFF;
01030 do {
01031 logDebug("reading header from buffer: offset=%d", offset);
01032 uint32_t bytes_read;
01033 readHeaderFromBuffer(*current_buffer_, offset, header, data_size, bytes_read);
01034
01035 offset += bytes_read;
01036 total_bytes_read += bytes_read;
01037
01038 readField(*header.getValues(), OP_FIELD_NAME, true, &op);
01039 }
01040 while (op == OP_MSG_DEF || op == OP_CONNECTION);
01041
01042 if (op != OP_MSG_DATA)
01043 throw BagFormatException("Expected MSG_DATA op not found");
01044 }
01045
01046 bool Bag::readHeader(ros::Header& header) const {
01047
01048 uint32_t header_len;
01049 read((char*) &header_len, 4);
01050
01051
01052 header_buffer_.setSize(header_len);
01053 read((char*) header_buffer_.getData(), header_len);
01054
01055
01056 string error_msg;
01057 bool parsed = header.parse(header_buffer_.getData(), header_len, error_msg);
01058 if (!parsed)
01059 return false;
01060
01061 return true;
01062 }
01063
01064 bool Bag::readDataLength(uint32_t& data_size) const {
01065 read((char*) &data_size, 4);
01066 return true;
01067 }
01068
01069 M_string::const_iterator Bag::checkField(M_string const& fields, string const& field, unsigned int min_len, unsigned int max_len, bool required) const {
01070 M_string::const_iterator fitr = fields.find(field);
01071 if (fitr == fields.end()) {
01072 if (required)
01073 throw BagFormatException("Required '" + field + "' field missing");
01074 }
01075 else if ((fitr->second.size() < min_len) || (fitr->second.size() > max_len))
01076 throw BagFormatException((format("Field '%1%' is wrong size (%2% bytes)") % field % (uint32_t) fitr->second.size()).str());
01077
01078 return fitr;
01079 }
01080
01081 bool Bag::readField(M_string const& fields, string const& field_name, bool required, string& data) const {
01082 return readField(fields, field_name, 1, UINT_MAX, required, data);
01083 }
01084
01085 bool Bag::readField(M_string const& fields, string const& field_name, unsigned int min_len, unsigned int max_len, bool required, string& data) const {
01086 M_string::const_iterator fitr = checkField(fields, field_name, min_len, max_len, required);
01087 if (fitr == fields.end())
01088 return false;
01089
01090 data = fitr->second;
01091 return true;
01092 }
01093
01094 bool Bag::readField(M_string const& fields, string const& field_name, bool required, Time& data) const {
01095 uint64_t packed_time;
01096 if (!readField(fields, field_name, required, &packed_time))
01097 return false;
01098
01099 uint64_t bitmask = (1LL << 33) - 1;
01100 data.sec = (uint32_t) (packed_time & bitmask);
01101 data.nsec = (uint32_t) (packed_time >> 32);
01102
01103 return true;
01104 }
01105
01106 std::string Bag::toHeaderString(Time const* field) const {
01107 uint64_t packed_time = (((uint64_t) field->nsec) << 32) + field->sec;
01108 return toHeaderString(&packed_time);
01109 }
01110
01111
01112
01113
01114 void Bag::write(string const& s) { write(s.c_str(), s.length()); }
01115 void Bag::write(char const* s, std::streamsize n) { file_.write((char*) s, n); }
01116
01117 void Bag::read(char* b, std::streamsize n) const { file_.read(b, n); }
01118 void Bag::seek(uint64_t pos, int origin) const { file_.seek(pos, origin); }
01119
01120 }