00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include "rosbag/bag.h"
00029 #include "rosbag/message_instance.h"
00030 #include "rosbag/query.h"
00031 #include "rosbag/view.h"
00032
00033 #if defined(_MSC_VER)
00034 #include <stdint.h>
00035 #else
00036 #include <inttypes.h>
00037 #endif
00038 #include <signal.h>
00039
00040 #include <iomanip>
00041
00042 #include <boost/foreach.hpp>
00043
00044 #define foreach BOOST_FOREACH
00045
00046 using std::map;
00047 using std::priority_queue;
00048 using std::string;
00049 using std::vector;
00050 using std::multiset;
00051 using boost::format;
00052 using boost::shared_ptr;
00053 using ros::M_string;
00054 using ros::Time;
00055
00056 namespace rosbag {
00057
00058 Bag::Bag() :
00059 compression_(compression::Uncompressed),
00060 chunk_threshold_(768 * 1024),
00061 bag_revision_(0),
00062 file_size_(0),
00063 file_header_pos_(0),
00064 index_data_pos_(0),
00065 connection_count_(0),
00066 chunk_count_(0),
00067 chunk_open_(false),
00068 curr_chunk_data_pos_(0),
00069 decompressed_chunk_(0)
00070 {
00071 }
00072
00073 Bag::Bag(string const& filename, uint32_t mode) :
00074 compression_(compression::Uncompressed),
00075 chunk_threshold_(768 * 1024),
00076 bag_revision_(0),
00077 file_size_(0),
00078 file_header_pos_(0),
00079 index_data_pos_(0),
00080 connection_count_(0),
00081 chunk_count_(0),
00082 chunk_open_(false),
00083 curr_chunk_data_pos_(0),
00084 decompressed_chunk_(0)
00085 {
00086 open(filename, mode);
00087 }
00088
00089 Bag::~Bag() {
00090 close();
00091 }
00092
00093 void Bag::open(string const& filename, uint32_t mode) {
00094 mode_ = (BagMode) mode;
00095
00096 if (mode_ & bagmode::Append)
00097 openAppend(filename);
00098 else if (mode_ & bagmode::Write)
00099 openWrite(filename);
00100 else if (mode_ & bagmode::Read)
00101 openRead(filename);
00102 else
00103 throw BagException((format("Unknown mode: %1%") % (int) mode).str());
00104
00105
00106 uint64_t offset = file_.getOffset();
00107 seek(0, std::ios::end);
00108 file_size_ = file_.getOffset();
00109 seek(offset);
00110 }
00111
00112 void Bag::openRead(string const& filename) {
00113 file_.openRead(filename);
00114
00115 readVersion();
00116
00117 switch (version_) {
00118 case 102: startReadingVersion102(); break;
00119 case 200: startReadingVersion200(); break;
00120 default:
00121 throw BagException((format("Unsupported bag file version: %1%.%2%") % getMajorVersion() % getMinorVersion()).str());
00122 }
00123 }
00124
00125 void Bag::openWrite(string const& filename) {
00126 file_.openWrite(filename);
00127
00128 startWriting();
00129 }
00130
00131 void Bag::openAppend(string const& filename) {
00132 file_.openReadWrite(filename);
00133
00134 readVersion();
00135
00136 if (version_ != 200)
00137 throw BagException((format("Bag file version %1%.%2% is unsupported for appending") % getMajorVersion() % getMinorVersion()).str());
00138
00139 startReadingVersion200();
00140
00141
00142 file_.truncate(index_data_pos_);
00143 index_data_pos_ = 0;
00144
00145
00146 seek(file_header_pos_);
00147 writeFileHeaderRecord();
00148
00149
00150 seek(0, std::ios::end);
00151 }
00152
00153 void Bag::close() {
00154 if (!file_.isOpen())
00155 return;
00156
00157 if (mode_ & bagmode::Write || mode_ & bagmode::Append)
00158 closeWrite();
00159
00160 file_.close();
00161
00162 topic_connection_ids_.clear();
00163 header_connection_ids_.clear();
00164 for (map<uint32_t, ConnectionInfo*>::iterator i = connections_.begin(); i != connections_.end(); i++)
00165 delete i->second;
00166 connections_.clear();
00167 chunks_.clear();
00168 connection_indexes_.clear();
00169 curr_chunk_connection_indexes_.clear();
00170 }
00171
00172 void Bag::closeWrite() {
00173 stopWriting();
00174 }
00175
00176 string Bag::getFileName() const { return file_.getFileName(); }
00177 BagMode Bag::getMode() const { return mode_; }
00178 uint64_t Bag::getSize() const { return file_size_; }
00179
00180 uint32_t Bag::getChunkThreshold() const { return chunk_threshold_; }
00181
00182 void Bag::setChunkThreshold(uint32_t chunk_threshold) {
00183 if (file_.isOpen() && chunk_open_)
00184 stopWritingChunk();
00185
00186 chunk_threshold_ = chunk_threshold;
00187 }
00188
00189 CompressionType Bag::getCompression() const { return compression_; }
00190
00191 void Bag::setCompression(CompressionType compression) {
00192 if (file_.isOpen() && chunk_open_)
00193 stopWritingChunk();
00194
00195 compression_ = compression;
00196 }
00197
00198
00199
00200 void Bag::writeVersion() {
00201 string version = string("#ROSBAG V") + VERSION + string("\n");
00202
00203 ROS_DEBUG("Writing VERSION [%llu]: %s", (unsigned long long) file_.getOffset(), version.c_str());
00204
00205 version_ = 200;
00206
00207 write(version);
00208 }
00209
00210 void Bag::readVersion() {
00211 string version_line = file_.getline();
00212
00213 file_header_pos_ = file_.getOffset();
00214
00215 char logtypename[100];
00216 int version_major, version_minor;
00217 #if defined(_MSC_VER)
00218 if (sscanf_s(version_line.c_str(), "#ROS%s V%d.%d", logtypename, sizeof(logtypename), &version_major, &version_minor) != 3)
00219 #else
00220 if (sscanf(version_line.c_str(), "#ROS%s V%d.%d", logtypename, &version_major, &version_minor) != 3)
00221 #endif
00222 throw BagIOException("Error reading version line");
00223
00224 version_ = version_major * 100 + version_minor;
00225
00226 ROS_DEBUG("Read VERSION: version=%d", version_);
00227 }
00228
00229 uint32_t Bag::getMajorVersion() const { return version_ / 100; }
00230 uint32_t Bag::getMinorVersion() const { return version_ % 100; }
00231
00232
00233
00234 void Bag::startWriting() {
00235 writeVersion();
00236 file_header_pos_ = file_.getOffset();
00237 writeFileHeaderRecord();
00238 }
00239
00240 void Bag::stopWriting() {
00241 if (chunk_open_)
00242 stopWritingChunk();
00243
00244 seek(0, std::ios::end);
00245
00246 index_data_pos_ = file_.getOffset();
00247 writeConnectionRecords();
00248 writeChunkInfoRecords();
00249
00250 seek(file_header_pos_);
00251 writeFileHeaderRecord();
00252 }
00253
00254 void Bag::startReadingVersion200() {
00255
00256 readFileHeaderRecord();
00257
00258
00259 seek(index_data_pos_);
00260
00261
00262 for (uint32_t i = 0; i < connection_count_; i++)
00263 readConnectionRecord();
00264
00265
00266 for (uint32_t i = 0; i < chunk_count_; i++)
00267 readChunkInfoRecord();
00268
00269
00270 foreach(ChunkInfo const& chunk_info, chunks_) {
00271 curr_chunk_info_ = chunk_info;
00272
00273 seek(curr_chunk_info_.pos);
00274
00275
00276 ChunkHeader chunk_header;
00277 readChunkHeader(chunk_header);
00278 seek(chunk_header.compressed_size, std::ios::cur);
00279
00280
00281 for (unsigned int i = 0; i < chunk_info.connection_counts.size(); i++)
00282 readConnectionIndexRecord200();
00283 }
00284
00285
00286 curr_chunk_info_ = ChunkInfo();
00287 }
00288
00289 void Bag::startReadingVersion102() {
00290 try
00291 {
00292
00293 readFileHeaderRecord();
00294 }
00295 catch (BagFormatException ex) {
00296 throw BagUnindexedException();
00297 }
00298
00299
00300 seek(0, std::ios::end);
00301 uint64_t filelength = file_.getOffset();
00302
00303
00304 seek(index_data_pos_);
00305
00306
00307 while (file_.getOffset() < filelength)
00308 readTopicIndexRecord102();
00309
00310
00311 for (map<uint32_t, multiset<IndexEntry> >::const_iterator i = connection_indexes_.begin(); i != connection_indexes_.end(); i++) {
00312 multiset<IndexEntry> const& index = i->second;
00313 IndexEntry const& first_entry = *index.begin();
00314
00315 ROS_DEBUG("Reading message definition for connection %d at %llu", i->first, (unsigned long long) first_entry.chunk_pos);
00316
00317 seek(first_entry.chunk_pos);
00318
00319 readMessageDefinitionRecord102();
00320 }
00321 }
00322
00323
00324
00325 void Bag::writeFileHeaderRecord() {
00326 connection_count_ = connections_.size();
00327 chunk_count_ = chunks_.size();
00328
00329 ROS_DEBUG("Writing FILE_HEADER [%llu]: index_pos=%llu connection_count=%d chunk_count=%d",
00330 (unsigned long long) file_.getOffset(), (unsigned long long) index_data_pos_, connection_count_, chunk_count_);
00331
00332
00333 M_string header;
00334 header[OP_FIELD_NAME] = toHeaderString(&OP_FILE_HEADER);
00335 header[INDEX_POS_FIELD_NAME] = toHeaderString(&index_data_pos_);
00336 header[CONNECTION_COUNT_FIELD_NAME] = toHeaderString(&connection_count_);
00337 header[CHUNK_COUNT_FIELD_NAME] = toHeaderString(&chunk_count_);
00338
00339 boost::shared_array<uint8_t> header_buffer;
00340 uint32_t header_len;
00341 ros::Header::write(header, header_buffer, header_len);
00342 uint32_t data_len = 0;
00343 if (header_len < FILE_HEADER_LENGTH)
00344 data_len = FILE_HEADER_LENGTH - header_len;
00345 write((char*) &header_len, 4);
00346 write((char*) header_buffer.get(), header_len);
00347 write((char*) &data_len, 4);
00348
00349
00350 if (data_len > 0) {
00351 string padding;
00352 padding.resize(data_len, ' ');
00353 write(padding);
00354 }
00355 }
00356
00357 void Bag::readFileHeaderRecord() {
00358 ros::Header header;
00359 uint32_t data_size;
00360 if (!readHeader(header) || !readDataLength(data_size))
00361 throw BagFormatException("Error reading FILE_HEADER record");
00362
00363 M_string& fields = *header.getValues();
00364
00365 if (!isOp(fields, OP_FILE_HEADER))
00366 throw BagFormatException("Expected FILE_HEADER op not found");
00367
00368
00369 readField(fields, INDEX_POS_FIELD_NAME, true, (uint64_t*) &index_data_pos_);
00370
00371 if (index_data_pos_ == 0)
00372 throw BagUnindexedException();
00373
00374
00375 if (version_ >= 200) {
00376 readField(fields, CONNECTION_COUNT_FIELD_NAME, true, &connection_count_);
00377 readField(fields, CHUNK_COUNT_FIELD_NAME, true, &chunk_count_);
00378 }
00379
00380 ROS_DEBUG("Read FILE_HEADER: index_pos=%llu connection_count=%d chunk_count=%d",
00381 (unsigned long long) index_data_pos_, connection_count_, chunk_count_);
00382
00383
00384 seek(data_size, std::ios::cur);
00385 }
00386
00387 uint32_t Bag::getChunkOffset() const {
00388 if (compression_ == compression::Uncompressed)
00389 return file_.getOffset() - curr_chunk_data_pos_;
00390 else
00391 return file_.getCompressedBytesIn();
00392 }
00393
00394 void Bag::startWritingChunk(Time time) {
00395
00396 curr_chunk_info_.pos = file_.getOffset();
00397 curr_chunk_info_.start_time = time;
00398 curr_chunk_info_.end_time = time;
00399
00400
00401 writeChunkHeader(compression_, 0, 0);
00402
00403
00404 file_.setWriteMode(compression_);
00405
00406
00407 curr_chunk_data_pos_ = file_.getOffset();
00408
00409 chunk_open_ = true;
00410 }
00411
00412 void Bag::stopWritingChunk() {
00413
00414 chunks_.push_back(curr_chunk_info_);
00415
00416
00417 uint32_t uncompressed_size = getChunkOffset();
00418 file_.setWriteMode(compression::Uncompressed);
00419 uint32_t compressed_size = file_.getOffset() - curr_chunk_data_pos_;
00420
00421
00422 uint64_t end_of_chunk_pos = file_.getOffset();
00423
00424 seek(curr_chunk_info_.pos);
00425 writeChunkHeader(compression_, compressed_size, uncompressed_size);
00426
00427
00428 seek(end_of_chunk_pos);
00429 writeIndexRecords();
00430 curr_chunk_connection_indexes_.clear();
00431
00432
00433 curr_chunk_info_.connection_counts.clear();
00434
00435
00436 chunk_open_ = false;
00437 }
00438
00439 void Bag::writeChunkHeader(CompressionType compression, uint32_t compressed_size, uint32_t uncompressed_size) {
00440 ChunkHeader chunk_header;
00441 switch (compression) {
00442 case compression::Uncompressed: chunk_header.compression = COMPRESSION_NONE; break;
00443 case compression::BZ2: chunk_header.compression = COMPRESSION_BZ2; break;
00444
00445 }
00446 chunk_header.compressed_size = compressed_size;
00447 chunk_header.uncompressed_size = uncompressed_size;
00448
00449 ROS_DEBUG("Writing CHUNK [%llu]: compression=%s compressed=%d uncompressed=%d",
00450 (unsigned long long) file_.getOffset(), chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size);
00451
00452 M_string header;
00453 header[OP_FIELD_NAME] = toHeaderString(&OP_CHUNK);
00454 header[COMPRESSION_FIELD_NAME] = chunk_header.compression;
00455 header[SIZE_FIELD_NAME] = toHeaderString(&chunk_header.uncompressed_size);
00456 writeHeader(header);
00457
00458 writeDataLength(chunk_header.compressed_size);
00459 }
00460
00461 void Bag::readChunkHeader(ChunkHeader& chunk_header) const {
00462 ros::Header header;
00463 if (!readHeader(header) || !readDataLength(chunk_header.compressed_size))
00464 throw BagFormatException("Error reading CHUNK record");
00465
00466 M_string& fields = *header.getValues();
00467
00468 if (!isOp(fields, OP_CHUNK))
00469 throw BagFormatException("Expected CHUNK op not found");
00470
00471 readField(fields, COMPRESSION_FIELD_NAME, true, chunk_header.compression);
00472 readField(fields, SIZE_FIELD_NAME, true, &chunk_header.uncompressed_size);
00473
00474 ROS_DEBUG("Read CHUNK: compression=%s size=%d uncompressed=%d (%f)", chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size, 100 * ((double) chunk_header.compressed_size) / chunk_header.uncompressed_size);
00475 }
00476
00477
00478
00479 void Bag::writeIndexRecords() {
00480 for (map<uint32_t, multiset<IndexEntry> >::const_iterator i = curr_chunk_connection_indexes_.begin(); i != curr_chunk_connection_indexes_.end(); i++) {
00481 uint32_t connection_id = i->first;
00482 multiset<IndexEntry> const& index = i->second;
00483
00484
00485 uint32_t index_size = index.size();
00486 M_string header;
00487 header[OP_FIELD_NAME] = toHeaderString(&OP_INDEX_DATA);
00488 header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_id);
00489 header[VER_FIELD_NAME] = toHeaderString(&INDEX_VERSION);
00490 header[COUNT_FIELD_NAME] = toHeaderString(&index_size);
00491 writeHeader(header);
00492
00493 writeDataLength(index_size * 12);
00494
00495 ROS_DEBUG("Writing INDEX_DATA: connection=%d ver=%d count=%d", connection_id, INDEX_VERSION, index_size);
00496
00497
00498 foreach(IndexEntry const& e, index) {
00499 write((char*) &e.time.sec, 4);
00500 write((char*) &e.time.nsec, 4);
00501 write((char*) &e.offset, 4);
00502
00503 ROS_DEBUG(" - %d.%d: %d", e.time.sec, e.time.nsec, e.offset);
00504 }
00505 }
00506 }
00507
00508 void Bag::readTopicIndexRecord102() {
00509 ros::Header header;
00510 uint32_t data_size;
00511 if (!readHeader(header) || !readDataLength(data_size))
00512 throw BagFormatException("Error reading INDEX_DATA header");
00513 M_string& fields = *header.getValues();
00514
00515 if (!isOp(fields, OP_INDEX_DATA))
00516 throw BagFormatException("Expected INDEX_DATA record");
00517
00518 uint32_t index_version;
00519 string topic;
00520 uint32_t count = 0;
00521 readField(fields, VER_FIELD_NAME, true, &index_version);
00522 readField(fields, TOPIC_FIELD_NAME, true, topic);
00523 readField(fields, COUNT_FIELD_NAME, true, &count);
00524
00525 ROS_DEBUG("Read INDEX_DATA: ver=%d topic=%s count=%d", index_version, topic.c_str(), count);
00526
00527 if (index_version != 0)
00528 throw BagFormatException((format("Unsupported INDEX_DATA version: %1%") % index_version).str());
00529
00530 uint32_t connection_id;
00531 map<string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
00532 if (topic_conn_id_iter == topic_connection_ids_.end()) {
00533 connection_id = connections_.size();
00534
00535 ROS_DEBUG("Creating connection: id=%d topic=%s", connection_id, topic.c_str());
00536 ConnectionInfo* connection_info = new ConnectionInfo();
00537 connection_info->id = connection_id;
00538 connection_info->topic = topic;
00539 connections_[connection_id] = connection_info;
00540
00541 topic_connection_ids_[topic] = connection_id;
00542 }
00543 else
00544 connection_id = topic_conn_id_iter->second;
00545
00546 multiset<IndexEntry>& connection_index = connection_indexes_[connection_id];
00547
00548 for (uint32_t i = 0; i < count; i++) {
00549 IndexEntry index_entry;
00550 uint32_t sec;
00551 uint32_t nsec;
00552 read((char*) &sec, 4);
00553 read((char*) &nsec, 4);
00554 read((char*) &index_entry.chunk_pos, 8);
00555 index_entry.time = Time(sec, nsec);
00556 index_entry.offset = 0;
00557
00558 ROS_DEBUG(" - %d.%d: %llu", sec, nsec, (unsigned long long) index_entry.chunk_pos);
00559
00560 if (index_entry.time < ros::TIME_MIN || index_entry.time > ros::TIME_MAX)
00561 {
00562 ROS_ERROR("Index entry for topic %s contains invalid time.", topic.c_str());
00563 } else
00564 {
00565 connection_index.insert(connection_index.end(), index_entry);
00566 }
00567 }
00568 }
00569
00570 void Bag::readConnectionIndexRecord200() {
00571 ros::Header header;
00572 uint32_t data_size;
00573 if (!readHeader(header) || !readDataLength(data_size))
00574 throw BagFormatException("Error reading INDEX_DATA header");
00575 M_string& fields = *header.getValues();
00576
00577 if (!isOp(fields, OP_INDEX_DATA))
00578 throw BagFormatException("Expected INDEX_DATA record");
00579
00580 uint32_t index_version;
00581 uint32_t connection_id;
00582 uint32_t count = 0;
00583 readField(fields, VER_FIELD_NAME, true, &index_version);
00584 readField(fields, CONNECTION_FIELD_NAME, true, &connection_id);
00585 readField(fields, COUNT_FIELD_NAME, true, &count);
00586
00587 ROS_DEBUG("Read INDEX_DATA: ver=%d connection=%d count=%d", index_version, connection_id, count);
00588
00589 if (index_version != 1)
00590 throw BagFormatException((format("Unsupported INDEX_DATA version: %1%") % index_version).str());
00591
00592 uint64_t chunk_pos = curr_chunk_info_.pos;
00593
00594 multiset<IndexEntry>& connection_index = connection_indexes_[connection_id];
00595
00596 for (uint32_t i = 0; i < count; i++) {
00597 IndexEntry index_entry;
00598 index_entry.chunk_pos = chunk_pos;
00599 uint32_t sec;
00600 uint32_t nsec;
00601 read((char*) &sec, 4);
00602 read((char*) &nsec, 4);
00603 read((char*) &index_entry.offset, 4);
00604 index_entry.time = Time(sec, nsec);
00605
00606 ROS_DEBUG(" - %d.%d: %llu+%d", sec, nsec, (unsigned long long) index_entry.chunk_pos, index_entry.offset);
00607
00608 if (index_entry.time < ros::TIME_MIN || index_entry.time > ros::TIME_MAX)
00609 {
00610 ROS_ERROR("Index entry for topic %s contains invalid time. This message will not be loaded.", connections_[connection_id]->topic.c_str());
00611 } else
00612 {
00613 connection_index.insert(connection_index.end(), index_entry);
00614 }
00615 }
00616 }
00617
00618
00619
00620 void Bag::writeConnectionRecords() {
00621 for (map<uint32_t, ConnectionInfo*>::const_iterator i = connections_.begin(); i != connections_.end(); i++) {
00622 ConnectionInfo const* connection_info = i->second;
00623 writeConnectionRecord(connection_info);
00624 }
00625 }
00626
00627 void Bag::writeConnectionRecord(ConnectionInfo const* connection_info) {
00628 ROS_DEBUG("Writing CONNECTION [%llu:%d]: topic=%s id=%d",
00629 (unsigned long long) file_.getOffset(), getChunkOffset(), connection_info->topic.c_str(), connection_info->id);
00630
00631 M_string header;
00632 header[OP_FIELD_NAME] = toHeaderString(&OP_CONNECTION);
00633 header[TOPIC_FIELD_NAME] = connection_info->topic;
00634 header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_info->id);
00635 writeHeader(header);
00636
00637 writeHeader(*connection_info->header);
00638 }
00639
00640 void Bag::appendConnectionRecordToBuffer(Buffer& buf, ConnectionInfo const* connection_info) {
00641 M_string header;
00642 header[OP_FIELD_NAME] = toHeaderString(&OP_CONNECTION);
00643 header[TOPIC_FIELD_NAME] = connection_info->topic;
00644 header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_info->id);
00645 appendHeaderToBuffer(buf, header);
00646
00647 appendHeaderToBuffer(buf, *connection_info->header);
00648 }
00649
00650 void Bag::readConnectionRecord() {
00651 ros::Header header;
00652 if (!readHeader(header))
00653 throw BagFormatException("Error reading CONNECTION header");
00654 M_string& fields = *header.getValues();
00655
00656 if (!isOp(fields, OP_CONNECTION))
00657 throw BagFormatException("Expected CONNECTION op not found");
00658
00659 uint32_t id;
00660 readField(fields, CONNECTION_FIELD_NAME, true, &id);
00661 string topic;
00662 readField(fields, TOPIC_FIELD_NAME, true, topic);
00663
00664 ros::Header connection_header;
00665 if (!readHeader(connection_header))
00666 throw BagFormatException("Error reading connection header");
00667
00668
00669 map<uint32_t, ConnectionInfo*>::iterator key = connections_.find(id);
00670 if (key == connections_.end()) {
00671 ConnectionInfo* connection_info = new ConnectionInfo();
00672 connection_info->id = id;
00673 connection_info->topic = topic;
00674 connection_info->header = shared_ptr<M_string>(new M_string);
00675 for (M_string::const_iterator i = connection_header.getValues()->begin(); i != connection_header.getValues()->end(); i++)
00676 (*connection_info->header)[i->first] = i->second;
00677 connection_info->msg_def = (*connection_info->header)["message_definition"];
00678 connection_info->datatype = (*connection_info->header)["type"];
00679 connection_info->md5sum = (*connection_info->header)["md5sum"];
00680 connections_[id] = connection_info;
00681
00682 ROS_DEBUG("Read CONNECTION: topic=%s id=%d", topic.c_str(), id);
00683 }
00684 }
00685
00686 void Bag::readMessageDefinitionRecord102() {
00687 ros::Header header;
00688 uint32_t data_size;
00689 if (!readHeader(header) || !readDataLength(data_size))
00690 throw BagFormatException("Error reading message definition header");
00691 M_string& fields = *header.getValues();
00692
00693 if (!isOp(fields, OP_MSG_DEF))
00694 throw BagFormatException("Expected MSG_DEF op not found");
00695
00696 string topic, md5sum, datatype, message_definition;
00697 readField(fields, TOPIC_FIELD_NAME, true, topic);
00698 readField(fields, MD5_FIELD_NAME, 32, 32, true, md5sum);
00699 readField(fields, TYPE_FIELD_NAME, true, datatype);
00700 readField(fields, DEF_FIELD_NAME, 0, UINT_MAX, true, message_definition);
00701
00702 ConnectionInfo* connection_info;
00703
00704 map<string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
00705 if (topic_conn_id_iter == topic_connection_ids_.end()) {
00706 uint32_t id = connections_.size();
00707
00708 ROS_DEBUG("Creating connection: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str());
00709 connection_info = new ConnectionInfo();
00710 connection_info->id = id;
00711 connection_info->topic = topic;
00712
00713 connections_[id] = connection_info;
00714 topic_connection_ids_[topic] = id;
00715 }
00716 else
00717 connection_info = connections_[topic_conn_id_iter->second];
00718
00719 connection_info->msg_def = message_definition;
00720 connection_info->datatype = datatype;
00721 connection_info->md5sum = md5sum;
00722 connection_info->header = boost::shared_ptr<ros::M_string>(new ros::M_string);
00723 (*connection_info->header)["type"] = connection_info->datatype;
00724 (*connection_info->header)["md5sum"] = connection_info->md5sum;
00725 (*connection_info->header)["message_definition"] = connection_info->msg_def;
00726
00727 ROS_DEBUG("Read MSG_DEF: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str());
00728 }
00729
00730 void Bag::decompressChunk(uint64_t chunk_pos) const {
00731 if (curr_chunk_info_.pos == chunk_pos) {
00732 current_buffer_ = &outgoing_chunk_buffer_;
00733 return;
00734 }
00735
00736 current_buffer_ = &decompress_buffer_;
00737
00738 if (decompressed_chunk_ == chunk_pos)
00739 return;
00740
00741
00742 seek(chunk_pos);
00743
00744
00745 ChunkHeader chunk_header;
00746 readChunkHeader(chunk_header);
00747
00748
00749 if (chunk_header.compression == COMPRESSION_NONE)
00750 decompressRawChunk(chunk_header);
00751 else if (chunk_header.compression == COMPRESSION_BZ2)
00752 decompressBz2Chunk(chunk_header);
00753 else
00754 throw BagFormatException("Unknown compression: " + chunk_header.compression);
00755
00756 decompressed_chunk_ = chunk_pos;
00757 }
00758
00759 void Bag::readMessageDataRecord102(uint64_t offset, ros::Header& header) const {
00760 ROS_DEBUG("readMessageDataRecord: offset=%llu", (unsigned long long) offset);
00761
00762 seek(offset);
00763
00764 uint32_t data_size;
00765 uint8_t op;
00766 do {
00767 if (!readHeader(header) || !readDataLength(data_size))
00768 throw BagFormatException("Error reading header");
00769
00770 readField(*header.getValues(), OP_FIELD_NAME, true, &op);
00771 }
00772 while (op == OP_MSG_DEF);
00773
00774 if (op != OP_MSG_DATA)
00775 throw BagFormatException((format("Expected MSG_DATA op, got %d") % op).str());
00776
00777 record_buffer_.setSize(data_size);
00778 file_.read((char*) record_buffer_.getData(), data_size);
00779 }
00780
00781
00782 void Bag::decompressRawChunk(ChunkHeader const& chunk_header) const {
00783 assert(chunk_header.compression == COMPRESSION_NONE);
00784 assert(chunk_header.compressed_size == chunk_header.uncompressed_size);
00785
00786 ROS_DEBUG("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size);
00787
00788 decompress_buffer_.setSize(chunk_header.compressed_size);
00789 file_.read((char*) decompress_buffer_.getData(), chunk_header.compressed_size);
00790
00791
00792 }
00793
00794 void Bag::decompressBz2Chunk(ChunkHeader const& chunk_header) const {
00795 assert(chunk_header.compression == COMPRESSION_BZ2);
00796
00797 CompressionType compression = compression::BZ2;
00798
00799 ROS_DEBUG("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size);
00800
00801 chunk_buffer_.setSize(chunk_header.compressed_size);
00802 file_.read((char*) chunk_buffer_.getData(), chunk_header.compressed_size);
00803
00804 decompress_buffer_.setSize(chunk_header.uncompressed_size);
00805 file_.decompress(compression, decompress_buffer_.getData(), decompress_buffer_.getSize(), chunk_buffer_.getData(), chunk_buffer_.getSize());
00806
00807
00808 }
00809
00810 ros::Header Bag::readMessageDataHeader(IndexEntry const& index_entry) {
00811 ros::Header header;
00812 uint32_t data_size;
00813 uint32_t bytes_read;
00814 switch (version_)
00815 {
00816 case 200:
00817 decompressChunk(index_entry.chunk_pos);
00818 readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
00819 return header;
00820 case 102:
00821 readMessageDataRecord102(index_entry.chunk_pos, header);
00822 return header;
00823 default:
00824 throw BagFormatException((format("Unhandled version: %1%") % version_).str());
00825 }
00826 }
00827
00828
00829 uint32_t Bag::readMessageDataSize(IndexEntry const& index_entry) const {
00830 ros::Header header;
00831 uint32_t data_size;
00832 uint32_t bytes_read;
00833 switch (version_)
00834 {
00835 case 200:
00836 decompressChunk(index_entry.chunk_pos);
00837 readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
00838 return data_size;
00839 case 102:
00840 readMessageDataRecord102(index_entry.chunk_pos, header);
00841 return record_buffer_.getSize();
00842 default:
00843 throw BagFormatException((format("Unhandled version: %1%") % version_).str());
00844 }
00845 }
00846
00847 void Bag::writeChunkInfoRecords() {
00848 foreach(ChunkInfo const& chunk_info, chunks_) {
00849
00850 M_string header;
00851 uint32_t chunk_connection_count = chunk_info.connection_counts.size();
00852 header[OP_FIELD_NAME] = toHeaderString(&OP_CHUNK_INFO);
00853 header[VER_FIELD_NAME] = toHeaderString(&CHUNK_INFO_VERSION);
00854 header[CHUNK_POS_FIELD_NAME] = toHeaderString(&chunk_info.pos);
00855 header[START_TIME_FIELD_NAME] = toHeaderString(&chunk_info.start_time);
00856 header[END_TIME_FIELD_NAME] = toHeaderString(&chunk_info.end_time);
00857 header[COUNT_FIELD_NAME] = toHeaderString(&chunk_connection_count);
00858
00859 ROS_DEBUG("Writing CHUNK_INFO [%llu]: ver=%d pos=%llu start=%d.%d end=%d.%d",
00860 (unsigned long long) file_.getOffset(), CHUNK_INFO_VERSION, (unsigned long long) chunk_info.pos,
00861 chunk_info.start_time.sec, chunk_info.start_time.nsec,
00862 chunk_info.end_time.sec, chunk_info.end_time.nsec);
00863
00864 writeHeader(header);
00865
00866 writeDataLength(8 * chunk_connection_count);
00867
00868
00869 for (map<uint32_t, uint32_t>::const_iterator i = chunk_info.connection_counts.begin(); i != chunk_info.connection_counts.end(); i++) {
00870 uint32_t connection_id = i->first;
00871 uint32_t count = i->second;
00872
00873 write((char*) &connection_id, 4);
00874 write((char*) &count, 4);
00875
00876 ROS_DEBUG(" - %d: %d", connection_id, count);
00877 }
00878 }
00879 }
00880
00881 void Bag::readChunkInfoRecord() {
00882
00883 ros::Header header;
00884 uint32_t data_size;
00885 if (!readHeader(header) || !readDataLength(data_size))
00886 throw BagFormatException("Error reading CHUNK_INFO record header");
00887 M_string& fields = *header.getValues();
00888 if (!isOp(fields, OP_CHUNK_INFO))
00889 throw BagFormatException("Expected CHUNK_INFO op not found");
00890
00891
00892 uint32_t chunk_info_version;
00893 readField(fields, VER_FIELD_NAME, true, &chunk_info_version);
00894 if (chunk_info_version != CHUNK_INFO_VERSION)
00895 throw BagFormatException((format("Expected CHUNK_INFO version %1%, read %2%") % CHUNK_INFO_VERSION % chunk_info_version).str());
00896
00897
00898 ChunkInfo chunk_info;
00899 readField(fields, CHUNK_POS_FIELD_NAME, true, &chunk_info.pos);
00900 readField(fields, START_TIME_FIELD_NAME, true, chunk_info.start_time);
00901 readField(fields, END_TIME_FIELD_NAME, true, chunk_info.end_time);
00902 uint32_t chunk_connection_count = 0;
00903 readField(fields, COUNT_FIELD_NAME, true, &chunk_connection_count);
00904
00905 ROS_DEBUG("Read CHUNK_INFO: chunk_pos=%llu connection_count=%d start=%d.%d end=%d.%d",
00906 (unsigned long long) chunk_info.pos, chunk_connection_count,
00907 chunk_info.start_time.sec, chunk_info.start_time.nsec,
00908 chunk_info.end_time.sec, chunk_info.end_time.nsec);
00909
00910
00911 for (uint32_t i = 0; i < chunk_connection_count; i ++) {
00912 uint32_t connection_id, connection_count;
00913 read((char*) &connection_id, 4);
00914 read((char*) &connection_count, 4);
00915
00916 ROS_DEBUG(" %d: %d messages", connection_id, connection_count);
00917
00918 chunk_info.connection_counts[connection_id] = connection_count;
00919 }
00920
00921 chunks_.push_back(chunk_info);
00922 }
00923
00924
00925
00926 bool Bag::isOp(M_string& fields, uint8_t reqOp) const {
00927 uint8_t op = 0xFF;
00928 readField(fields, OP_FIELD_NAME, true, &op);
00929 return op == reqOp;
00930 }
00931
00932 void Bag::writeHeader(M_string const& fields) {
00933 boost::shared_array<uint8_t> header_buffer;
00934 uint32_t header_len;
00935 ros::Header::write(fields, header_buffer, header_len);
00936 write((char*) &header_len, 4);
00937 write((char*) header_buffer.get(), header_len);
00938 }
00939
00940 void Bag::writeDataLength(uint32_t data_len) {
00941 write((char*) &data_len, 4);
00942 }
00943
00944 void Bag::appendHeaderToBuffer(Buffer& buf, M_string const& fields) {
00945 boost::shared_array<uint8_t> header_buffer;
00946 uint32_t header_len;
00947 ros::Header::write(fields, header_buffer, header_len);
00948
00949 uint32_t offset = buf.getSize();
00950
00951 buf.setSize(buf.getSize() + 4 + header_len);
00952
00953 memcpy(buf.getData() + offset, &header_len, 4);
00954 offset += 4;
00955 memcpy(buf.getData() + offset, header_buffer.get(), header_len);
00956 }
00957
00958 void Bag::appendDataLengthToBuffer(Buffer& buf, uint32_t data_len) {
00959 uint32_t offset = buf.getSize();
00960
00961 buf.setSize(buf.getSize() + 4);
00962
00963 memcpy(buf.getData() + offset, &data_len, 4);
00964 }
00965
00967 void Bag::readHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read) const {
00968 ROS_ASSERT(buffer.getSize() > 8);
00969
00970 uint8_t* start = (uint8_t*) buffer.getData() + offset;
00971
00972 uint8_t* ptr = start;
00973
00974
00975 uint32_t header_len;
00976 memcpy(&header_len, ptr, 4);
00977 ptr += 4;
00978
00979
00980 string error_msg;
00981 bool parsed = header.parse(ptr, header_len, error_msg);
00982 if (!parsed)
00983 throw BagFormatException("Error parsing header");
00984 ptr += header_len;
00985
00986
00987 memcpy(&data_size, ptr, 4);
00988 ptr += 4;
00989
00990 bytes_read = ptr - start;
00991 }
00992
00993 void Bag::readMessageDataHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& total_bytes_read) const {
00994 total_bytes_read = 0;
00995 uint8_t op = 0xFF;
00996 do {
00997 ROS_DEBUG("reading header from buffer: offset=%d", offset);
00998 uint32_t bytes_read;
00999 readHeaderFromBuffer(*current_buffer_, offset, header, data_size, bytes_read);
01000
01001 offset += bytes_read;
01002 total_bytes_read += bytes_read;
01003
01004 readField(*header.getValues(), OP_FIELD_NAME, true, &op);
01005 }
01006 while (op == OP_MSG_DEF || op == OP_CONNECTION);
01007
01008 if (op != OP_MSG_DATA)
01009 throw BagFormatException("Expected MSG_DATA op not found");
01010 }
01011
01012 bool Bag::readHeader(ros::Header& header) const {
01013
01014 uint32_t header_len;
01015 read((char*) &header_len, 4);
01016
01017
01018 header_buffer_.setSize(header_len);
01019 read((char*) header_buffer_.getData(), header_len);
01020
01021
01022 string error_msg;
01023 bool parsed = header.parse(header_buffer_.getData(), header_len, error_msg);
01024 if (!parsed)
01025 return false;
01026
01027 return true;
01028 }
01029
01030 bool Bag::readDataLength(uint32_t& data_size) const {
01031 read((char*) &data_size, 4);
01032 return true;
01033 }
01034
01035 M_string::const_iterator Bag::checkField(M_string const& fields, string const& field, unsigned int min_len, unsigned int max_len, bool required) const {
01036 M_string::const_iterator fitr = fields.find(field);
01037 if (fitr == fields.end()) {
01038 if (required)
01039 throw BagFormatException("Required '" + field + "' field missing");
01040 }
01041 else if ((fitr->second.size() < min_len) || (fitr->second.size() > max_len))
01042 throw BagFormatException((format("Field '%1%' is wrong size (%2% bytes)") % field % (uint32_t) fitr->second.size()).str());
01043
01044 return fitr;
01045 }
01046
01047 bool Bag::readField(M_string const& fields, string const& field_name, bool required, string& data) const {
01048 return readField(fields, field_name, 1, UINT_MAX, required, data);
01049 }
01050
01051 bool Bag::readField(M_string const& fields, string const& field_name, unsigned int min_len, unsigned int max_len, bool required, string& data) const {
01052 M_string::const_iterator fitr = checkField(fields, field_name, min_len, max_len, required);
01053 if (fitr == fields.end())
01054 return false;
01055
01056 data = fitr->second;
01057 return true;
01058 }
01059
01060 bool Bag::readField(M_string const& fields, string const& field_name, bool required, Time& data) const {
01061 uint64_t packed_time;
01062 if (!readField(fields, field_name, required, &packed_time))
01063 return false;
01064
01065 uint64_t bitmask = (1LL << 33) - 1;
01066 data.sec = (uint32_t) (packed_time & bitmask);
01067 data.nsec = (uint32_t) (packed_time >> 32);
01068
01069 return true;
01070 }
01071
01072 std::string Bag::toHeaderString(Time const* field) const {
01073 uint64_t packed_time = (((uint64_t) field->nsec) << 32) + field->sec;
01074 return toHeaderString(&packed_time);
01075 }
01076
01077 ros::AdvertiseOptions createAdvertiseOptions(const ConnectionInfo* c, uint32_t queue_size) {
01078 ros::AdvertiseOptions opts(c->topic, queue_size, c->md5sum, c->datatype, c->msg_def);
01079 ros::M_string::const_iterator header_iter = c->header->find("latching");
01080 opts.latch = (header_iter != c->header->end() && header_iter->second == "1");
01081 return opts;
01082 }
01083
01084
01085
01086 void Bag::write(string const& s) { write(s.c_str(), s.length()); }
01087 void Bag::write(char const* s, std::streamsize n) { file_.write((char*) s, n); }
01088
01089 void Bag::read(char* b, std::streamsize n) const { file_.read(b, n); }
01090 void Bag::seek(uint64_t pos, int origin) const { file_.seek(pos, origin); }
01091
01092 }