00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include "rosbag/bag.h"
00029 #include "rosbag/message_instance.h"
00030 #include "rosbag/query.h"
00031 #include "rosbag/view.h"
00032
00033 #include <inttypes.h>
00034 #include <signal.h>
00035
00036 #include <iomanip>
00037
00038 #include <boost/foreach.hpp>
00039
00040 #define foreach BOOST_FOREACH
00041
00042 using std::map;
00043 using std::priority_queue;
00044 using std::string;
00045 using std::vector;
00046 using std::multiset;
00047 using boost::format;
00048 using boost::shared_ptr;
00049 using ros::M_string;
00050 using ros::Time;
00051
00052 namespace rosbag {
00053
00054 Bag::Bag() :
00055 compression_(compression::Uncompressed),
00056 chunk_threshold_(768 * 1024),
00057 bag_revision_(0),
00058 file_size_(0),
00059 file_header_pos_(0),
00060 index_data_pos_(0),
00061 connection_count_(0),
00062 chunk_count_(0),
00063 chunk_open_(false),
00064 curr_chunk_data_pos_(0),
00065 decompressed_chunk_(0)
00066 {
00067 }
00068
00069 Bag::Bag(string const& filename, uint32_t mode) :
00070 compression_(compression::Uncompressed),
00071 chunk_threshold_(768 * 1024),
00072 bag_revision_(0),
00073 file_size_(0),
00074 file_header_pos_(0),
00075 index_data_pos_(0),
00076 connection_count_(0),
00077 chunk_count_(0),
00078 chunk_open_(false),
00079 curr_chunk_data_pos_(0),
00080 decompressed_chunk_(0)
00081 {
00082 open(filename, mode);
00083 }
00084
00085 Bag::~Bag() {
00086 close();
00087 }
00088
00089 void Bag::open(string const& filename, uint32_t mode) {
00090 mode_ = (BagMode) mode;
00091
00092 if (mode_ & bagmode::Append)
00093 openAppend(filename);
00094 else if (mode_ & bagmode::Write)
00095 openWrite(filename);
00096 else if (mode_ & bagmode::Read)
00097 openRead(filename);
00098 else
00099 throw BagException((format("Unknown mode: %1%") % (int) mode).str());
00100
00101
00102 uint64_t offset = file_.getOffset();
00103 seek(0, std::ios::end);
00104 file_size_ = file_.getOffset();
00105 seek(offset);
00106 }
00107
00108 void Bag::openRead(string const& filename) {
00109 file_.openRead(filename);
00110
00111 readVersion();
00112
00113 switch (version_) {
00114 case 102: startReadingVersion102(); break;
00115 case 200: startReadingVersion200(); break;
00116 default:
00117 throw BagException((format("Unsupported bag file version: %1%.%2%") % getMajorVersion() % getMinorVersion()).str());
00118 }
00119 }
00120
00121 void Bag::openWrite(string const& filename) {
00122 file_.openWrite(filename);
00123
00124 startWriting();
00125 }
00126
00127 void Bag::openAppend(string const& filename) {
00128 file_.openReadWrite(filename);
00129
00130 readVersion();
00131
00132 if (version_ != 200)
00133 throw BagException((format("Bag file version %1%.%2% is unsupported for appending") % getMajorVersion() % getMinorVersion()).str());
00134
00135 startReadingVersion200();
00136
00137
00138 file_.truncate(index_data_pos_);
00139 index_data_pos_ = 0;
00140
00141
00142 seek(file_header_pos_);
00143 writeFileHeaderRecord();
00144
00145
00146 seek(0, std::ios::end);
00147 }
00148
00149 void Bag::close() {
00150 if (!file_.isOpen())
00151 return;
00152
00153 if (mode_ & bagmode::Write || mode_ & bagmode::Append)
00154 closeWrite();
00155
00156 file_.close();
00157
00158 topic_connection_ids_.clear();
00159 header_connection_ids_.clear();
00160 for (map<uint32_t, ConnectionInfo*>::iterator i = connections_.begin(); i != connections_.end(); i++)
00161 delete i->second;
00162 connections_.clear();
00163 chunks_.clear();
00164 connection_indexes_.clear();
00165 curr_chunk_connection_indexes_.clear();
00166 }
00167
00168 void Bag::closeWrite() {
00169 stopWriting();
00170 }
00171
00172 string Bag::getFileName() const { return file_.getFileName(); }
00173 BagMode Bag::getMode() const { return mode_; }
00174 uint64_t Bag::getSize() const { return file_size_; }
00175
00176 uint32_t Bag::getChunkThreshold() const { return chunk_threshold_; }
00177
00178 void Bag::setChunkThreshold(uint32_t chunk_threshold) {
00179 if (file_.isOpen() && chunk_open_)
00180 stopWritingChunk();
00181
00182 chunk_threshold_ = chunk_threshold;
00183 }
00184
00185 CompressionType Bag::getCompression() const { return compression_; }
00186
00187 void Bag::setCompression(CompressionType compression) {
00188 if (file_.isOpen() && chunk_open_)
00189 stopWritingChunk();
00190
00191 compression_ = compression;
00192 }
00193
00194
00195
00196 void Bag::writeVersion() {
00197 string version = string("#ROSBAG V") + VERSION + string("\n");
00198
00199 ROS_DEBUG("Writing VERSION [%llu]: %s", (unsigned long long) file_.getOffset(), version.c_str());
00200
00201 version_ = 200;
00202
00203 write(version);
00204 }
00205
00206 void Bag::readVersion() {
00207 string version_line = file_.getline();
00208
00209 file_header_pos_ = file_.getOffset();
00210
00211 char logtypename[100];
00212 int version_major, version_minor;
00213 if (sscanf(version_line.c_str(), "#ROS%s V%d.%d", logtypename, &version_major, &version_minor) != 3)
00214 throw BagIOException("Error reading version line");
00215
00216 version_ = version_major * 100 + version_minor;
00217
00218 ROS_DEBUG("Read VERSION: version=%d", version_);
00219 }
00220
00221 uint32_t Bag::getMajorVersion() const { return version_ / 100; }
00222 uint32_t Bag::getMinorVersion() const { return version_ % 100; }
00223
00224
00225
00226 void Bag::startWriting() {
00227 writeVersion();
00228 file_header_pos_ = file_.getOffset();
00229 writeFileHeaderRecord();
00230 }
00231
00232 void Bag::stopWriting() {
00233 if (chunk_open_)
00234 stopWritingChunk();
00235
00236 seek(0, std::ios::end);
00237
00238 index_data_pos_ = file_.getOffset();
00239 writeConnectionRecords();
00240 writeChunkInfoRecords();
00241
00242 seek(file_header_pos_);
00243 writeFileHeaderRecord();
00244 }
00245
00246 void Bag::startReadingVersion200() {
00247
00248 readFileHeaderRecord();
00249
00250
00251 seek(index_data_pos_);
00252
00253
00254 for (uint32_t i = 0; i < connection_count_; i++)
00255 readConnectionRecord();
00256
00257
00258 for (uint32_t i = 0; i < chunk_count_; i++)
00259 readChunkInfoRecord();
00260
00261
00262 foreach(ChunkInfo const& chunk_info, chunks_) {
00263 curr_chunk_info_ = chunk_info;
00264
00265 seek(curr_chunk_info_.pos);
00266
00267
00268 ChunkHeader chunk_header;
00269 readChunkHeader(chunk_header);
00270 seek(chunk_header.compressed_size, std::ios::cur);
00271
00272
00273 for (unsigned int i = 0; i < chunk_info.connection_counts.size(); i++)
00274 readConnectionIndexRecord200();
00275 }
00276
00277
00278 curr_chunk_info_ = ChunkInfo();
00279 }
00280
00281 void Bag::startReadingVersion102() {
00282 try
00283 {
00284
00285 readFileHeaderRecord();
00286 }
00287 catch (BagFormatException ex) {
00288 throw BagUnindexedException();
00289 }
00290
00291
00292 seek(0, std::ios::end);
00293 uint64_t filelength = file_.getOffset();
00294
00295
00296 seek(index_data_pos_);
00297
00298
00299 while (file_.getOffset() < filelength)
00300 readTopicIndexRecord102();
00301
00302
00303 for (map<uint32_t, multiset<IndexEntry> >::const_iterator i = connection_indexes_.begin(); i != connection_indexes_.end(); i++) {
00304 multiset<IndexEntry> const& index = i->second;
00305 IndexEntry const& first_entry = *index.begin();
00306
00307 ROS_DEBUG("Reading message definition for connection %d at %llu", i->first, (unsigned long long) first_entry.chunk_pos);
00308
00309 seek(first_entry.chunk_pos);
00310
00311 readMessageDefinitionRecord102();
00312 }
00313 }
00314
00315
00316
00317 void Bag::writeFileHeaderRecord() {
00318 connection_count_ = connections_.size();
00319 chunk_count_ = chunks_.size();
00320
00321 ROS_DEBUG("Writing FILE_HEADER [%llu]: index_pos=%llu connection_count=%d chunk_count=%d",
00322 (unsigned long long) file_.getOffset(), (unsigned long long) index_data_pos_, connection_count_, chunk_count_);
00323
00324
00325 M_string header;
00326 header[OP_FIELD_NAME] = toHeaderString(&OP_FILE_HEADER);
00327 header[INDEX_POS_FIELD_NAME] = toHeaderString(&index_data_pos_);
00328 header[CONNECTION_COUNT_FIELD_NAME] = toHeaderString(&connection_count_);
00329 header[CHUNK_COUNT_FIELD_NAME] = toHeaderString(&chunk_count_);
00330
00331 boost::shared_array<uint8_t> header_buffer;
00332 uint32_t header_len;
00333 ros::Header::write(header, header_buffer, header_len);
00334 uint32_t data_len = 0;
00335 if (header_len < FILE_HEADER_LENGTH)
00336 data_len = FILE_HEADER_LENGTH - header_len;
00337 write((char*) &header_len, 4);
00338 write((char*) header_buffer.get(), header_len);
00339 write((char*) &data_len, 4);
00340
00341
00342 if (data_len > 0) {
00343 string padding;
00344 padding.resize(data_len, ' ');
00345 write(padding);
00346 }
00347 }
00348
00349 void Bag::readFileHeaderRecord() {
00350 ros::Header header;
00351 uint32_t data_size;
00352 if (!readHeader(header) || !readDataLength(data_size))
00353 throw BagFormatException("Error reading FILE_HEADER record");
00354
00355 M_string& fields = *header.getValues();
00356
00357 if (!isOp(fields, OP_FILE_HEADER))
00358 throw BagFormatException("Expected FILE_HEADER op not found");
00359
00360
00361 readField(fields, INDEX_POS_FIELD_NAME, true, (uint64_t*) &index_data_pos_);
00362
00363 if (index_data_pos_ == 0)
00364 throw BagUnindexedException();
00365
00366
00367 if (version_ >= 200) {
00368 readField(fields, CONNECTION_COUNT_FIELD_NAME, true, &connection_count_);
00369 readField(fields, CHUNK_COUNT_FIELD_NAME, true, &chunk_count_);
00370 }
00371
00372 ROS_DEBUG("Read FILE_HEADER: index_pos=%llu connection_count=%d chunk_count=%d",
00373 (unsigned long long) index_data_pos_, connection_count_, chunk_count_);
00374
00375
00376 seek(data_size, std::ios::cur);
00377 }
00378
00379 uint32_t Bag::getChunkOffset() const {
00380 if (compression_ == compression::Uncompressed)
00381 return file_.getOffset() - curr_chunk_data_pos_;
00382 else
00383 return file_.getCompressedBytesIn();
00384 }
00385
00386 void Bag::startWritingChunk(Time time) {
00387
00388 curr_chunk_info_.pos = file_.getOffset();
00389 curr_chunk_info_.start_time = time;
00390 curr_chunk_info_.end_time = time;
00391
00392
00393 writeChunkHeader(compression_, 0, 0);
00394
00395
00396 file_.setWriteMode(compression_);
00397
00398
00399 curr_chunk_data_pos_ = file_.getOffset();
00400
00401 chunk_open_ = true;
00402 }
00403
00404 void Bag::stopWritingChunk() {
00405
00406 chunks_.push_back(curr_chunk_info_);
00407
00408
00409 uint32_t uncompressed_size = getChunkOffset();
00410 file_.setWriteMode(compression::Uncompressed);
00411 uint32_t compressed_size = file_.getOffset() - curr_chunk_data_pos_;
00412
00413
00414 uint64_t end_of_chunk_pos = file_.getOffset();
00415
00416 seek(curr_chunk_info_.pos);
00417 writeChunkHeader(compression_, compressed_size, uncompressed_size);
00418
00419
00420 seek(end_of_chunk_pos);
00421 writeIndexRecords();
00422 curr_chunk_connection_indexes_.clear();
00423
00424
00425 curr_chunk_info_.connection_counts.clear();
00426
00427
00428 chunk_open_ = false;
00429 }
00430
00431 void Bag::writeChunkHeader(CompressionType compression, uint32_t compressed_size, uint32_t uncompressed_size) {
00432 ChunkHeader chunk_header;
00433 switch (compression) {
00434 case compression::Uncompressed: chunk_header.compression = COMPRESSION_NONE; break;
00435 case compression::BZ2: chunk_header.compression = COMPRESSION_BZ2; break;
00436
00437 }
00438 chunk_header.compressed_size = compressed_size;
00439 chunk_header.uncompressed_size = uncompressed_size;
00440
00441 ROS_DEBUG("Writing CHUNK [%llu]: compression=%s compressed=%d uncompressed=%d",
00442 (unsigned long long) file_.getOffset(), chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size);
00443
00444 M_string header;
00445 header[OP_FIELD_NAME] = toHeaderString(&OP_CHUNK);
00446 header[COMPRESSION_FIELD_NAME] = chunk_header.compression;
00447 header[SIZE_FIELD_NAME] = toHeaderString(&chunk_header.uncompressed_size);
00448 writeHeader(header);
00449
00450 writeDataLength(chunk_header.compressed_size);
00451 }
00452
00453 void Bag::readChunkHeader(ChunkHeader& chunk_header) const {
00454 ros::Header header;
00455 if (!readHeader(header) || !readDataLength(chunk_header.compressed_size))
00456 throw BagFormatException("Error reading CHUNK record");
00457
00458 M_string& fields = *header.getValues();
00459
00460 if (!isOp(fields, OP_CHUNK))
00461 throw BagFormatException("Expected CHUNK op not found");
00462
00463 readField(fields, COMPRESSION_FIELD_NAME, true, chunk_header.compression);
00464 readField(fields, SIZE_FIELD_NAME, true, &chunk_header.uncompressed_size);
00465
00466 ROS_DEBUG("Read CHUNK: compression=%s size=%d uncompressed=%d (%f)", chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size, 100 * ((double) chunk_header.compressed_size) / chunk_header.uncompressed_size);
00467 }
00468
00469
00470
00471 void Bag::writeIndexRecords() {
00472 for (map<uint32_t, multiset<IndexEntry> >::const_iterator i = curr_chunk_connection_indexes_.begin(); i != curr_chunk_connection_indexes_.end(); i++) {
00473 uint32_t connection_id = i->first;
00474 multiset<IndexEntry> const& index = i->second;
00475
00476
00477 uint32_t index_size = index.size();
00478 M_string header;
00479 header[OP_FIELD_NAME] = toHeaderString(&OP_INDEX_DATA);
00480 header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_id);
00481 header[VER_FIELD_NAME] = toHeaderString(&INDEX_VERSION);
00482 header[COUNT_FIELD_NAME] = toHeaderString(&index_size);
00483 writeHeader(header);
00484
00485 writeDataLength(index_size * 12);
00486
00487 ROS_DEBUG("Writing INDEX_DATA: connection=%d ver=%d count=%d", connection_id, INDEX_VERSION, index_size);
00488
00489
00490 foreach(IndexEntry const& e, index) {
00491 write((char*) &e.time.sec, 4);
00492 write((char*) &e.time.nsec, 4);
00493 write((char*) &e.offset, 4);
00494
00495 ROS_DEBUG(" - %d.%d: %d", e.time.sec, e.time.nsec, e.offset);
00496 }
00497 }
00498 }
00499
00500 void Bag::readTopicIndexRecord102() {
00501 ros::Header header;
00502 uint32_t data_size;
00503 if (!readHeader(header) || !readDataLength(data_size))
00504 throw BagFormatException("Error reading INDEX_DATA header");
00505 M_string& fields = *header.getValues();
00506
00507 if (!isOp(fields, OP_INDEX_DATA))
00508 throw BagFormatException("Expected INDEX_DATA record");
00509
00510 uint32_t index_version;
00511 string topic;
00512 uint32_t count = 0;
00513 readField(fields, VER_FIELD_NAME, true, &index_version);
00514 readField(fields, TOPIC_FIELD_NAME, true, topic);
00515 readField(fields, COUNT_FIELD_NAME, true, &count);
00516
00517 ROS_DEBUG("Read INDEX_DATA: ver=%d topic=%s count=%d", index_version, topic.c_str(), count);
00518
00519 if (index_version != 0)
00520 throw BagFormatException((format("Unsupported INDEX_DATA version: %1%") % index_version).str());
00521
00522 uint32_t connection_id;
00523 map<string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
00524 if (topic_conn_id_iter == topic_connection_ids_.end()) {
00525 connection_id = connections_.size();
00526
00527 ROS_DEBUG("Creating connection: id=%d topic=%s", connection_id, topic.c_str());
00528 ConnectionInfo* connection_info = new ConnectionInfo();
00529 connection_info->id = connection_id;
00530 connection_info->topic = topic;
00531 connections_[connection_id] = connection_info;
00532
00533 topic_connection_ids_[topic] = connection_id;
00534 }
00535 else
00536 connection_id = topic_conn_id_iter->second;
00537
00538 multiset<IndexEntry>& connection_index = connection_indexes_[connection_id];
00539
00540 for (uint32_t i = 0; i < count; i++) {
00541 IndexEntry index_entry;
00542 uint32_t sec;
00543 uint32_t nsec;
00544 read((char*) &sec, 4);
00545 read((char*) &nsec, 4);
00546 read((char*) &index_entry.chunk_pos, 8);
00547 index_entry.time = Time(sec, nsec);
00548 index_entry.offset = 0;
00549
00550 ROS_DEBUG(" - %d.%d: %llu", sec, nsec, (unsigned long long) index_entry.chunk_pos);
00551
00552 if (index_entry.time < ros::TIME_MIN || index_entry.time > ros::TIME_MAX)
00553 {
00554 ROS_ERROR("Index entry for topic %s contains invalid time.", topic.c_str());
00555 } else
00556 {
00557 connection_index.insert(connection_index.end(), index_entry);
00558 }
00559 }
00560 }
00561
00562 void Bag::readConnectionIndexRecord200() {
00563 ros::Header header;
00564 uint32_t data_size;
00565 if (!readHeader(header) || !readDataLength(data_size))
00566 throw BagFormatException("Error reading INDEX_DATA header");
00567 M_string& fields = *header.getValues();
00568
00569 if (!isOp(fields, OP_INDEX_DATA))
00570 throw BagFormatException("Expected INDEX_DATA record");
00571
00572 uint32_t index_version;
00573 uint32_t connection_id;
00574 uint32_t count = 0;
00575 readField(fields, VER_FIELD_NAME, true, &index_version);
00576 readField(fields, CONNECTION_FIELD_NAME, true, &connection_id);
00577 readField(fields, COUNT_FIELD_NAME, true, &count);
00578
00579 ROS_DEBUG("Read INDEX_DATA: ver=%d connection=%d count=%d", index_version, connection_id, count);
00580
00581 if (index_version != 1)
00582 throw BagFormatException((format("Unsupported INDEX_DATA version: %1%") % index_version).str());
00583
00584 uint64_t chunk_pos = curr_chunk_info_.pos;
00585
00586 multiset<IndexEntry>& connection_index = connection_indexes_[connection_id];
00587
00588 for (uint32_t i = 0; i < count; i++) {
00589 IndexEntry index_entry;
00590 index_entry.chunk_pos = chunk_pos;
00591 uint32_t sec;
00592 uint32_t nsec;
00593 read((char*) &sec, 4);
00594 read((char*) &nsec, 4);
00595 read((char*) &index_entry.offset, 4);
00596 index_entry.time = Time(sec, nsec);
00597
00598 ROS_DEBUG(" - %d.%d: %llu+%d", sec, nsec, (unsigned long long) index_entry.chunk_pos, index_entry.offset);
00599
00600 if (index_entry.time < ros::TIME_MIN || index_entry.time > ros::TIME_MAX)
00601 {
00602 ROS_ERROR("Index entry for topic %s contains invalid time. This message will not be loaded.", connections_[connection_id]->topic.c_str());
00603 } else
00604 {
00605 connection_index.insert(connection_index.end(), index_entry);
00606 }
00607 }
00608 }
00609
00610
00611
00612 void Bag::writeConnectionRecords() {
00613 for (map<uint32_t, ConnectionInfo*>::const_iterator i = connections_.begin(); i != connections_.end(); i++) {
00614 ConnectionInfo const* connection_info = i->second;
00615 writeConnectionRecord(connection_info);
00616 }
00617 }
00618
00619 void Bag::writeConnectionRecord(ConnectionInfo const* connection_info) {
00620 ROS_DEBUG("Writing CONNECTION [%llu:%d]: topic=%s id=%d",
00621 (unsigned long long) file_.getOffset(), getChunkOffset(), connection_info->topic.c_str(), connection_info->id);
00622
00623 M_string header;
00624 header[OP_FIELD_NAME] = toHeaderString(&OP_CONNECTION);
00625 header[TOPIC_FIELD_NAME] = connection_info->topic;
00626 header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_info->id);
00627 writeHeader(header);
00628
00629 writeHeader(*connection_info->header);
00630 }
00631
00632 void Bag::appendConnectionRecordToBuffer(Buffer& buf, ConnectionInfo const* connection_info) {
00633 M_string header;
00634 header[OP_FIELD_NAME] = toHeaderString(&OP_CONNECTION);
00635 header[TOPIC_FIELD_NAME] = connection_info->topic;
00636 header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_info->id);
00637 appendHeaderToBuffer(buf, header);
00638
00639 appendHeaderToBuffer(buf, *connection_info->header);
00640 }
00641
00642 void Bag::readConnectionRecord() {
00643 ros::Header header;
00644 if (!readHeader(header))
00645 throw BagFormatException("Error reading CONNECTION header");
00646 M_string& fields = *header.getValues();
00647
00648 if (!isOp(fields, OP_CONNECTION))
00649 throw BagFormatException("Expected CONNECTION op not found");
00650
00651 uint32_t id;
00652 readField(fields, CONNECTION_FIELD_NAME, true, &id);
00653 string topic;
00654 readField(fields, TOPIC_FIELD_NAME, true, topic);
00655
00656 ros::Header connection_header;
00657 if (!readHeader(connection_header))
00658 throw BagFormatException("Error reading connection header");
00659
00660
00661 map<uint32_t, ConnectionInfo*>::iterator key = connections_.find(id);
00662 if (key == connections_.end()) {
00663 ConnectionInfo* connection_info = new ConnectionInfo();
00664 connection_info->id = id;
00665 connection_info->topic = topic;
00666 connection_info->header = shared_ptr<M_string>(new M_string);
00667 for (M_string::const_iterator i = connection_header.getValues()->begin(); i != connection_header.getValues()->end(); i++)
00668 (*connection_info->header)[i->first] = i->second;
00669 connection_info->msg_def = (*connection_info->header)["message_definition"];
00670 connection_info->datatype = (*connection_info->header)["type"];
00671 connection_info->md5sum = (*connection_info->header)["md5sum"];
00672 connections_[id] = connection_info;
00673
00674 ROS_DEBUG("Read CONNECTION: topic=%s id=%d", topic.c_str(), id);
00675 }
00676 }
00677
00678 void Bag::readMessageDefinitionRecord102() {
00679 ros::Header header;
00680 uint32_t data_size;
00681 if (!readHeader(header) || !readDataLength(data_size))
00682 throw BagFormatException("Error reading message definition header");
00683 M_string& fields = *header.getValues();
00684
00685 if (!isOp(fields, OP_MSG_DEF))
00686 throw BagFormatException("Expected MSG_DEF op not found");
00687
00688 string topic, md5sum, datatype, message_definition;
00689 readField(fields, TOPIC_FIELD_NAME, true, topic);
00690 readField(fields, MD5_FIELD_NAME, 32, 32, true, md5sum);
00691 readField(fields, TYPE_FIELD_NAME, true, datatype);
00692 readField(fields, DEF_FIELD_NAME, 0, UINT_MAX, true, message_definition);
00693
00694 ConnectionInfo* connection_info;
00695
00696 map<string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
00697 if (topic_conn_id_iter == topic_connection_ids_.end()) {
00698 uint32_t id = connections_.size();
00699
00700 ROS_DEBUG("Creating connection: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str());
00701 connection_info = new ConnectionInfo();
00702 connection_info->id = id;
00703 connection_info->topic = topic;
00704
00705 connections_[id] = connection_info;
00706 topic_connection_ids_[topic] = id;
00707 }
00708 else
00709 connection_info = connections_[topic_conn_id_iter->second];
00710
00711 connection_info->msg_def = message_definition;
00712 connection_info->datatype = datatype;
00713 connection_info->md5sum = md5sum;
00714 connection_info->header = boost::shared_ptr<ros::M_string>(new ros::M_string);
00715 (*connection_info->header)["type"] = connection_info->datatype;
00716 (*connection_info->header)["md5sum"] = connection_info->md5sum;
00717 (*connection_info->header)["message_definition"] = connection_info->msg_def;
00718
00719 ROS_DEBUG("Read MSG_DEF: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str());
00720 }
00721
00722 void Bag::decompressChunk(uint64_t chunk_pos) const {
00723 if (curr_chunk_info_.pos == chunk_pos) {
00724 current_buffer_ = &outgoing_chunk_buffer_;
00725 return;
00726 }
00727
00728 current_buffer_ = &decompress_buffer_;
00729
00730 if (decompressed_chunk_ == chunk_pos)
00731 return;
00732
00733
00734 seek(chunk_pos);
00735
00736
00737 ChunkHeader chunk_header;
00738 readChunkHeader(chunk_header);
00739
00740
00741 if (chunk_header.compression == COMPRESSION_NONE)
00742 decompressRawChunk(chunk_header);
00743 else if (chunk_header.compression == COMPRESSION_BZ2)
00744 decompressBz2Chunk(chunk_header);
00745 else
00746 throw BagFormatException("Unknown compression: " + chunk_header.compression);
00747
00748 decompressed_chunk_ = chunk_pos;
00749 }
00750
00751 void Bag::readMessageDataRecord102(uint64_t offset, ros::Header& header) const {
00752 ROS_DEBUG("readMessageDataRecord: offset=%llu", (unsigned long long) offset);
00753
00754 seek(offset);
00755
00756 uint32_t data_size;
00757 uint8_t op;
00758 do {
00759 if (!readHeader(header) || !readDataLength(data_size))
00760 throw BagFormatException("Error reading header");
00761
00762 readField(*header.getValues(), OP_FIELD_NAME, true, &op);
00763 }
00764 while (op == OP_MSG_DEF);
00765
00766 if (op != OP_MSG_DATA)
00767 throw BagFormatException((format("Expected MSG_DATA op, got %d") % op).str());
00768
00769 record_buffer_.setSize(data_size);
00770 file_.read((char*) record_buffer_.getData(), data_size);
00771 }
00772
00773
00774 void Bag::decompressRawChunk(ChunkHeader const& chunk_header) const {
00775 assert(chunk_header.compression == COMPRESSION_NONE);
00776 assert(chunk_header.compressed_size == chunk_header.uncompressed_size);
00777
00778 ROS_DEBUG("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size);
00779
00780 decompress_buffer_.setSize(chunk_header.compressed_size);
00781 file_.read((char*) decompress_buffer_.getData(), chunk_header.compressed_size);
00782
00783
00784 }
00785
00786 void Bag::decompressBz2Chunk(ChunkHeader const& chunk_header) const {
00787 assert(chunk_header.compression == COMPRESSION_BZ2);
00788
00789 CompressionType compression = compression::BZ2;
00790
00791 ROS_DEBUG("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size);
00792
00793 chunk_buffer_.setSize(chunk_header.compressed_size);
00794 file_.read((char*) chunk_buffer_.getData(), chunk_header.compressed_size);
00795
00796 decompress_buffer_.setSize(chunk_header.uncompressed_size);
00797 file_.decompress(compression, decompress_buffer_.getData(), decompress_buffer_.getSize(), chunk_buffer_.getData(), chunk_buffer_.getSize());
00798
00799
00800 }
00801
00802 ros::Header Bag::readMessageDataHeader(IndexEntry const& index_entry) {
00803 ros::Header header;
00804 uint32_t data_size;
00805 uint32_t bytes_read;
00806 switch (version_)
00807 {
00808 case 200:
00809 decompressChunk(index_entry.chunk_pos);
00810 readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
00811 return header;
00812 case 102:
00813 readMessageDataRecord102(index_entry.chunk_pos, header);
00814 return header;
00815 default:
00816 throw BagFormatException((format("Unhandled version: %1%") % version_).str());
00817 }
00818 }
00819
00820
00821 uint32_t Bag::readMessageDataSize(IndexEntry const& index_entry) const {
00822 ros::Header header;
00823 uint32_t data_size;
00824 uint32_t bytes_read;
00825 switch (version_)
00826 {
00827 case 200:
00828 decompressChunk(index_entry.chunk_pos);
00829 readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
00830 return data_size;
00831 case 102:
00832 readMessageDataRecord102(index_entry.chunk_pos, header);
00833 return record_buffer_.getSize();
00834 default:
00835 throw BagFormatException((format("Unhandled version: %1%") % version_).str());
00836 }
00837 }
00838
00839 void Bag::writeChunkInfoRecords() {
00840 foreach(ChunkInfo const& chunk_info, chunks_) {
00841
00842 M_string header;
00843 uint32_t chunk_connection_count = chunk_info.connection_counts.size();
00844 header[OP_FIELD_NAME] = toHeaderString(&OP_CHUNK_INFO);
00845 header[VER_FIELD_NAME] = toHeaderString(&CHUNK_INFO_VERSION);
00846 header[CHUNK_POS_FIELD_NAME] = toHeaderString(&chunk_info.pos);
00847 header[START_TIME_FIELD_NAME] = toHeaderString(&chunk_info.start_time);
00848 header[END_TIME_FIELD_NAME] = toHeaderString(&chunk_info.end_time);
00849 header[COUNT_FIELD_NAME] = toHeaderString(&chunk_connection_count);
00850
00851 ROS_DEBUG("Writing CHUNK_INFO [%llu]: ver=%d pos=%llu start=%d.%d end=%d.%d",
00852 (unsigned long long) file_.getOffset(), CHUNK_INFO_VERSION, (unsigned long long) chunk_info.pos,
00853 chunk_info.start_time.sec, chunk_info.start_time.nsec,
00854 chunk_info.end_time.sec, chunk_info.end_time.nsec);
00855
00856 writeHeader(header);
00857
00858 writeDataLength(8 * chunk_connection_count);
00859
00860
00861 for (map<uint32_t, uint32_t>::const_iterator i = chunk_info.connection_counts.begin(); i != chunk_info.connection_counts.end(); i++) {
00862 uint32_t connection_id = i->first;
00863 uint32_t count = i->second;
00864
00865 write((char*) &connection_id, 4);
00866 write((char*) &count, 4);
00867
00868 ROS_DEBUG(" - %d: %d", connection_id, count);
00869 }
00870 }
00871 }
00872
00873 void Bag::readChunkInfoRecord() {
00874
00875 ros::Header header;
00876 uint32_t data_size;
00877 if (!readHeader(header) || !readDataLength(data_size))
00878 throw BagFormatException("Error reading CHUNK_INFO record header");
00879 M_string& fields = *header.getValues();
00880 if (!isOp(fields, OP_CHUNK_INFO))
00881 throw BagFormatException("Expected CHUNK_INFO op not found");
00882
00883
00884 uint32_t chunk_info_version;
00885 readField(fields, VER_FIELD_NAME, true, &chunk_info_version);
00886 if (chunk_info_version != CHUNK_INFO_VERSION)
00887 throw BagFormatException((format("Expected CHUNK_INFO version %1%, read %2%") % CHUNK_INFO_VERSION % chunk_info_version).str());
00888
00889
00890 ChunkInfo chunk_info;
00891 readField(fields, CHUNK_POS_FIELD_NAME, true, &chunk_info.pos);
00892 readField(fields, START_TIME_FIELD_NAME, true, chunk_info.start_time);
00893 readField(fields, END_TIME_FIELD_NAME, true, chunk_info.end_time);
00894 uint32_t chunk_connection_count = 0;
00895 readField(fields, COUNT_FIELD_NAME, true, &chunk_connection_count);
00896
00897 ROS_DEBUG("Read CHUNK_INFO: chunk_pos=%llu connection_count=%d start=%d.%d end=%d.%d",
00898 (unsigned long long) chunk_info.pos, chunk_connection_count,
00899 chunk_info.start_time.sec, chunk_info.start_time.nsec,
00900 chunk_info.end_time.sec, chunk_info.end_time.nsec);
00901
00902
00903 for (uint32_t i = 0; i < chunk_connection_count; i ++) {
00904 uint32_t connection_id, connection_count;
00905 read((char*) &connection_id, 4);
00906 read((char*) &connection_count, 4);
00907
00908 ROS_DEBUG(" %d: %d messages", connection_id, connection_count);
00909
00910 chunk_info.connection_counts[connection_id] = connection_count;
00911 }
00912
00913 chunks_.push_back(chunk_info);
00914 }
00915
00916
00917
00918 bool Bag::isOp(M_string& fields, uint8_t reqOp) const {
00919 uint8_t op = 0xFF;
00920 readField(fields, OP_FIELD_NAME, true, &op);
00921 return op == reqOp;
00922 }
00923
00924 void Bag::writeHeader(M_string const& fields) {
00925 boost::shared_array<uint8_t> header_buffer;
00926 uint32_t header_len;
00927 ros::Header::write(fields, header_buffer, header_len);
00928 write((char*) &header_len, 4);
00929 write((char*) header_buffer.get(), header_len);
00930 }
00931
00932 void Bag::writeDataLength(uint32_t data_len) {
00933 write((char*) &data_len, 4);
00934 }
00935
00936 void Bag::appendHeaderToBuffer(Buffer& buf, M_string const& fields) {
00937 boost::shared_array<uint8_t> header_buffer;
00938 uint32_t header_len;
00939 ros::Header::write(fields, header_buffer, header_len);
00940
00941 uint32_t offset = buf.getSize();
00942
00943 buf.setSize(buf.getSize() + 4 + header_len);
00944
00945 memcpy(buf.getData() + offset, &header_len, 4);
00946 offset += 4;
00947 memcpy(buf.getData() + offset, header_buffer.get(), header_len);
00948 }
00949
00950 void Bag::appendDataLengthToBuffer(Buffer& buf, uint32_t data_len) {
00951 uint32_t offset = buf.getSize();
00952
00953 buf.setSize(buf.getSize() + 4);
00954
00955 memcpy(buf.getData() + offset, &data_len, 4);
00956 }
00957
00959 void Bag::readHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read) const {
00960 ROS_ASSERT(buffer.getSize() > 8);
00961
00962 uint8_t* start = (uint8_t*) buffer.getData() + offset;
00963
00964 uint8_t* ptr = start;
00965
00966
00967 uint32_t header_len;
00968 memcpy(&header_len, ptr, 4);
00969 ptr += 4;
00970
00971
00972 string error_msg;
00973 bool parsed = header.parse(ptr, header_len, error_msg);
00974 if (!parsed)
00975 throw BagFormatException("Error parsing header");
00976 ptr += header_len;
00977
00978
00979 memcpy(&data_size, ptr, 4);
00980 ptr += 4;
00981
00982 bytes_read = ptr - start;
00983 }
00984
00985 void Bag::readMessageDataHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& total_bytes_read) const {
00986 total_bytes_read = 0;
00987 uint8_t op = 0xFF;
00988 do {
00989 ROS_DEBUG("reading header from buffer: offset=%d", offset);
00990 uint32_t bytes_read;
00991 readHeaderFromBuffer(*current_buffer_, offset, header, data_size, bytes_read);
00992
00993 offset += bytes_read;
00994 total_bytes_read += bytes_read;
00995
00996 readField(*header.getValues(), OP_FIELD_NAME, true, &op);
00997 }
00998 while (op == OP_MSG_DEF || op == OP_CONNECTION);
00999
01000 if (op != OP_MSG_DATA)
01001 throw BagFormatException("Expected MSG_DATA op not found");
01002 }
01003
01004 bool Bag::readHeader(ros::Header& header) const {
01005
01006 uint32_t header_len;
01007 read((char*) &header_len, 4);
01008
01009
01010 header_buffer_.setSize(header_len);
01011 read((char*) header_buffer_.getData(), header_len);
01012
01013
01014 string error_msg;
01015 bool parsed = header.parse(header_buffer_.getData(), header_len, error_msg);
01016 if (!parsed)
01017 return false;
01018
01019 return true;
01020 }
01021
01022 bool Bag::readDataLength(uint32_t& data_size) const {
01023 read((char*) &data_size, 4);
01024 return true;
01025 }
01026
01027 M_string::const_iterator Bag::checkField(M_string const& fields, string const& field, unsigned int min_len, unsigned int max_len, bool required) const {
01028 M_string::const_iterator fitr = fields.find(field);
01029 if (fitr == fields.end()) {
01030 if (required)
01031 throw BagFormatException("Required '" + field + "' field missing");
01032 }
01033 else if ((fitr->second.size() < min_len) || (fitr->second.size() > max_len))
01034 throw BagFormatException((format("Field '%1%' is wrong size (%2% bytes)") % field % (uint32_t) fitr->second.size()).str());
01035
01036 return fitr;
01037 }
01038
01039 bool Bag::readField(M_string const& fields, string const& field_name, bool required, string& data) const {
01040 return readField(fields, field_name, 1, UINT_MAX, required, data);
01041 }
01042
01043 bool Bag::readField(M_string const& fields, string const& field_name, unsigned int min_len, unsigned int max_len, bool required, string& data) const {
01044 M_string::const_iterator fitr = checkField(fields, field_name, min_len, max_len, required);
01045 if (fitr == fields.end())
01046 return false;
01047
01048 data = fitr->second;
01049 return true;
01050 }
01051
01052 bool Bag::readField(M_string const& fields, string const& field_name, bool required, Time& data) const {
01053 uint64_t packed_time;
01054 if (!readField(fields, field_name, required, &packed_time))
01055 return false;
01056
01057 uint64_t bitmask = (1LL << 33) - 1;
01058 data.sec = (uint32_t) (packed_time & bitmask);
01059 data.nsec = (uint32_t) (packed_time >> 32);
01060
01061 return true;
01062 }
01063
01064 std::string Bag::toHeaderString(Time const* field) const {
01065 uint64_t packed_time = (((uint64_t) field->nsec) << 32) + field->sec;
01066 return toHeaderString(&packed_time);
01067 }
01068
01069 ros::AdvertiseOptions createAdvertiseOptions(const ConnectionInfo* c, uint32_t queue_size) {
01070 ros::AdvertiseOptions opts(c->topic, queue_size, c->md5sum, c->datatype, c->msg_def);
01071 ros::M_string::const_iterator header_iter = c->header->find("latching");
01072 opts.latch = (header_iter != c->header->end() && header_iter->second == "1");
01073 return opts;
01074 }
01075
01076
01077
01078 void Bag::write(string const& s) { write(s.c_str(), s.length()); }
01079 void Bag::write(char const* s, std::streamsize n) { file_.write((char*) s, n); }
01080
01081 void Bag::read(char* b, std::streamsize n) const { file_.read(b, n); }
01082 void Bag::seek(uint64_t pos, int origin) const { file_.seek(pos, origin); }
01083
01084 }