00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include "rosbag/bag.h"
00029 #include "rosbag/message_instance.h"
00030 #include "rosbag/query.h"
00031 #include "rosbag/view.h"
00032
00033 #if defined(_MSC_VER)
00034 #include <stdint.h>
00035 #else
00036 #include <inttypes.h>
00037 #endif
00038 #include <signal.h>
00039 #include <assert.h>
00040 #include <iomanip>
00041
00042 #include <boost/foreach.hpp>
00043
00044 #include "console_bridge/console.h"
00045
00046 #define foreach BOOST_FOREACH
00047
00048 using std::map;
00049 using std::priority_queue;
00050 using std::string;
00051 using std::vector;
00052 using std::multiset;
00053 using boost::format;
00054 using boost::shared_ptr;
00055 using ros::M_string;
00056 using ros::Time;
00057
00058 namespace rosbag {
00059
00060 Bag::Bag() :
00061 compression_(compression::Uncompressed),
00062 chunk_threshold_(768 * 1024),
00063 bag_revision_(0),
00064 file_size_(0),
00065 file_header_pos_(0),
00066 index_data_pos_(0),
00067 connection_count_(0),
00068 chunk_count_(0),
00069 chunk_open_(false),
00070 curr_chunk_data_pos_(0),
00071 decompressed_chunk_(0)
00072 {
00073 }
00074
00075 Bag::Bag(string const& filename, uint32_t mode) :
00076 compression_(compression::Uncompressed),
00077 chunk_threshold_(768 * 1024),
00078 bag_revision_(0),
00079 file_size_(0),
00080 file_header_pos_(0),
00081 index_data_pos_(0),
00082 connection_count_(0),
00083 chunk_count_(0),
00084 chunk_open_(false),
00085 curr_chunk_data_pos_(0),
00086 decompressed_chunk_(0)
00087 {
00088 open(filename, mode);
00089 }
00090
00091 Bag::~Bag() {
00092 close();
00093 }
00094
00095 void Bag::open(string const& filename, uint32_t mode) {
00096 mode_ = (BagMode) mode;
00097
00098 if (mode_ & bagmode::Append)
00099 openAppend(filename);
00100 else if (mode_ & bagmode::Write)
00101 openWrite(filename);
00102 else if (mode_ & bagmode::Read)
00103 openRead(filename);
00104 else
00105 throw BagException((format("Unknown mode: %1%") % (int) mode).str());
00106
00107
00108 uint64_t offset = file_.getOffset();
00109 seek(0, std::ios::end);
00110 file_size_ = file_.getOffset();
00111 seek(offset);
00112 }
00113
00114 void Bag::openRead(string const& filename) {
00115 file_.openRead(filename);
00116
00117 readVersion();
00118
00119 switch (version_) {
00120 case 102: startReadingVersion102(); break;
00121 case 200: startReadingVersion200(); break;
00122 default:
00123 throw BagException((format("Unsupported bag file version: %1%.%2%") % getMajorVersion() % getMinorVersion()).str());
00124 }
00125 }
00126
00127 void Bag::openWrite(string const& filename) {
00128 file_.openWrite(filename);
00129
00130 startWriting();
00131 }
00132
00133 void Bag::openAppend(string const& filename) {
00134 file_.openReadWrite(filename);
00135
00136 readVersion();
00137
00138 if (version_ != 200)
00139 throw BagException((format("Bag file version %1%.%2% is unsupported for appending") % getMajorVersion() % getMinorVersion()).str());
00140
00141 startReadingVersion200();
00142
00143
00144 file_.truncate(index_data_pos_);
00145 index_data_pos_ = 0;
00146
00147
00148 seek(file_header_pos_);
00149 writeFileHeaderRecord();
00150
00151
00152 seek(0, std::ios::end);
00153 }
00154
00155 void Bag::close() {
00156 if (!file_.isOpen())
00157 return;
00158
00159 if (mode_ & bagmode::Write || mode_ & bagmode::Append)
00160 closeWrite();
00161
00162 file_.close();
00163
00164 topic_connection_ids_.clear();
00165 header_connection_ids_.clear();
00166 for (map<uint32_t, ConnectionInfo*>::iterator i = connections_.begin(); i != connections_.end(); i++)
00167 delete i->second;
00168 connections_.clear();
00169 chunks_.clear();
00170 connection_indexes_.clear();
00171 curr_chunk_connection_indexes_.clear();
00172 }
00173
00174 void Bag::closeWrite() {
00175 stopWriting();
00176 }
00177
00178 string Bag::getFileName() const { return file_.getFileName(); }
00179 BagMode Bag::getMode() const { return mode_; }
00180 uint64_t Bag::getSize() const { return file_size_; }
00181
00182 uint32_t Bag::getChunkThreshold() const { return chunk_threshold_; }
00183
00184 void Bag::setChunkThreshold(uint32_t chunk_threshold) {
00185 if (file_.isOpen() && chunk_open_)
00186 stopWritingChunk();
00187
00188 chunk_threshold_ = chunk_threshold;
00189 }
00190
00191 CompressionType Bag::getCompression() const { return compression_; }
00192
00193 void Bag::setCompression(CompressionType compression) {
00194 if (file_.isOpen() && chunk_open_)
00195 stopWritingChunk();
00196
00197 compression_ = compression;
00198 }
00199
00200
00201
00202 void Bag::writeVersion() {
00203 string version = string("#ROSBAG V") + VERSION + string("\n");
00204
00205 logDebug("Writing VERSION [%llu]: %s", (unsigned long long) file_.getOffset(), version.c_str());
00206
00207 version_ = 200;
00208
00209 write(version);
00210 }
00211
00212 void Bag::readVersion() {
00213 string version_line = file_.getline();
00214
00215 file_header_pos_ = file_.getOffset();
00216
00217 char logtypename[100];
00218 int version_major, version_minor;
00219 #if defined(_MSC_VER)
00220 if (sscanf_s(version_line.c_str(), "#ROS%s V%d.%d", logtypename, sizeof(logtypename), &version_major, &version_minor) != 3)
00221 #else
00222 if (sscanf(version_line.c_str(), "#ROS%s V%d.%d", logtypename, &version_major, &version_minor) != 3)
00223 #endif
00224 throw BagIOException("Error reading version line");
00225
00226 version_ = version_major * 100 + version_minor;
00227
00228 logDebug("Read VERSION: version=%d", version_);
00229 }
00230
00231 uint32_t Bag::getMajorVersion() const { return version_ / 100; }
00232 uint32_t Bag::getMinorVersion() const { return version_ % 100; }
00233
00234
00235
00236 void Bag::startWriting() {
00237 writeVersion();
00238 file_header_pos_ = file_.getOffset();
00239 writeFileHeaderRecord();
00240 }
00241
00242 void Bag::stopWriting() {
00243 if (chunk_open_)
00244 stopWritingChunk();
00245
00246 seek(0, std::ios::end);
00247
00248 index_data_pos_ = file_.getOffset();
00249 writeConnectionRecords();
00250 writeChunkInfoRecords();
00251
00252 seek(file_header_pos_);
00253 writeFileHeaderRecord();
00254 }
00255
00256 void Bag::startReadingVersion200() {
00257
00258 readFileHeaderRecord();
00259
00260
00261 seek(index_data_pos_);
00262
00263
00264 for (uint32_t i = 0; i < connection_count_; i++)
00265 readConnectionRecord();
00266
00267
00268 for (uint32_t i = 0; i < chunk_count_; i++)
00269 readChunkInfoRecord();
00270
00271
00272 foreach(ChunkInfo const& chunk_info, chunks_) {
00273 curr_chunk_info_ = chunk_info;
00274
00275 seek(curr_chunk_info_.pos);
00276
00277
00278 ChunkHeader chunk_header;
00279 readChunkHeader(chunk_header);
00280 seek(chunk_header.compressed_size, std::ios::cur);
00281
00282
00283 for (unsigned int i = 0; i < chunk_info.connection_counts.size(); i++)
00284 readConnectionIndexRecord200();
00285 }
00286
00287
00288 curr_chunk_info_ = ChunkInfo();
00289 }
00290
00291 void Bag::startReadingVersion102() {
00292 try
00293 {
00294
00295 readFileHeaderRecord();
00296 }
00297 catch (BagFormatException ex) {
00298 throw BagUnindexedException();
00299 }
00300
00301
00302 seek(0, std::ios::end);
00303 uint64_t filelength = file_.getOffset();
00304
00305
00306 seek(index_data_pos_);
00307
00308
00309 while (file_.getOffset() < filelength)
00310 readTopicIndexRecord102();
00311
00312
00313 for (map<uint32_t, multiset<IndexEntry> >::const_iterator i = connection_indexes_.begin(); i != connection_indexes_.end(); i++) {
00314 multiset<IndexEntry> const& index = i->second;
00315 IndexEntry const& first_entry = *index.begin();
00316
00317 logDebug("Reading message definition for connection %d at %llu", i->first, (unsigned long long) first_entry.chunk_pos);
00318
00319 seek(first_entry.chunk_pos);
00320
00321 readMessageDefinitionRecord102();
00322 }
00323 }
00324
00325
00326
00327 void Bag::writeFileHeaderRecord() {
00328 connection_count_ = connections_.size();
00329 chunk_count_ = chunks_.size();
00330
00331 logDebug("Writing FILE_HEADER [%llu]: index_pos=%llu connection_count=%d chunk_count=%d",
00332 (unsigned long long) file_.getOffset(), (unsigned long long) index_data_pos_, connection_count_, chunk_count_);
00333
00334
00335 M_string header;
00336 header[OP_FIELD_NAME] = toHeaderString(&OP_FILE_HEADER);
00337 header[INDEX_POS_FIELD_NAME] = toHeaderString(&index_data_pos_);
00338 header[CONNECTION_COUNT_FIELD_NAME] = toHeaderString(&connection_count_);
00339 header[CHUNK_COUNT_FIELD_NAME] = toHeaderString(&chunk_count_);
00340
00341 boost::shared_array<uint8_t> header_buffer;
00342 uint32_t header_len;
00343 ros::Header::write(header, header_buffer, header_len);
00344 uint32_t data_len = 0;
00345 if (header_len < FILE_HEADER_LENGTH)
00346 data_len = FILE_HEADER_LENGTH - header_len;
00347 write((char*) &header_len, 4);
00348 write((char*) header_buffer.get(), header_len);
00349 write((char*) &data_len, 4);
00350
00351
00352 if (data_len > 0) {
00353 string padding;
00354 padding.resize(data_len, ' ');
00355 write(padding);
00356 }
00357 }
00358
00359 void Bag::readFileHeaderRecord() {
00360 ros::Header header;
00361 uint32_t data_size;
00362 if (!readHeader(header) || !readDataLength(data_size))
00363 throw BagFormatException("Error reading FILE_HEADER record");
00364
00365 M_string& fields = *header.getValues();
00366
00367 if (!isOp(fields, OP_FILE_HEADER))
00368 throw BagFormatException("Expected FILE_HEADER op not found");
00369
00370
00371 readField(fields, INDEX_POS_FIELD_NAME, true, (uint64_t*) &index_data_pos_);
00372
00373 if (index_data_pos_ == 0)
00374 throw BagUnindexedException();
00375
00376
00377 if (version_ >= 200) {
00378 readField(fields, CONNECTION_COUNT_FIELD_NAME, true, &connection_count_);
00379 readField(fields, CHUNK_COUNT_FIELD_NAME, true, &chunk_count_);
00380 }
00381
00382 logDebug("Read FILE_HEADER: index_pos=%llu connection_count=%d chunk_count=%d",
00383 (unsigned long long) index_data_pos_, connection_count_, chunk_count_);
00384
00385
00386 seek(data_size, std::ios::cur);
00387 }
00388
00389 uint32_t Bag::getChunkOffset() const {
00390 if (compression_ == compression::Uncompressed)
00391 return file_.getOffset() - curr_chunk_data_pos_;
00392 else
00393 return file_.getCompressedBytesIn();
00394 }
00395
00396 void Bag::startWritingChunk(Time time) {
00397
00398 curr_chunk_info_.pos = file_.getOffset();
00399 curr_chunk_info_.start_time = time;
00400 curr_chunk_info_.end_time = time;
00401
00402
00403 writeChunkHeader(compression_, 0, 0);
00404
00405
00406 file_.setWriteMode(compression_);
00407
00408
00409 curr_chunk_data_pos_ = file_.getOffset();
00410
00411 chunk_open_ = true;
00412 }
00413
00414 void Bag::stopWritingChunk() {
00415
00416 chunks_.push_back(curr_chunk_info_);
00417
00418
00419 uint32_t uncompressed_size = getChunkOffset();
00420 file_.setWriteMode(compression::Uncompressed);
00421 uint32_t compressed_size = file_.getOffset() - curr_chunk_data_pos_;
00422
00423
00424 uint64_t end_of_chunk_pos = file_.getOffset();
00425
00426 seek(curr_chunk_info_.pos);
00427 writeChunkHeader(compression_, compressed_size, uncompressed_size);
00428
00429
00430 seek(end_of_chunk_pos);
00431 writeIndexRecords();
00432 curr_chunk_connection_indexes_.clear();
00433
00434
00435 curr_chunk_info_.connection_counts.clear();
00436
00437
00438 chunk_open_ = false;
00439 }
00440
00441 void Bag::writeChunkHeader(CompressionType compression, uint32_t compressed_size, uint32_t uncompressed_size) {
00442 ChunkHeader chunk_header;
00443 switch (compression) {
00444 case compression::Uncompressed: chunk_header.compression = COMPRESSION_NONE; break;
00445 case compression::BZ2: chunk_header.compression = COMPRESSION_BZ2; break;
00446
00447 }
00448 chunk_header.compressed_size = compressed_size;
00449 chunk_header.uncompressed_size = uncompressed_size;
00450
00451 logDebug("Writing CHUNK [%llu]: compression=%s compressed=%d uncompressed=%d",
00452 (unsigned long long) file_.getOffset(), chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size);
00453
00454 M_string header;
00455 header[OP_FIELD_NAME] = toHeaderString(&OP_CHUNK);
00456 header[COMPRESSION_FIELD_NAME] = chunk_header.compression;
00457 header[SIZE_FIELD_NAME] = toHeaderString(&chunk_header.uncompressed_size);
00458 writeHeader(header);
00459
00460 writeDataLength(chunk_header.compressed_size);
00461 }
00462
00463 void Bag::readChunkHeader(ChunkHeader& chunk_header) const {
00464 ros::Header header;
00465 if (!readHeader(header) || !readDataLength(chunk_header.compressed_size))
00466 throw BagFormatException("Error reading CHUNK record");
00467
00468 M_string& fields = *header.getValues();
00469
00470 if (!isOp(fields, OP_CHUNK))
00471 throw BagFormatException("Expected CHUNK op not found");
00472
00473 readField(fields, COMPRESSION_FIELD_NAME, true, chunk_header.compression);
00474 readField(fields, SIZE_FIELD_NAME, true, &chunk_header.uncompressed_size);
00475
00476 logDebug("Read CHUNK: compression=%s size=%d uncompressed=%d (%f)", chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size, 100 * ((double) chunk_header.compressed_size) / chunk_header.uncompressed_size);
00477 }
00478
00479
00480
00481 void Bag::writeIndexRecords() {
00482 for (map<uint32_t, multiset<IndexEntry> >::const_iterator i = curr_chunk_connection_indexes_.begin(); i != curr_chunk_connection_indexes_.end(); i++) {
00483 uint32_t connection_id = i->first;
00484 multiset<IndexEntry> const& index = i->second;
00485
00486
00487 uint32_t index_size = index.size();
00488 M_string header;
00489 header[OP_FIELD_NAME] = toHeaderString(&OP_INDEX_DATA);
00490 header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_id);
00491 header[VER_FIELD_NAME] = toHeaderString(&INDEX_VERSION);
00492 header[COUNT_FIELD_NAME] = toHeaderString(&index_size);
00493 writeHeader(header);
00494
00495 writeDataLength(index_size * 12);
00496
00497 logDebug("Writing INDEX_DATA: connection=%d ver=%d count=%d", connection_id, INDEX_VERSION, index_size);
00498
00499
00500 foreach(IndexEntry const& e, index) {
00501 write((char*) &e.time.sec, 4);
00502 write((char*) &e.time.nsec, 4);
00503 write((char*) &e.offset, 4);
00504
00505 logDebug(" - %d.%d: %d", e.time.sec, e.time.nsec, e.offset);
00506 }
00507 }
00508 }
00509
00510 void Bag::readTopicIndexRecord102() {
00511 ros::Header header;
00512 uint32_t data_size;
00513 if (!readHeader(header) || !readDataLength(data_size))
00514 throw BagFormatException("Error reading INDEX_DATA header");
00515 M_string& fields = *header.getValues();
00516
00517 if (!isOp(fields, OP_INDEX_DATA))
00518 throw BagFormatException("Expected INDEX_DATA record");
00519
00520 uint32_t index_version;
00521 string topic;
00522 uint32_t count = 0;
00523 readField(fields, VER_FIELD_NAME, true, &index_version);
00524 readField(fields, TOPIC_FIELD_NAME, true, topic);
00525 readField(fields, COUNT_FIELD_NAME, true, &count);
00526
00527 logDebug("Read INDEX_DATA: ver=%d topic=%s count=%d", index_version, topic.c_str(), count);
00528
00529 if (index_version != 0)
00530 throw BagFormatException((format("Unsupported INDEX_DATA version: %1%") % index_version).str());
00531
00532 uint32_t connection_id;
00533 map<string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
00534 if (topic_conn_id_iter == topic_connection_ids_.end()) {
00535 connection_id = connections_.size();
00536
00537 logDebug("Creating connection: id=%d topic=%s", connection_id, topic.c_str());
00538 ConnectionInfo* connection_info = new ConnectionInfo();
00539 connection_info->id = connection_id;
00540 connection_info->topic = topic;
00541 connections_[connection_id] = connection_info;
00542
00543 topic_connection_ids_[topic] = connection_id;
00544 }
00545 else
00546 connection_id = topic_conn_id_iter->second;
00547
00548 multiset<IndexEntry>& connection_index = connection_indexes_[connection_id];
00549
00550 for (uint32_t i = 0; i < count; i++) {
00551 IndexEntry index_entry;
00552 uint32_t sec;
00553 uint32_t nsec;
00554 read((char*) &sec, 4);
00555 read((char*) &nsec, 4);
00556 read((char*) &index_entry.chunk_pos, 8);
00557 index_entry.time = Time(sec, nsec);
00558 index_entry.offset = 0;
00559
00560 logDebug(" - %d.%d: %llu", sec, nsec, (unsigned long long) index_entry.chunk_pos);
00561
00562 if (index_entry.time < ros::TIME_MIN || index_entry.time > ros::TIME_MAX)
00563 {
00564 logError("Index entry for topic %s contains invalid time.", topic.c_str());
00565 } else
00566 {
00567 connection_index.insert(connection_index.end(), index_entry);
00568 }
00569 }
00570 }
00571
00572 void Bag::readConnectionIndexRecord200() {
00573 ros::Header header;
00574 uint32_t data_size;
00575 if (!readHeader(header) || !readDataLength(data_size))
00576 throw BagFormatException("Error reading INDEX_DATA header");
00577 M_string& fields = *header.getValues();
00578
00579 if (!isOp(fields, OP_INDEX_DATA))
00580 throw BagFormatException("Expected INDEX_DATA record");
00581
00582 uint32_t index_version;
00583 uint32_t connection_id;
00584 uint32_t count = 0;
00585 readField(fields, VER_FIELD_NAME, true, &index_version);
00586 readField(fields, CONNECTION_FIELD_NAME, true, &connection_id);
00587 readField(fields, COUNT_FIELD_NAME, true, &count);
00588
00589 logDebug("Read INDEX_DATA: ver=%d connection=%d count=%d", index_version, connection_id, count);
00590
00591 if (index_version != 1)
00592 throw BagFormatException((format("Unsupported INDEX_DATA version: %1%") % index_version).str());
00593
00594 uint64_t chunk_pos = curr_chunk_info_.pos;
00595
00596 multiset<IndexEntry>& connection_index = connection_indexes_[connection_id];
00597
00598 for (uint32_t i = 0; i < count; i++) {
00599 IndexEntry index_entry;
00600 index_entry.chunk_pos = chunk_pos;
00601 uint32_t sec;
00602 uint32_t nsec;
00603 read((char*) &sec, 4);
00604 read((char*) &nsec, 4);
00605 read((char*) &index_entry.offset, 4);
00606 index_entry.time = Time(sec, nsec);
00607
00608 logDebug(" - %d.%d: %llu+%d", sec, nsec, (unsigned long long) index_entry.chunk_pos, index_entry.offset);
00609
00610 if (index_entry.time < ros::TIME_MIN || index_entry.time > ros::TIME_MAX)
00611 {
00612 logError("Index entry for topic %s contains invalid time. This message will not be loaded.", connections_[connection_id]->topic.c_str());
00613 } else
00614 {
00615 connection_index.insert(connection_index.end(), index_entry);
00616 }
00617 }
00618 }
00619
00620
00621
00622 void Bag::writeConnectionRecords() {
00623 for (map<uint32_t, ConnectionInfo*>::const_iterator i = connections_.begin(); i != connections_.end(); i++) {
00624 ConnectionInfo const* connection_info = i->second;
00625 writeConnectionRecord(connection_info);
00626 }
00627 }
00628
00629 void Bag::writeConnectionRecord(ConnectionInfo const* connection_info) {
00630 logDebug("Writing CONNECTION [%llu:%d]: topic=%s id=%d",
00631 (unsigned long long) file_.getOffset(), getChunkOffset(), connection_info->topic.c_str(), connection_info->id);
00632
00633 M_string header;
00634 header[OP_FIELD_NAME] = toHeaderString(&OP_CONNECTION);
00635 header[TOPIC_FIELD_NAME] = connection_info->topic;
00636 header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_info->id);
00637 writeHeader(header);
00638
00639 writeHeader(*connection_info->header);
00640 }
00641
00642 void Bag::appendConnectionRecordToBuffer(Buffer& buf, ConnectionInfo const* connection_info) {
00643 M_string header;
00644 header[OP_FIELD_NAME] = toHeaderString(&OP_CONNECTION);
00645 header[TOPIC_FIELD_NAME] = connection_info->topic;
00646 header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_info->id);
00647 appendHeaderToBuffer(buf, header);
00648
00649 appendHeaderToBuffer(buf, *connection_info->header);
00650 }
00651
00652 void Bag::readConnectionRecord() {
00653 ros::Header header;
00654 if (!readHeader(header))
00655 throw BagFormatException("Error reading CONNECTION header");
00656 M_string& fields = *header.getValues();
00657
00658 if (!isOp(fields, OP_CONNECTION))
00659 throw BagFormatException("Expected CONNECTION op not found");
00660
00661 uint32_t id;
00662 readField(fields, CONNECTION_FIELD_NAME, true, &id);
00663 string topic;
00664 readField(fields, TOPIC_FIELD_NAME, true, topic);
00665
00666 ros::Header connection_header;
00667 if (!readHeader(connection_header))
00668 throw BagFormatException("Error reading connection header");
00669
00670
00671 map<uint32_t, ConnectionInfo*>::iterator key = connections_.find(id);
00672 if (key == connections_.end()) {
00673 ConnectionInfo* connection_info = new ConnectionInfo();
00674 connection_info->id = id;
00675 connection_info->topic = topic;
00676 connection_info->header = shared_ptr<M_string>(new M_string);
00677 for (M_string::const_iterator i = connection_header.getValues()->begin(); i != connection_header.getValues()->end(); i++)
00678 (*connection_info->header)[i->first] = i->second;
00679 connection_info->msg_def = (*connection_info->header)["message_definition"];
00680 connection_info->datatype = (*connection_info->header)["type"];
00681 connection_info->md5sum = (*connection_info->header)["md5sum"];
00682 connections_[id] = connection_info;
00683
00684 logDebug("Read CONNECTION: topic=%s id=%d", topic.c_str(), id);
00685 }
00686 }
00687
00688 void Bag::readMessageDefinitionRecord102() {
00689 ros::Header header;
00690 uint32_t data_size;
00691 if (!readHeader(header) || !readDataLength(data_size))
00692 throw BagFormatException("Error reading message definition header");
00693 M_string& fields = *header.getValues();
00694
00695 if (!isOp(fields, OP_MSG_DEF))
00696 throw BagFormatException("Expected MSG_DEF op not found");
00697
00698 string topic, md5sum, datatype, message_definition;
00699 readField(fields, TOPIC_FIELD_NAME, true, topic);
00700 readField(fields, MD5_FIELD_NAME, 32, 32, true, md5sum);
00701 readField(fields, TYPE_FIELD_NAME, true, datatype);
00702 readField(fields, DEF_FIELD_NAME, 0, UINT_MAX, true, message_definition);
00703
00704 ConnectionInfo* connection_info;
00705
00706 map<string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
00707 if (topic_conn_id_iter == topic_connection_ids_.end()) {
00708 uint32_t id = connections_.size();
00709
00710 logDebug("Creating connection: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str());
00711 connection_info = new ConnectionInfo();
00712 connection_info->id = id;
00713 connection_info->topic = topic;
00714
00715 connections_[id] = connection_info;
00716 topic_connection_ids_[topic] = id;
00717 }
00718 else
00719 connection_info = connections_[topic_conn_id_iter->second];
00720
00721 connection_info->msg_def = message_definition;
00722 connection_info->datatype = datatype;
00723 connection_info->md5sum = md5sum;
00724 connection_info->header = boost::shared_ptr<ros::M_string>(new ros::M_string);
00725 (*connection_info->header)["type"] = connection_info->datatype;
00726 (*connection_info->header)["md5sum"] = connection_info->md5sum;
00727 (*connection_info->header)["message_definition"] = connection_info->msg_def;
00728
00729 logDebug("Read MSG_DEF: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str());
00730 }
00731
00732 void Bag::decompressChunk(uint64_t chunk_pos) const {
00733 if (curr_chunk_info_.pos == chunk_pos) {
00734 current_buffer_ = &outgoing_chunk_buffer_;
00735 return;
00736 }
00737
00738 current_buffer_ = &decompress_buffer_;
00739
00740 if (decompressed_chunk_ == chunk_pos)
00741 return;
00742
00743
00744 seek(chunk_pos);
00745
00746
00747 ChunkHeader chunk_header;
00748 readChunkHeader(chunk_header);
00749
00750
00751 if (chunk_header.compression == COMPRESSION_NONE)
00752 decompressRawChunk(chunk_header);
00753 else if (chunk_header.compression == COMPRESSION_BZ2)
00754 decompressBz2Chunk(chunk_header);
00755 else
00756 throw BagFormatException("Unknown compression: " + chunk_header.compression);
00757
00758 decompressed_chunk_ = chunk_pos;
00759 }
00760
00761 void Bag::readMessageDataRecord102(uint64_t offset, ros::Header& header) const {
00762 logDebug("readMessageDataRecord: offset=%llu", (unsigned long long) offset);
00763
00764 seek(offset);
00765
00766 uint32_t data_size;
00767 uint8_t op;
00768 do {
00769 if (!readHeader(header) || !readDataLength(data_size))
00770 throw BagFormatException("Error reading header");
00771
00772 readField(*header.getValues(), OP_FIELD_NAME, true, &op);
00773 }
00774 while (op == OP_MSG_DEF);
00775
00776 if (op != OP_MSG_DATA)
00777 throw BagFormatException((format("Expected MSG_DATA op, got %d") % op).str());
00778
00779 record_buffer_.setSize(data_size);
00780 file_.read((char*) record_buffer_.getData(), data_size);
00781 }
00782
00783
00784 void Bag::decompressRawChunk(ChunkHeader const& chunk_header) const {
00785 assert(chunk_header.compression == COMPRESSION_NONE);
00786 assert(chunk_header.compressed_size == chunk_header.uncompressed_size);
00787
00788 logDebug("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size);
00789
00790 decompress_buffer_.setSize(chunk_header.compressed_size);
00791 file_.read((char*) decompress_buffer_.getData(), chunk_header.compressed_size);
00792
00793
00794 }
00795
00796 void Bag::decompressBz2Chunk(ChunkHeader const& chunk_header) const {
00797 assert(chunk_header.compression == COMPRESSION_BZ2);
00798
00799 CompressionType compression = compression::BZ2;
00800
00801 logDebug("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size);
00802
00803 chunk_buffer_.setSize(chunk_header.compressed_size);
00804 file_.read((char*) chunk_buffer_.getData(), chunk_header.compressed_size);
00805
00806 decompress_buffer_.setSize(chunk_header.uncompressed_size);
00807 file_.decompress(compression, decompress_buffer_.getData(), decompress_buffer_.getSize(), chunk_buffer_.getData(), chunk_buffer_.getSize());
00808
00809
00810 }
00811
00812 ros::Header Bag::readMessageDataHeader(IndexEntry const& index_entry) {
00813 ros::Header header;
00814 uint32_t data_size;
00815 uint32_t bytes_read;
00816 switch (version_)
00817 {
00818 case 200:
00819 decompressChunk(index_entry.chunk_pos);
00820 readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
00821 return header;
00822 case 102:
00823 readMessageDataRecord102(index_entry.chunk_pos, header);
00824 return header;
00825 default:
00826 throw BagFormatException((format("Unhandled version: %1%") % version_).str());
00827 }
00828 }
00829
00830
00831 uint32_t Bag::readMessageDataSize(IndexEntry const& index_entry) const {
00832 ros::Header header;
00833 uint32_t data_size;
00834 uint32_t bytes_read;
00835 switch (version_)
00836 {
00837 case 200:
00838 decompressChunk(index_entry.chunk_pos);
00839 readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
00840 return data_size;
00841 case 102:
00842 readMessageDataRecord102(index_entry.chunk_pos, header);
00843 return record_buffer_.getSize();
00844 default:
00845 throw BagFormatException((format("Unhandled version: %1%") % version_).str());
00846 }
00847 }
00848
00849 void Bag::writeChunkInfoRecords() {
00850 foreach(ChunkInfo const& chunk_info, chunks_) {
00851
00852 M_string header;
00853 uint32_t chunk_connection_count = chunk_info.connection_counts.size();
00854 header[OP_FIELD_NAME] = toHeaderString(&OP_CHUNK_INFO);
00855 header[VER_FIELD_NAME] = toHeaderString(&CHUNK_INFO_VERSION);
00856 header[CHUNK_POS_FIELD_NAME] = toHeaderString(&chunk_info.pos);
00857 header[START_TIME_FIELD_NAME] = toHeaderString(&chunk_info.start_time);
00858 header[END_TIME_FIELD_NAME] = toHeaderString(&chunk_info.end_time);
00859 header[COUNT_FIELD_NAME] = toHeaderString(&chunk_connection_count);
00860
00861 logDebug("Writing CHUNK_INFO [%llu]: ver=%d pos=%llu start=%d.%d end=%d.%d",
00862 (unsigned long long) file_.getOffset(), CHUNK_INFO_VERSION, (unsigned long long) chunk_info.pos,
00863 chunk_info.start_time.sec, chunk_info.start_time.nsec,
00864 chunk_info.end_time.sec, chunk_info.end_time.nsec);
00865
00866 writeHeader(header);
00867
00868 writeDataLength(8 * chunk_connection_count);
00869
00870
00871 for (map<uint32_t, uint32_t>::const_iterator i = chunk_info.connection_counts.begin(); i != chunk_info.connection_counts.end(); i++) {
00872 uint32_t connection_id = i->first;
00873 uint32_t count = i->second;
00874
00875 write((char*) &connection_id, 4);
00876 write((char*) &count, 4);
00877
00878 logDebug(" - %d: %d", connection_id, count);
00879 }
00880 }
00881 }
00882
00883 void Bag::readChunkInfoRecord() {
00884
00885 ros::Header header;
00886 uint32_t data_size;
00887 if (!readHeader(header) || !readDataLength(data_size))
00888 throw BagFormatException("Error reading CHUNK_INFO record header");
00889 M_string& fields = *header.getValues();
00890 if (!isOp(fields, OP_CHUNK_INFO))
00891 throw BagFormatException("Expected CHUNK_INFO op not found");
00892
00893
00894 uint32_t chunk_info_version;
00895 readField(fields, VER_FIELD_NAME, true, &chunk_info_version);
00896 if (chunk_info_version != CHUNK_INFO_VERSION)
00897 throw BagFormatException((format("Expected CHUNK_INFO version %1%, read %2%") % CHUNK_INFO_VERSION % chunk_info_version).str());
00898
00899
00900 ChunkInfo chunk_info;
00901 readField(fields, CHUNK_POS_FIELD_NAME, true, &chunk_info.pos);
00902 readField(fields, START_TIME_FIELD_NAME, true, chunk_info.start_time);
00903 readField(fields, END_TIME_FIELD_NAME, true, chunk_info.end_time);
00904 uint32_t chunk_connection_count = 0;
00905 readField(fields, COUNT_FIELD_NAME, true, &chunk_connection_count);
00906
00907 logDebug("Read CHUNK_INFO: chunk_pos=%llu connection_count=%d start=%d.%d end=%d.%d",
00908 (unsigned long long) chunk_info.pos, chunk_connection_count,
00909 chunk_info.start_time.sec, chunk_info.start_time.nsec,
00910 chunk_info.end_time.sec, chunk_info.end_time.nsec);
00911
00912
00913 for (uint32_t i = 0; i < chunk_connection_count; i ++) {
00914 uint32_t connection_id, connection_count;
00915 read((char*) &connection_id, 4);
00916 read((char*) &connection_count, 4);
00917
00918 logDebug(" %d: %d messages", connection_id, connection_count);
00919
00920 chunk_info.connection_counts[connection_id] = connection_count;
00921 }
00922
00923 chunks_.push_back(chunk_info);
00924 }
00925
00926
00927
00928 bool Bag::isOp(M_string& fields, uint8_t reqOp) const {
00929 uint8_t op = 0xFF;
00930 readField(fields, OP_FIELD_NAME, true, &op);
00931 return op == reqOp;
00932 }
00933
00934 void Bag::writeHeader(M_string const& fields) {
00935 boost::shared_array<uint8_t> header_buffer;
00936 uint32_t header_len;
00937 ros::Header::write(fields, header_buffer, header_len);
00938 write((char*) &header_len, 4);
00939 write((char*) header_buffer.get(), header_len);
00940 }
00941
00942 void Bag::writeDataLength(uint32_t data_len) {
00943 write((char*) &data_len, 4);
00944 }
00945
00946 void Bag::appendHeaderToBuffer(Buffer& buf, M_string const& fields) {
00947 boost::shared_array<uint8_t> header_buffer;
00948 uint32_t header_len;
00949 ros::Header::write(fields, header_buffer, header_len);
00950
00951 uint32_t offset = buf.getSize();
00952
00953 buf.setSize(buf.getSize() + 4 + header_len);
00954
00955 memcpy(buf.getData() + offset, &header_len, 4);
00956 offset += 4;
00957 memcpy(buf.getData() + offset, header_buffer.get(), header_len);
00958 }
00959
00960 void Bag::appendDataLengthToBuffer(Buffer& buf, uint32_t data_len) {
00961 uint32_t offset = buf.getSize();
00962
00963 buf.setSize(buf.getSize() + 4);
00964
00965 memcpy(buf.getData() + offset, &data_len, 4);
00966 }
00967
00969 void Bag::readHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read) const {
00970 assert(buffer.getSize() > 8);
00971
00972 uint8_t* start = (uint8_t*) buffer.getData() + offset;
00973
00974 uint8_t* ptr = start;
00975
00976
00977 uint32_t header_len;
00978 memcpy(&header_len, ptr, 4);
00979 ptr += 4;
00980
00981
00982 string error_msg;
00983 bool parsed = header.parse(ptr, header_len, error_msg);
00984 if (!parsed)
00985 throw BagFormatException("Error parsing header");
00986 ptr += header_len;
00987
00988
00989 memcpy(&data_size, ptr, 4);
00990 ptr += 4;
00991
00992 bytes_read = ptr - start;
00993 }
00994
00995 void Bag::readMessageDataHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& total_bytes_read) const {
00996 total_bytes_read = 0;
00997 uint8_t op = 0xFF;
00998 do {
00999 logDebug("reading header from buffer: offset=%d", offset);
01000 uint32_t bytes_read;
01001 readHeaderFromBuffer(*current_buffer_, offset, header, data_size, bytes_read);
01002
01003 offset += bytes_read;
01004 total_bytes_read += bytes_read;
01005
01006 readField(*header.getValues(), OP_FIELD_NAME, true, &op);
01007 }
01008 while (op == OP_MSG_DEF || op == OP_CONNECTION);
01009
01010 if (op != OP_MSG_DATA)
01011 throw BagFormatException("Expected MSG_DATA op not found");
01012 }
01013
01014 bool Bag::readHeader(ros::Header& header) const {
01015
01016 uint32_t header_len;
01017 read((char*) &header_len, 4);
01018
01019
01020 header_buffer_.setSize(header_len);
01021 read((char*) header_buffer_.getData(), header_len);
01022
01023
01024 string error_msg;
01025 bool parsed = header.parse(header_buffer_.getData(), header_len, error_msg);
01026 if (!parsed)
01027 return false;
01028
01029 return true;
01030 }
01031
01032 bool Bag::readDataLength(uint32_t& data_size) const {
01033 read((char*) &data_size, 4);
01034 return true;
01035 }
01036
01037 M_string::const_iterator Bag::checkField(M_string const& fields, string const& field, unsigned int min_len, unsigned int max_len, bool required) const {
01038 M_string::const_iterator fitr = fields.find(field);
01039 if (fitr == fields.end()) {
01040 if (required)
01041 throw BagFormatException("Required '" + field + "' field missing");
01042 }
01043 else if ((fitr->second.size() < min_len) || (fitr->second.size() > max_len))
01044 throw BagFormatException((format("Field '%1%' is wrong size (%2% bytes)") % field % (uint32_t) fitr->second.size()).str());
01045
01046 return fitr;
01047 }
01048
01049 bool Bag::readField(M_string const& fields, string const& field_name, bool required, string& data) const {
01050 return readField(fields, field_name, 1, UINT_MAX, required, data);
01051 }
01052
01053 bool Bag::readField(M_string const& fields, string const& field_name, unsigned int min_len, unsigned int max_len, bool required, string& data) const {
01054 M_string::const_iterator fitr = checkField(fields, field_name, min_len, max_len, required);
01055 if (fitr == fields.end())
01056 return false;
01057
01058 data = fitr->second;
01059 return true;
01060 }
01061
01062 bool Bag::readField(M_string const& fields, string const& field_name, bool required, Time& data) const {
01063 uint64_t packed_time;
01064 if (!readField(fields, field_name, required, &packed_time))
01065 return false;
01066
01067 uint64_t bitmask = (1LL << 33) - 1;
01068 data.sec = (uint32_t) (packed_time & bitmask);
01069 data.nsec = (uint32_t) (packed_time >> 32);
01070
01071 return true;
01072 }
01073
01074 std::string Bag::toHeaderString(Time const* field) const {
01075 uint64_t packed_time = (((uint64_t) field->nsec) << 32) + field->sec;
01076 return toHeaderString(&packed_time);
01077 }
01078
01079
01080
01081
01082 void Bag::write(string const& s) { write(s.c_str(), s.length()); }
01083 void Bag::write(char const* s, std::streamsize n) { file_.write((char*) s, n); }
01084
01085 void Bag::read(char* b, std::streamsize n) const { file_.read(b, n); }
01086 void Bag::seek(uint64_t pos, int origin) const { file_.seek(pos, origin); }
01087
01088 }