$search
00001 // Copyright (c) 2009, Willow Garage, Inc. 00002 // All rights reserved. 00003 // 00004 // Redistribution and use in source and binary forms, with or without 00005 // modification, are permitted provided that the following conditions are met: 00006 // 00007 // * Redistributions of source code must retain the above copyright 00008 // notice, this list of conditions and the following disclaimer. 00009 // * Redistributions in binary form must reproduce the above copyright 00010 // notice, this list of conditions and the following disclaimer in the 00011 // documentation and/or other materials provided with the distribution. 00012 // * Neither the name of Willow Garage, Inc. nor the names of its 00013 // contributors may be used to endorse or promote products derived from 00014 // this software without specific prior written permission. 00015 // 00016 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 00017 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 00018 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 00019 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 00020 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 00021 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 00022 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 00023 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 00024 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 00025 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00026 // POSSIBILITY OF SUCH DAMAGE. 00027 00028 #include "rosbag/bag.h" 00029 #include "rosbag/message_instance.h" 00030 #include "rosbag/query.h" 00031 #include "rosbag/view.h" 00032 00033 #include <inttypes.h> 00034 #include <signal.h> 00035 00036 #include <iomanip> 00037 00038 #include <boost/foreach.hpp> 00039 00040 #define foreach BOOST_FOREACH 00041 00042 using std::map; 00043 using std::priority_queue; 00044 using std::string; 00045 using std::vector; 00046 using std::multiset; 00047 using boost::format; 00048 using boost::shared_ptr; 00049 using ros::M_string; 00050 using ros::Time; 00051 00052 namespace rosbag { 00053 00054 Bag::Bag() : 00055 compression_(compression::Uncompressed), 00056 chunk_threshold_(768 * 1024), // 768KB chunks 00057 bag_revision_(0), 00058 file_size_(0), 00059 file_header_pos_(0), 00060 index_data_pos_(0), 00061 connection_count_(0), 00062 chunk_count_(0), 00063 chunk_open_(false), 00064 curr_chunk_data_pos_(0), 00065 decompressed_chunk_(0) 00066 { 00067 } 00068 00069 Bag::Bag(string const& filename, uint32_t mode) : 00070 compression_(compression::Uncompressed), 00071 chunk_threshold_(768 * 1024), // 768KB chunks 00072 bag_revision_(0), 00073 file_size_(0), 00074 file_header_pos_(0), 00075 index_data_pos_(0), 00076 connection_count_(0), 00077 chunk_count_(0), 00078 chunk_open_(false), 00079 curr_chunk_data_pos_(0), 00080 decompressed_chunk_(0) 00081 { 00082 open(filename, mode); 00083 } 00084 00085 Bag::~Bag() { 00086 close(); 00087 } 00088 00089 void Bag::open(string const& filename, uint32_t mode) { 00090 mode_ = (BagMode) mode; 00091 00092 if (mode_ & bagmode::Append) 00093 openAppend(filename); 00094 else if (mode_ & bagmode::Write) 00095 openWrite(filename); 00096 else if (mode_ & bagmode::Read) 00097 openRead(filename); 00098 else 00099 throw BagException((format("Unknown mode: %1%") % (int) mode).str()); 00100 00101 // Determine file size 00102 uint64_t offset = file_.getOffset(); 00103 seek(0, std::ios::end); 00104 file_size_ = file_.getOffset(); 00105 seek(offset); 00106 } 00107 00108 void Bag::openRead(string const& filename) { 00109 file_.openRead(filename); 00110 00111 readVersion(); 00112 00113 switch (version_) { 00114 case 102: startReadingVersion102(); break; 00115 case 200: startReadingVersion200(); break; 00116 default: 00117 throw BagException((format("Unsupported bag file version: %1%.%2%") % getMajorVersion() % getMinorVersion()).str()); 00118 } 00119 } 00120 00121 void Bag::openWrite(string const& filename) { 00122 file_.openWrite(filename); 00123 00124 startWriting(); 00125 } 00126 00127 void Bag::openAppend(string const& filename) { 00128 file_.openReadWrite(filename); 00129 00130 readVersion(); 00131 00132 if (version_ != 200) 00133 throw BagException((format("Bag file version %1%.%2% is unsupported for appending") % getMajorVersion() % getMinorVersion()).str()); 00134 00135 startReadingVersion200(); 00136 00137 // Truncate the file to chop off the index 00138 file_.truncate(index_data_pos_); 00139 index_data_pos_ = 0; 00140 00141 // Rewrite the file header, clearing the index position (so we know if the index is invalid) 00142 seek(file_header_pos_); 00143 writeFileHeaderRecord(); 00144 00145 // Seek to the end of the file 00146 seek(0, std::ios::end); 00147 } 00148 00149 void Bag::close() { 00150 if (!file_.isOpen()) 00151 return; 00152 00153 if (mode_ & bagmode::Write || mode_ & bagmode::Append) 00154 closeWrite(); 00155 00156 file_.close(); 00157 00158 topic_connection_ids_.clear(); 00159 header_connection_ids_.clear(); 00160 for (map<uint32_t, ConnectionInfo*>::iterator i = connections_.begin(); i != connections_.end(); i++) 00161 delete i->second; 00162 connections_.clear(); 00163 chunks_.clear(); 00164 connection_indexes_.clear(); 00165 curr_chunk_connection_indexes_.clear(); 00166 } 00167 00168 void Bag::closeWrite() { 00169 stopWriting(); 00170 } 00171 00172 string Bag::getFileName() const { return file_.getFileName(); } 00173 BagMode Bag::getMode() const { return mode_; } 00174 uint64_t Bag::getSize() const { return file_size_; } 00175 00176 uint32_t Bag::getChunkThreshold() const { return chunk_threshold_; } 00177 00178 void Bag::setChunkThreshold(uint32_t chunk_threshold) { 00179 if (file_.isOpen() && chunk_open_) 00180 stopWritingChunk(); 00181 00182 chunk_threshold_ = chunk_threshold; 00183 } 00184 00185 CompressionType Bag::getCompression() const { return compression_; } 00186 00187 void Bag::setCompression(CompressionType compression) { 00188 if (file_.isOpen() && chunk_open_) 00189 stopWritingChunk(); 00190 00191 compression_ = compression; 00192 } 00193 00194 // Version 00195 00196 void Bag::writeVersion() { 00197 string version = string("#ROSBAG V") + VERSION + string("\n"); 00198 00199 ROS_DEBUG("Writing VERSION [%llu]: %s", (unsigned long long) file_.getOffset(), version.c_str()); 00200 00201 version_ = 200; 00202 00203 write(version); 00204 } 00205 00206 void Bag::readVersion() { 00207 string version_line = file_.getline(); 00208 00209 file_header_pos_ = file_.getOffset(); 00210 00211 char logtypename[100]; 00212 int version_major, version_minor; 00213 if (sscanf(version_line.c_str(), "#ROS%s V%d.%d", logtypename, &version_major, &version_minor) != 3) 00214 throw BagIOException("Error reading version line"); 00215 00216 version_ = version_major * 100 + version_minor; 00217 00218 ROS_DEBUG("Read VERSION: version=%d", version_); 00219 } 00220 00221 uint32_t Bag::getMajorVersion() const { return version_ / 100; } 00222 uint32_t Bag::getMinorVersion() const { return version_ % 100; } 00223 00224 // 00225 00226 void Bag::startWriting() { 00227 writeVersion(); 00228 file_header_pos_ = file_.getOffset(); 00229 writeFileHeaderRecord(); 00230 } 00231 00232 void Bag::stopWriting() { 00233 if (chunk_open_) 00234 stopWritingChunk(); 00235 00236 seek(0, std::ios::end); 00237 00238 index_data_pos_ = file_.getOffset(); 00239 writeConnectionRecords(); 00240 writeChunkInfoRecords(); 00241 00242 seek(file_header_pos_); 00243 writeFileHeaderRecord(); 00244 } 00245 00246 void Bag::startReadingVersion200() { 00247 // Read the file header record, which points to the end of the chunks 00248 readFileHeaderRecord(); 00249 00250 // Seek to the end of the chunks 00251 seek(index_data_pos_); 00252 00253 // Read the connection records (one for each connection) 00254 for (uint32_t i = 0; i < connection_count_; i++) 00255 readConnectionRecord(); 00256 00257 // Read the chunk info records 00258 for (uint32_t i = 0; i < chunk_count_; i++) 00259 readChunkInfoRecord(); 00260 00261 // Read the connection indexes for each chunk 00262 foreach(ChunkInfo const& chunk_info, chunks_) { 00263 curr_chunk_info_ = chunk_info; 00264 00265 seek(curr_chunk_info_.pos); 00266 00267 // Skip over the chunk data 00268 ChunkHeader chunk_header; 00269 readChunkHeader(chunk_header); 00270 seek(chunk_header.compressed_size, std::ios::cur); 00271 00272 // Read the index records after the chunk 00273 for (unsigned int i = 0; i < chunk_info.connection_counts.size(); i++) 00274 readConnectionIndexRecord200(); 00275 } 00276 00277 // At this point we don't have a curr_chunk_info anymore so we reset it 00278 curr_chunk_info_ = ChunkInfo(); 00279 } 00280 00281 void Bag::startReadingVersion102() { 00282 try 00283 { 00284 // Read the file header record, which points to the start of the topic indexes 00285 readFileHeaderRecord(); 00286 } 00287 catch (BagFormatException ex) { 00288 throw BagUnindexedException(); 00289 } 00290 00291 // Get the length of the file 00292 seek(0, std::ios::end); 00293 uint64_t filelength = file_.getOffset(); 00294 00295 // Seek to the beginning of the topic index records 00296 seek(index_data_pos_); 00297 00298 // Read the topic index records, which point to the offsets of each message in the file 00299 while (file_.getOffset() < filelength) 00300 readTopicIndexRecord102(); 00301 00302 // Read the message definition records (which are the first entry in the topic indexes) 00303 for (map<uint32_t, multiset<IndexEntry> >::const_iterator i = connection_indexes_.begin(); i != connection_indexes_.end(); i++) { 00304 multiset<IndexEntry> const& index = i->second; 00305 IndexEntry const& first_entry = *index.begin(); 00306 00307 ROS_DEBUG("Reading message definition for connection %d at %llu", i->first, (unsigned long long) first_entry.chunk_pos); 00308 00309 seek(first_entry.chunk_pos); 00310 00311 readMessageDefinitionRecord102(); 00312 } 00313 } 00314 00315 // File header record 00316 00317 void Bag::writeFileHeaderRecord() { 00318 connection_count_ = connections_.size(); 00319 chunk_count_ = chunks_.size(); 00320 00321 ROS_DEBUG("Writing FILE_HEADER [%llu]: index_pos=%llu connection_count=%d chunk_count=%d", 00322 (unsigned long long) file_.getOffset(), (unsigned long long) index_data_pos_, connection_count_, chunk_count_); 00323 00324 // Write file header record 00325 M_string header; 00326 header[OP_FIELD_NAME] = toHeaderString(&OP_FILE_HEADER); 00327 header[INDEX_POS_FIELD_NAME] = toHeaderString(&index_data_pos_); 00328 header[CONNECTION_COUNT_FIELD_NAME] = toHeaderString(&connection_count_); 00329 header[CHUNK_COUNT_FIELD_NAME] = toHeaderString(&chunk_count_); 00330 00331 boost::shared_array<uint8_t> header_buffer; 00332 uint32_t header_len; 00333 ros::Header::write(header, header_buffer, header_len); 00334 uint32_t data_len = 0; 00335 if (header_len < FILE_HEADER_LENGTH) 00336 data_len = FILE_HEADER_LENGTH - header_len; 00337 write((char*) &header_len, 4); 00338 write((char*) header_buffer.get(), header_len); 00339 write((char*) &data_len, 4); 00340 00341 // Pad the file header record out 00342 if (data_len > 0) { 00343 string padding; 00344 padding.resize(data_len, ' '); 00345 write(padding); 00346 } 00347 } 00348 00349 void Bag::readFileHeaderRecord() { 00350 ros::Header header; 00351 uint32_t data_size; 00352 if (!readHeader(header) || !readDataLength(data_size)) 00353 throw BagFormatException("Error reading FILE_HEADER record"); 00354 00355 M_string& fields = *header.getValues(); 00356 00357 if (!isOp(fields, OP_FILE_HEADER)) 00358 throw BagFormatException("Expected FILE_HEADER op not found"); 00359 00360 // Read index position 00361 readField(fields, INDEX_POS_FIELD_NAME, true, (uint64_t*) &index_data_pos_); 00362 00363 if (index_data_pos_ == 0) 00364 throw BagUnindexedException(); 00365 00366 // Read topic and chunks count 00367 if (version_ >= 200) { 00368 readField(fields, CONNECTION_COUNT_FIELD_NAME, true, &connection_count_); 00369 readField(fields, CHUNK_COUNT_FIELD_NAME, true, &chunk_count_); 00370 } 00371 00372 ROS_DEBUG("Read FILE_HEADER: index_pos=%llu connection_count=%d chunk_count=%d", 00373 (unsigned long long) index_data_pos_, connection_count_, chunk_count_); 00374 00375 // Skip the data section (just padding) 00376 seek(data_size, std::ios::cur); 00377 } 00378 00379 uint32_t Bag::getChunkOffset() const { 00380 if (compression_ == compression::Uncompressed) 00381 return file_.getOffset() - curr_chunk_data_pos_; 00382 else 00383 return file_.getCompressedBytesIn(); 00384 } 00385 00386 void Bag::startWritingChunk(Time time) { 00387 // Initialize chunk info 00388 curr_chunk_info_.pos = file_.getOffset(); 00389 curr_chunk_info_.start_time = time; 00390 curr_chunk_info_.end_time = time; 00391 00392 // Write the chunk header, with a place-holder for the data sizes (we'll fill in when the chunk is finished) 00393 writeChunkHeader(compression_, 0, 0); 00394 00395 // Turn on compressed writing 00396 file_.setWriteMode(compression_); 00397 00398 // Record where the data section of this chunk started 00399 curr_chunk_data_pos_ = file_.getOffset(); 00400 00401 chunk_open_ = true; 00402 } 00403 00404 void Bag::stopWritingChunk() { 00405 // Add this chunk to the index 00406 chunks_.push_back(curr_chunk_info_); 00407 00408 // Get the uncompressed and compressed sizes 00409 uint32_t uncompressed_size = getChunkOffset(); 00410 file_.setWriteMode(compression::Uncompressed); 00411 uint32_t compressed_size = file_.getOffset() - curr_chunk_data_pos_; 00412 00413 // Rewrite the chunk header with the size of the chunk (remembering current offset) 00414 uint64_t end_of_chunk_pos = file_.getOffset(); 00415 00416 seek(curr_chunk_info_.pos); 00417 writeChunkHeader(compression_, compressed_size, uncompressed_size); 00418 00419 // Write out the indexes and clear them 00420 seek(end_of_chunk_pos); 00421 writeIndexRecords(); 00422 curr_chunk_connection_indexes_.clear(); 00423 00424 // Clear the connection counts 00425 curr_chunk_info_.connection_counts.clear(); 00426 00427 // Flag that we're starting a new chunk 00428 chunk_open_ = false; 00429 } 00430 00431 void Bag::writeChunkHeader(CompressionType compression, uint32_t compressed_size, uint32_t uncompressed_size) { 00432 ChunkHeader chunk_header; 00433 switch (compression) { 00434 case compression::Uncompressed: chunk_header.compression = COMPRESSION_NONE; break; 00435 case compression::BZ2: chunk_header.compression = COMPRESSION_BZ2; break; 00436 //case compression::ZLIB: chunk_header.compression = COMPRESSION_ZLIB; break; 00437 } 00438 chunk_header.compressed_size = compressed_size; 00439 chunk_header.uncompressed_size = uncompressed_size; 00440 00441 ROS_DEBUG("Writing CHUNK [%llu]: compression=%s compressed=%d uncompressed=%d", 00442 (unsigned long long) file_.getOffset(), chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size); 00443 00444 M_string header; 00445 header[OP_FIELD_NAME] = toHeaderString(&OP_CHUNK); 00446 header[COMPRESSION_FIELD_NAME] = chunk_header.compression; 00447 header[SIZE_FIELD_NAME] = toHeaderString(&chunk_header.uncompressed_size); 00448 writeHeader(header); 00449 00450 writeDataLength(chunk_header.compressed_size); 00451 } 00452 00453 void Bag::readChunkHeader(ChunkHeader& chunk_header) const { 00454 ros::Header header; 00455 if (!readHeader(header) || !readDataLength(chunk_header.compressed_size)) 00456 throw BagFormatException("Error reading CHUNK record"); 00457 00458 M_string& fields = *header.getValues(); 00459 00460 if (!isOp(fields, OP_CHUNK)) 00461 throw BagFormatException("Expected CHUNK op not found"); 00462 00463 readField(fields, COMPRESSION_FIELD_NAME, true, chunk_header.compression); 00464 readField(fields, SIZE_FIELD_NAME, true, &chunk_header.uncompressed_size); 00465 00466 ROS_DEBUG("Read CHUNK: compression=%s size=%d uncompressed=%d (%f)", chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size, 100 * ((double) chunk_header.compressed_size) / chunk_header.uncompressed_size); 00467 } 00468 00469 // Index records 00470 00471 void Bag::writeIndexRecords() { 00472 for (map<uint32_t, multiset<IndexEntry> >::const_iterator i = curr_chunk_connection_indexes_.begin(); i != curr_chunk_connection_indexes_.end(); i++) { 00473 uint32_t connection_id = i->first; 00474 multiset<IndexEntry> const& index = i->second; 00475 00476 // Write the index record header 00477 uint32_t index_size = index.size(); 00478 M_string header; 00479 header[OP_FIELD_NAME] = toHeaderString(&OP_INDEX_DATA); 00480 header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_id); 00481 header[VER_FIELD_NAME] = toHeaderString(&INDEX_VERSION); 00482 header[COUNT_FIELD_NAME] = toHeaderString(&index_size); 00483 writeHeader(header); 00484 00485 writeDataLength(index_size * 12); 00486 00487 ROS_DEBUG("Writing INDEX_DATA: connection=%d ver=%d count=%d", connection_id, INDEX_VERSION, index_size); 00488 00489 // Write the index record data (pairs of timestamp and position in file) 00490 foreach(IndexEntry const& e, index) { 00491 write((char*) &e.time.sec, 4); 00492 write((char*) &e.time.nsec, 4); 00493 write((char*) &e.offset, 4); 00494 00495 ROS_DEBUG(" - %d.%d: %d", e.time.sec, e.time.nsec, e.offset); 00496 } 00497 } 00498 } 00499 00500 void Bag::readTopicIndexRecord102() { 00501 ros::Header header; 00502 uint32_t data_size; 00503 if (!readHeader(header) || !readDataLength(data_size)) 00504 throw BagFormatException("Error reading INDEX_DATA header"); 00505 M_string& fields = *header.getValues(); 00506 00507 if (!isOp(fields, OP_INDEX_DATA)) 00508 throw BagFormatException("Expected INDEX_DATA record"); 00509 00510 uint32_t index_version; 00511 string topic; 00512 uint32_t count = 0; 00513 readField(fields, VER_FIELD_NAME, true, &index_version); 00514 readField(fields, TOPIC_FIELD_NAME, true, topic); 00515 readField(fields, COUNT_FIELD_NAME, true, &count); 00516 00517 ROS_DEBUG("Read INDEX_DATA: ver=%d topic=%s count=%d", index_version, topic.c_str(), count); 00518 00519 if (index_version != 0) 00520 throw BagFormatException((format("Unsupported INDEX_DATA version: %1%") % index_version).str()); 00521 00522 uint32_t connection_id; 00523 map<string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic); 00524 if (topic_conn_id_iter == topic_connection_ids_.end()) { 00525 connection_id = connections_.size(); 00526 00527 ROS_DEBUG("Creating connection: id=%d topic=%s", connection_id, topic.c_str()); 00528 ConnectionInfo* connection_info = new ConnectionInfo(); 00529 connection_info->id = connection_id; 00530 connection_info->topic = topic; 00531 connections_[connection_id] = connection_info; 00532 00533 topic_connection_ids_[topic] = connection_id; 00534 } 00535 else 00536 connection_id = topic_conn_id_iter->second; 00537 00538 multiset<IndexEntry>& connection_index = connection_indexes_[connection_id]; 00539 00540 for (uint32_t i = 0; i < count; i++) { 00541 IndexEntry index_entry; 00542 uint32_t sec; 00543 uint32_t nsec; 00544 read((char*) &sec, 4); 00545 read((char*) &nsec, 4); 00546 read((char*) &index_entry.chunk_pos, 8); //<! store position of the message in the chunk_pos field as it's 64 bits 00547 index_entry.time = Time(sec, nsec); 00548 index_entry.offset = 0; 00549 00550 ROS_DEBUG(" - %d.%d: %llu", sec, nsec, (unsigned long long) index_entry.chunk_pos); 00551 00552 if (index_entry.time < ros::TIME_MIN || index_entry.time > ros::TIME_MAX) 00553 { 00554 ROS_ERROR("Index entry for topic %s contains invalid time.", topic.c_str()); 00555 } else 00556 { 00557 connection_index.insert(connection_index.end(), index_entry); 00558 } 00559 } 00560 } 00561 00562 void Bag::readConnectionIndexRecord200() { 00563 ros::Header header; 00564 uint32_t data_size; 00565 if (!readHeader(header) || !readDataLength(data_size)) 00566 throw BagFormatException("Error reading INDEX_DATA header"); 00567 M_string& fields = *header.getValues(); 00568 00569 if (!isOp(fields, OP_INDEX_DATA)) 00570 throw BagFormatException("Expected INDEX_DATA record"); 00571 00572 uint32_t index_version; 00573 uint32_t connection_id; 00574 uint32_t count = 0; 00575 readField(fields, VER_FIELD_NAME, true, &index_version); 00576 readField(fields, CONNECTION_FIELD_NAME, true, &connection_id); 00577 readField(fields, COUNT_FIELD_NAME, true, &count); 00578 00579 ROS_DEBUG("Read INDEX_DATA: ver=%d connection=%d count=%d", index_version, connection_id, count); 00580 00581 if (index_version != 1) 00582 throw BagFormatException((format("Unsupported INDEX_DATA version: %1%") % index_version).str()); 00583 00584 uint64_t chunk_pos = curr_chunk_info_.pos; 00585 00586 multiset<IndexEntry>& connection_index = connection_indexes_[connection_id]; 00587 00588 for (uint32_t i = 0; i < count; i++) { 00589 IndexEntry index_entry; 00590 index_entry.chunk_pos = chunk_pos; 00591 uint32_t sec; 00592 uint32_t nsec; 00593 read((char*) &sec, 4); 00594 read((char*) &nsec, 4); 00595 read((char*) &index_entry.offset, 4); 00596 index_entry.time = Time(sec, nsec); 00597 00598 ROS_DEBUG(" - %d.%d: %llu+%d", sec, nsec, (unsigned long long) index_entry.chunk_pos, index_entry.offset); 00599 00600 if (index_entry.time < ros::TIME_MIN || index_entry.time > ros::TIME_MAX) 00601 { 00602 ROS_ERROR("Index entry for topic %s contains invalid time. This message will not be loaded.", connections_[connection_id]->topic.c_str()); 00603 } else 00604 { 00605 connection_index.insert(connection_index.end(), index_entry); 00606 } 00607 } 00608 } 00609 00610 // Connection records 00611 00612 void Bag::writeConnectionRecords() { 00613 for (map<uint32_t, ConnectionInfo*>::const_iterator i = connections_.begin(); i != connections_.end(); i++) { 00614 ConnectionInfo const* connection_info = i->second; 00615 writeConnectionRecord(connection_info); 00616 } 00617 } 00618 00619 void Bag::writeConnectionRecord(ConnectionInfo const* connection_info) { 00620 ROS_DEBUG("Writing CONNECTION [%llu:%d]: topic=%s id=%d", 00621 (unsigned long long) file_.getOffset(), getChunkOffset(), connection_info->topic.c_str(), connection_info->id); 00622 00623 M_string header; 00624 header[OP_FIELD_NAME] = toHeaderString(&OP_CONNECTION); 00625 header[TOPIC_FIELD_NAME] = connection_info->topic; 00626 header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_info->id); 00627 writeHeader(header); 00628 00629 writeHeader(*connection_info->header); 00630 } 00631 00632 void Bag::appendConnectionRecordToBuffer(Buffer& buf, ConnectionInfo const* connection_info) { 00633 M_string header; 00634 header[OP_FIELD_NAME] = toHeaderString(&OP_CONNECTION); 00635 header[TOPIC_FIELD_NAME] = connection_info->topic; 00636 header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_info->id); 00637 appendHeaderToBuffer(buf, header); 00638 00639 appendHeaderToBuffer(buf, *connection_info->header); 00640 } 00641 00642 void Bag::readConnectionRecord() { 00643 ros::Header header; 00644 if (!readHeader(header)) 00645 throw BagFormatException("Error reading CONNECTION header"); 00646 M_string& fields = *header.getValues(); 00647 00648 if (!isOp(fields, OP_CONNECTION)) 00649 throw BagFormatException("Expected CONNECTION op not found"); 00650 00651 uint32_t id; 00652 readField(fields, CONNECTION_FIELD_NAME, true, &id); 00653 string topic; 00654 readField(fields, TOPIC_FIELD_NAME, true, topic); 00655 00656 ros::Header connection_header; 00657 if (!readHeader(connection_header)) 00658 throw BagFormatException("Error reading connection header"); 00659 00660 // If this is a new connection, update connections 00661 map<uint32_t, ConnectionInfo*>::iterator key = connections_.find(id); 00662 if (key == connections_.end()) { 00663 ConnectionInfo* connection_info = new ConnectionInfo(); 00664 connection_info->id = id; 00665 connection_info->topic = topic; 00666 connection_info->header = shared_ptr<M_string>(new M_string); 00667 for (M_string::const_iterator i = connection_header.getValues()->begin(); i != connection_header.getValues()->end(); i++) 00668 (*connection_info->header)[i->first] = i->second; 00669 connection_info->msg_def = (*connection_info->header)["message_definition"]; 00670 connection_info->datatype = (*connection_info->header)["type"]; 00671 connection_info->md5sum = (*connection_info->header)["md5sum"]; 00672 connections_[id] = connection_info; 00673 00674 ROS_DEBUG("Read CONNECTION: topic=%s id=%d", topic.c_str(), id); 00675 } 00676 } 00677 00678 void Bag::readMessageDefinitionRecord102() { 00679 ros::Header header; 00680 uint32_t data_size; 00681 if (!readHeader(header) || !readDataLength(data_size)) 00682 throw BagFormatException("Error reading message definition header"); 00683 M_string& fields = *header.getValues(); 00684 00685 if (!isOp(fields, OP_MSG_DEF)) 00686 throw BagFormatException("Expected MSG_DEF op not found"); 00687 00688 string topic, md5sum, datatype, message_definition; 00689 readField(fields, TOPIC_FIELD_NAME, true, topic); 00690 readField(fields, MD5_FIELD_NAME, 32, 32, true, md5sum); 00691 readField(fields, TYPE_FIELD_NAME, true, datatype); 00692 readField(fields, DEF_FIELD_NAME, 0, UINT_MAX, true, message_definition); 00693 00694 ConnectionInfo* connection_info; 00695 00696 map<string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic); 00697 if (topic_conn_id_iter == topic_connection_ids_.end()) { 00698 uint32_t id = connections_.size(); 00699 00700 ROS_DEBUG("Creating connection: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str()); 00701 connection_info = new ConnectionInfo(); 00702 connection_info->id = id; 00703 connection_info->topic = topic; 00704 00705 connections_[id] = connection_info; 00706 topic_connection_ids_[topic] = id; 00707 } 00708 else 00709 connection_info = connections_[topic_conn_id_iter->second]; 00710 00711 connection_info->msg_def = message_definition; 00712 connection_info->datatype = datatype; 00713 connection_info->md5sum = md5sum; 00714 connection_info->header = boost::shared_ptr<ros::M_string>(new ros::M_string); 00715 (*connection_info->header)["type"] = connection_info->datatype; 00716 (*connection_info->header)["md5sum"] = connection_info->md5sum; 00717 (*connection_info->header)["message_definition"] = connection_info->msg_def; 00718 00719 ROS_DEBUG("Read MSG_DEF: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str()); 00720 } 00721 00722 void Bag::decompressChunk(uint64_t chunk_pos) const { 00723 if (curr_chunk_info_.pos == chunk_pos) { 00724 current_buffer_ = &outgoing_chunk_buffer_; 00725 return; 00726 } 00727 00728 current_buffer_ = &decompress_buffer_; 00729 00730 if (decompressed_chunk_ == chunk_pos) 00731 return; 00732 00733 // Seek to the start of the chunk 00734 seek(chunk_pos); 00735 00736 // Read the chunk header 00737 ChunkHeader chunk_header; 00738 readChunkHeader(chunk_header); 00739 00740 // Read and decompress the chunk. These assume we are at the right place in the stream already 00741 if (chunk_header.compression == COMPRESSION_NONE) 00742 decompressRawChunk(chunk_header); 00743 else if (chunk_header.compression == COMPRESSION_BZ2) 00744 decompressBz2Chunk(chunk_header); 00745 else 00746 throw BagFormatException("Unknown compression: " + chunk_header.compression); 00747 00748 decompressed_chunk_ = chunk_pos; 00749 } 00750 00751 void Bag::readMessageDataRecord102(uint64_t offset, ros::Header& header) const { 00752 ROS_DEBUG("readMessageDataRecord: offset=%llu", (unsigned long long) offset); 00753 00754 seek(offset); 00755 00756 uint32_t data_size; 00757 uint8_t op; 00758 do { 00759 if (!readHeader(header) || !readDataLength(data_size)) 00760 throw BagFormatException("Error reading header"); 00761 00762 readField(*header.getValues(), OP_FIELD_NAME, true, &op); 00763 } 00764 while (op == OP_MSG_DEF); 00765 00766 if (op != OP_MSG_DATA) 00767 throw BagFormatException((format("Expected MSG_DATA op, got %d") % op).str()); 00768 00769 record_buffer_.setSize(data_size); 00770 file_.read((char*) record_buffer_.getData(), data_size); 00771 } 00772 00773 // Reading this into a buffer isn't completely necessary, but we do it anyways for now 00774 void Bag::decompressRawChunk(ChunkHeader const& chunk_header) const { 00775 assert(chunk_header.compression == COMPRESSION_NONE); 00776 assert(chunk_header.compressed_size == chunk_header.uncompressed_size); 00777 00778 ROS_DEBUG("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size); 00779 00780 decompress_buffer_.setSize(chunk_header.compressed_size); 00781 file_.read((char*) decompress_buffer_.getData(), chunk_header.compressed_size); 00782 00783 // todo check read was successful 00784 } 00785 00786 void Bag::decompressBz2Chunk(ChunkHeader const& chunk_header) const { 00787 assert(chunk_header.compression == COMPRESSION_BZ2); 00788 00789 CompressionType compression = compression::BZ2; 00790 00791 ROS_DEBUG("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size); 00792 00793 chunk_buffer_.setSize(chunk_header.compressed_size); 00794 file_.read((char*) chunk_buffer_.getData(), chunk_header.compressed_size); 00795 00796 decompress_buffer_.setSize(chunk_header.uncompressed_size); 00797 file_.decompress(compression, decompress_buffer_.getData(), decompress_buffer_.getSize(), chunk_buffer_.getData(), chunk_buffer_.getSize()); 00798 00799 // todo check read was successful 00800 } 00801 00802 ros::Header Bag::readMessageDataHeader(IndexEntry const& index_entry) { 00803 ros::Header header; 00804 uint32_t data_size; 00805 uint32_t bytes_read; 00806 switch (version_) 00807 { 00808 case 200: 00809 decompressChunk(index_entry.chunk_pos); 00810 readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read); 00811 return header; 00812 case 102: 00813 readMessageDataRecord102(index_entry.chunk_pos, header); 00814 return header; 00815 default: 00816 throw BagFormatException((format("Unhandled version: %1%") % version_).str()); 00817 } 00818 } 00819 00820 // NOTE: this loads the header, which is unnecessary 00821 uint32_t Bag::readMessageDataSize(IndexEntry const& index_entry) const { 00822 ros::Header header; 00823 uint32_t data_size; 00824 uint32_t bytes_read; 00825 switch (version_) 00826 { 00827 case 200: 00828 decompressChunk(index_entry.chunk_pos); 00829 readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read); 00830 return data_size; 00831 case 102: 00832 readMessageDataRecord102(index_entry.chunk_pos, header); 00833 return record_buffer_.getSize(); 00834 default: 00835 throw BagFormatException((format("Unhandled version: %1%") % version_).str()); 00836 } 00837 } 00838 00839 void Bag::writeChunkInfoRecords() { 00840 foreach(ChunkInfo const& chunk_info, chunks_) { 00841 // Write the chunk info header 00842 M_string header; 00843 uint32_t chunk_connection_count = chunk_info.connection_counts.size(); 00844 header[OP_FIELD_NAME] = toHeaderString(&OP_CHUNK_INFO); 00845 header[VER_FIELD_NAME] = toHeaderString(&CHUNK_INFO_VERSION); 00846 header[CHUNK_POS_FIELD_NAME] = toHeaderString(&chunk_info.pos); 00847 header[START_TIME_FIELD_NAME] = toHeaderString(&chunk_info.start_time); 00848 header[END_TIME_FIELD_NAME] = toHeaderString(&chunk_info.end_time); 00849 header[COUNT_FIELD_NAME] = toHeaderString(&chunk_connection_count); 00850 00851 ROS_DEBUG("Writing CHUNK_INFO [%llu]: ver=%d pos=%llu start=%d.%d end=%d.%d", 00852 (unsigned long long) file_.getOffset(), CHUNK_INFO_VERSION, (unsigned long long) chunk_info.pos, 00853 chunk_info.start_time.sec, chunk_info.start_time.nsec, 00854 chunk_info.end_time.sec, chunk_info.end_time.nsec); 00855 00856 writeHeader(header); 00857 00858 writeDataLength(8 * chunk_connection_count); 00859 00860 // Write the topic names and counts 00861 for (map<uint32_t, uint32_t>::const_iterator i = chunk_info.connection_counts.begin(); i != chunk_info.connection_counts.end(); i++) { 00862 uint32_t connection_id = i->first; 00863 uint32_t count = i->second; 00864 00865 write((char*) &connection_id, 4); 00866 write((char*) &count, 4); 00867 00868 ROS_DEBUG(" - %d: %d", connection_id, count); 00869 } 00870 } 00871 } 00872 00873 void Bag::readChunkInfoRecord() { 00874 // Read a CHUNK_INFO header 00875 ros::Header header; 00876 uint32_t data_size; 00877 if (!readHeader(header) || !readDataLength(data_size)) 00878 throw BagFormatException("Error reading CHUNK_INFO record header"); 00879 M_string& fields = *header.getValues(); 00880 if (!isOp(fields, OP_CHUNK_INFO)) 00881 throw BagFormatException("Expected CHUNK_INFO op not found"); 00882 00883 // Check that the chunk info version is current 00884 uint32_t chunk_info_version; 00885 readField(fields, VER_FIELD_NAME, true, &chunk_info_version); 00886 if (chunk_info_version != CHUNK_INFO_VERSION) 00887 throw BagFormatException((format("Expected CHUNK_INFO version %1%, read %2%") % CHUNK_INFO_VERSION % chunk_info_version).str()); 00888 00889 // Read the chunk position, timestamp, and topic count fields 00890 ChunkInfo chunk_info; 00891 readField(fields, CHUNK_POS_FIELD_NAME, true, &chunk_info.pos); 00892 readField(fields, START_TIME_FIELD_NAME, true, chunk_info.start_time); 00893 readField(fields, END_TIME_FIELD_NAME, true, chunk_info.end_time); 00894 uint32_t chunk_connection_count = 0; 00895 readField(fields, COUNT_FIELD_NAME, true, &chunk_connection_count); 00896 00897 ROS_DEBUG("Read CHUNK_INFO: chunk_pos=%llu connection_count=%d start=%d.%d end=%d.%d", 00898 (unsigned long long) chunk_info.pos, chunk_connection_count, 00899 chunk_info.start_time.sec, chunk_info.start_time.nsec, 00900 chunk_info.end_time.sec, chunk_info.end_time.nsec); 00901 00902 // Read the topic count entries 00903 for (uint32_t i = 0; i < chunk_connection_count; i ++) { 00904 uint32_t connection_id, connection_count; 00905 read((char*) &connection_id, 4); 00906 read((char*) &connection_count, 4); 00907 00908 ROS_DEBUG(" %d: %d messages", connection_id, connection_count); 00909 00910 chunk_info.connection_counts[connection_id] = connection_count; 00911 } 00912 00913 chunks_.push_back(chunk_info); 00914 } 00915 00916 // Record I/O 00917 00918 bool Bag::isOp(M_string& fields, uint8_t reqOp) const { 00919 uint8_t op = 0xFF; // nonexistent op 00920 readField(fields, OP_FIELD_NAME, true, &op); 00921 return op == reqOp; 00922 } 00923 00924 void Bag::writeHeader(M_string const& fields) { 00925 boost::shared_array<uint8_t> header_buffer; 00926 uint32_t header_len; 00927 ros::Header::write(fields, header_buffer, header_len); 00928 write((char*) &header_len, 4); 00929 write((char*) header_buffer.get(), header_len); 00930 } 00931 00932 void Bag::writeDataLength(uint32_t data_len) { 00933 write((char*) &data_len, 4); 00934 } 00935 00936 void Bag::appendHeaderToBuffer(Buffer& buf, M_string const& fields) { 00937 boost::shared_array<uint8_t> header_buffer; 00938 uint32_t header_len; 00939 ros::Header::write(fields, header_buffer, header_len); 00940 00941 uint32_t offset = buf.getSize(); 00942 00943 buf.setSize(buf.getSize() + 4 + header_len); 00944 00945 memcpy(buf.getData() + offset, &header_len, 4); 00946 offset += 4; 00947 memcpy(buf.getData() + offset, header_buffer.get(), header_len); 00948 } 00949 00950 void Bag::appendDataLengthToBuffer(Buffer& buf, uint32_t data_len) { 00951 uint32_t offset = buf.getSize(); 00952 00953 buf.setSize(buf.getSize() + 4); 00954 00955 memcpy(buf.getData() + offset, &data_len, 4); 00956 } 00957 00959 void Bag::readHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read) const { 00960 ROS_ASSERT(buffer.getSize() > 8); 00961 00962 uint8_t* start = (uint8_t*) buffer.getData() + offset; 00963 00964 uint8_t* ptr = start; 00965 00966 // Read the header length 00967 uint32_t header_len; 00968 memcpy(&header_len, ptr, 4); 00969 ptr += 4; 00970 00971 // Parse the header 00972 string error_msg; 00973 bool parsed = header.parse(ptr, header_len, error_msg); 00974 if (!parsed) 00975 throw BagFormatException("Error parsing header"); 00976 ptr += header_len; 00977 00978 // Read the data size 00979 memcpy(&data_size, ptr, 4); 00980 ptr += 4; 00981 00982 bytes_read = ptr - start; 00983 } 00984 00985 void Bag::readMessageDataHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& total_bytes_read) const { 00986 total_bytes_read = 0; 00987 uint8_t op = 0xFF; 00988 do { 00989 ROS_DEBUG("reading header from buffer: offset=%d", offset); 00990 uint32_t bytes_read; 00991 readHeaderFromBuffer(*current_buffer_, offset, header, data_size, bytes_read); 00992 00993 offset += bytes_read; 00994 total_bytes_read += bytes_read; 00995 00996 readField(*header.getValues(), OP_FIELD_NAME, true, &op); 00997 } 00998 while (op == OP_MSG_DEF || op == OP_CONNECTION); 00999 01000 if (op != OP_MSG_DATA) 01001 throw BagFormatException("Expected MSG_DATA op not found"); 01002 } 01003 01004 bool Bag::readHeader(ros::Header& header) const { 01005 // Read the header length 01006 uint32_t header_len; 01007 read((char*) &header_len, 4); 01008 01009 // Read the header 01010 header_buffer_.setSize(header_len); 01011 read((char*) header_buffer_.getData(), header_len); 01012 01013 // Parse the header 01014 string error_msg; 01015 bool parsed = header.parse(header_buffer_.getData(), header_len, error_msg); 01016 if (!parsed) 01017 return false; 01018 01019 return true; 01020 } 01021 01022 bool Bag::readDataLength(uint32_t& data_size) const { 01023 read((char*) &data_size, 4); 01024 return true; 01025 } 01026 01027 M_string::const_iterator Bag::checkField(M_string const& fields, string const& field, unsigned int min_len, unsigned int max_len, bool required) const { 01028 M_string::const_iterator fitr = fields.find(field); 01029 if (fitr == fields.end()) { 01030 if (required) 01031 throw BagFormatException("Required '" + field + "' field missing"); 01032 } 01033 else if ((fitr->second.size() < min_len) || (fitr->second.size() > max_len)) 01034 throw BagFormatException((format("Field '%1%' is wrong size (%2% bytes)") % field % (uint32_t) fitr->second.size()).str()); 01035 01036 return fitr; 01037 } 01038 01039 bool Bag::readField(M_string const& fields, string const& field_name, bool required, string& data) const { 01040 return readField(fields, field_name, 1, UINT_MAX, required, data); 01041 } 01042 01043 bool Bag::readField(M_string const& fields, string const& field_name, unsigned int min_len, unsigned int max_len, bool required, string& data) const { 01044 M_string::const_iterator fitr = checkField(fields, field_name, min_len, max_len, required); 01045 if (fitr == fields.end()) 01046 return false; 01047 01048 data = fitr->second; 01049 return true; 01050 } 01051 01052 bool Bag::readField(M_string const& fields, string const& field_name, bool required, Time& data) const { 01053 uint64_t packed_time; 01054 if (!readField(fields, field_name, required, &packed_time)) 01055 return false; 01056 01057 uint64_t bitmask = (1LL << 33) - 1; 01058 data.sec = (uint32_t) (packed_time & bitmask); 01059 data.nsec = (uint32_t) (packed_time >> 32); 01060 01061 return true; 01062 } 01063 01064 std::string Bag::toHeaderString(Time const* field) const { 01065 uint64_t packed_time = (((uint64_t) field->nsec) << 32) + field->sec; 01066 return toHeaderString(&packed_time); 01067 } 01068 01069 ros::AdvertiseOptions createAdvertiseOptions(const ConnectionInfo* c, uint32_t queue_size) { 01070 ros::AdvertiseOptions opts(c->topic, queue_size, c->md5sum, c->datatype, c->msg_def); 01071 ros::M_string::const_iterator header_iter = c->header->find("latching"); 01072 opts.latch = (header_iter != c->header->end() && header_iter->second == "1"); 01073 return opts; 01074 } 01075 01076 // Low-level I/O 01077 01078 void Bag::write(string const& s) { write(s.c_str(), s.length()); } 01079 void Bag::write(char const* s, std::streamsize n) { file_.write((char*) s, n); } 01080 01081 void Bag::read(char* b, std::streamsize n) const { file_.read(b, n); } 01082 void Bag::seek(uint64_t pos, int origin) const { file_.seek(pos, origin); } 01083 01084 } // namespace rosbag