bag.cpp
Go to the documentation of this file.
00001 // Copyright (c) 2009, Willow Garage, Inc.
00002 // All rights reserved.
00003 // 
00004 // Redistribution and use in source and binary forms, with or without
00005 // modification, are permitted provided that the following conditions are met:
00006 // 
00007 //     * Redistributions of source code must retain the above copyright
00008 //       notice, this list of conditions and the following disclaimer.
00009 //     * Redistributions in binary form must reproduce the above copyright
00010 //       notice, this list of conditions and the following disclaimer in the
00011 //       documentation and/or other materials provided with the distribution.
00012 //     * Neither the name of Willow Garage, Inc. nor the names of its
00013 //       contributors may be used to endorse or promote products derived from
00014 //       this software without specific prior written permission.
00015 // 
00016 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00017 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00018 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00019 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00020 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00021 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00022 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00023 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00024 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00025 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00026 // POSSIBILITY OF SUCH DAMAGE.
00027 
00028 #include "rosbag/bag.h"
00029 #include "rosbag/message_instance.h"
00030 #include "rosbag/query.h"
00031 #include "rosbag/view.h"
00032 
00033 #include <inttypes.h>
00034 #include <signal.h>
00035 
00036 #include <iomanip>
00037 
00038 #include <boost/foreach.hpp>
00039 
00040 #define foreach BOOST_FOREACH
00041 
00042 using std::map;
00043 using std::priority_queue;
00044 using std::string;
00045 using std::vector;
00046 using std::multiset;
00047 using boost::format;
00048 using boost::shared_ptr;
00049 using ros::M_string;
00050 using ros::Time;
00051 
00052 namespace rosbag {
00053 
00054 Bag::Bag() :
00055     compression_(compression::Uncompressed),
00056     chunk_threshold_(768 * 1024),  // 768KB chunks
00057     bag_revision_(0),
00058     file_size_(0),
00059     file_header_pos_(0),
00060     index_data_pos_(0),
00061     connection_count_(0),
00062     chunk_count_(0),
00063     chunk_open_(false),
00064     curr_chunk_data_pos_(0),
00065     decompressed_chunk_(0)
00066 {
00067 }
00068 
00069 Bag::Bag(string const& filename, uint32_t mode) :
00070     compression_(compression::Uncompressed),
00071     chunk_threshold_(768 * 1024),  // 768KB chunks
00072     bag_revision_(0),
00073     file_size_(0),
00074     file_header_pos_(0),
00075     index_data_pos_(0),
00076     connection_count_(0),
00077     chunk_count_(0),
00078     chunk_open_(false),
00079     curr_chunk_data_pos_(0),
00080     decompressed_chunk_(0)
00081 {
00082     open(filename, mode);
00083 }
00084 
00085 Bag::~Bag() {
00086     close();
00087 }
00088 
00089 void Bag::open(string const& filename, uint32_t mode) {
00090     mode_ = (BagMode) mode;
00091 
00092     if (mode_ & bagmode::Append)
00093         openAppend(filename);
00094     else if (mode_ & bagmode::Write)
00095         openWrite(filename);
00096     else if (mode_ & bagmode::Read)
00097         openRead(filename);
00098     else
00099         throw BagException((format("Unknown mode: %1%") % (int) mode).str());
00100 
00101     // Determine file size
00102     uint64_t offset = file_.getOffset();
00103     seek(0, std::ios::end);
00104     file_size_ = file_.getOffset();
00105     seek(offset);
00106 }
00107 
00108 void Bag::openRead(string const& filename) {
00109     file_.openRead(filename);
00110 
00111     readVersion();
00112 
00113     switch (version_) {
00114     case 102: startReadingVersion102(); break;
00115     case 200: startReadingVersion200(); break;
00116     default:
00117         throw BagException((format("Unsupported bag file version: %1%.%2%") % getMajorVersion() % getMinorVersion()).str());
00118     }
00119 }
00120 
00121 void Bag::openWrite(string const& filename) {
00122     file_.openWrite(filename);
00123 
00124     startWriting();
00125 }
00126 
00127 void Bag::openAppend(string const& filename) {
00128     file_.openReadWrite(filename);
00129 
00130     readVersion();
00131 
00132     if (version_ != 200)
00133         throw BagException((format("Bag file version %1%.%2% is unsupported for appending") % getMajorVersion() % getMinorVersion()).str());
00134 
00135     startReadingVersion200();
00136 
00137     // Truncate the file to chop off the index
00138     file_.truncate(index_data_pos_);
00139     index_data_pos_ = 0;
00140 
00141     // Rewrite the file header, clearing the index position (so we know if the index is invalid)
00142     seek(file_header_pos_);
00143     writeFileHeaderRecord();
00144 
00145     // Seek to the end of the file
00146     seek(0, std::ios::end);
00147 }
00148 
00149 void Bag::close() {
00150     if (!file_.isOpen())
00151         return;
00152 
00153     if (mode_ & bagmode::Write || mode_ & bagmode::Append)
00154         closeWrite();
00155     
00156     file_.close();
00157 
00158     topic_connection_ids_.clear();
00159     header_connection_ids_.clear();
00160     for (map<uint32_t, ConnectionInfo*>::iterator i = connections_.begin(); i != connections_.end(); i++)
00161         delete i->second;
00162     connections_.clear();
00163     chunks_.clear();
00164     connection_indexes_.clear();
00165     curr_chunk_connection_indexes_.clear();
00166 }
00167 
00168 void Bag::closeWrite() {
00169     stopWriting();
00170 }
00171 
00172 string   Bag::getFileName() const { return file_.getFileName(); }
00173 BagMode  Bag::getMode()     const { return mode_;               }
00174 uint64_t Bag::getSize()     const { return file_size_;          }
00175 
00176 uint32_t Bag::getChunkThreshold() const { return chunk_threshold_; }
00177 
00178 void Bag::setChunkThreshold(uint32_t chunk_threshold) {
00179     if (file_.isOpen() && chunk_open_)
00180         stopWritingChunk();
00181 
00182     chunk_threshold_ = chunk_threshold;
00183 }
00184 
00185 CompressionType Bag::getCompression() const { return compression_; }
00186 
00187 void Bag::setCompression(CompressionType compression) {
00188     if (file_.isOpen() && chunk_open_)
00189         stopWritingChunk();
00190 
00191     compression_ = compression;
00192 }
00193 
00194 // Version
00195 
00196 void Bag::writeVersion() {
00197     string version = string("#ROSBAG V") + VERSION + string("\n");
00198 
00199     ROS_DEBUG("Writing VERSION [%llu]: %s", (unsigned long long) file_.getOffset(), version.c_str());
00200 
00201     version_ = 200;
00202 
00203     write(version);
00204 }
00205 
00206 void Bag::readVersion() {
00207     string version_line = file_.getline();
00208 
00209     file_header_pos_ = file_.getOffset();
00210 
00211     char logtypename[100];
00212     int version_major, version_minor;
00213     if (sscanf(version_line.c_str(), "#ROS%s V%d.%d", logtypename, &version_major, &version_minor) != 3)
00214         throw BagIOException("Error reading version line");
00215 
00216     version_ = version_major * 100 + version_minor;
00217 
00218     ROS_DEBUG("Read VERSION: version=%d", version_);
00219 }
00220 
00221 uint32_t Bag::getMajorVersion() const { return version_ / 100; }
00222 uint32_t Bag::getMinorVersion() const { return version_ % 100; }
00223 
00224 //
00225 
00226 void Bag::startWriting() {
00227     writeVersion();
00228     file_header_pos_ = file_.getOffset();
00229     writeFileHeaderRecord();
00230 }
00231 
00232 void Bag::stopWriting() {
00233     if (chunk_open_)
00234         stopWritingChunk();
00235 
00236     seek(0, std::ios::end);
00237 
00238     index_data_pos_ = file_.getOffset();
00239     writeConnectionRecords();
00240     writeChunkInfoRecords();
00241 
00242     seek(file_header_pos_);
00243     writeFileHeaderRecord();
00244 }
00245 
00246 void Bag::startReadingVersion200() {
00247     // Read the file header record, which points to the end of the chunks
00248     readFileHeaderRecord();
00249 
00250     // Seek to the end of the chunks
00251     seek(index_data_pos_);
00252 
00253     // Read the connection records (one for each connection)
00254     for (uint32_t i = 0; i < connection_count_; i++)
00255         readConnectionRecord();
00256 
00257     // Read the chunk info records
00258     for (uint32_t i = 0; i < chunk_count_; i++)
00259         readChunkInfoRecord();
00260 
00261     // Read the connection indexes for each chunk
00262     foreach(ChunkInfo const& chunk_info, chunks_) {
00263         curr_chunk_info_ = chunk_info;
00264 
00265         seek(curr_chunk_info_.pos);
00266 
00267         // Skip over the chunk data
00268         ChunkHeader chunk_header;
00269         readChunkHeader(chunk_header);
00270         seek(chunk_header.compressed_size, std::ios::cur);
00271 
00272         // Read the index records after the chunk
00273         for (unsigned int i = 0; i < chunk_info.connection_counts.size(); i++)
00274             readConnectionIndexRecord200();
00275     }
00276 
00277     // At this point we don't have a curr_chunk_info anymore so we reset it
00278     curr_chunk_info_ = ChunkInfo();
00279 }
00280 
00281 void Bag::startReadingVersion102() {
00282     try
00283     {
00284         // Read the file header record, which points to the start of the topic indexes
00285         readFileHeaderRecord();
00286     }
00287     catch (BagFormatException ex) {
00288         throw BagUnindexedException();
00289     }
00290 
00291     // Get the length of the file
00292     seek(0, std::ios::end);
00293     uint64_t filelength = file_.getOffset();
00294 
00295     // Seek to the beginning of the topic index records
00296     seek(index_data_pos_);
00297 
00298     // Read the topic index records, which point to the offsets of each message in the file
00299     while (file_.getOffset() < filelength)
00300         readTopicIndexRecord102();
00301 
00302     // Read the message definition records (which are the first entry in the topic indexes)
00303     for (map<uint32_t, multiset<IndexEntry> >::const_iterator i = connection_indexes_.begin(); i != connection_indexes_.end(); i++) {
00304         multiset<IndexEntry> const& index       = i->second;
00305         IndexEntry const&           first_entry = *index.begin();
00306 
00307         ROS_DEBUG("Reading message definition for connection %d at %llu", i->first, (unsigned long long) first_entry.chunk_pos);
00308 
00309         seek(first_entry.chunk_pos);
00310 
00311         readMessageDefinitionRecord102();
00312     }
00313 }
00314 
00315 // File header record
00316 
00317 void Bag::writeFileHeaderRecord() {
00318     connection_count_ = connections_.size();
00319     chunk_count_      = chunks_.size();
00320 
00321     ROS_DEBUG("Writing FILE_HEADER [%llu]: index_pos=%llu connection_count=%d chunk_count=%d",
00322               (unsigned long long) file_.getOffset(), (unsigned long long) index_data_pos_, connection_count_, chunk_count_);
00323     
00324     // Write file header record
00325     M_string header;
00326     header[OP_FIELD_NAME]               = toHeaderString(&OP_FILE_HEADER);
00327     header[INDEX_POS_FIELD_NAME]        = toHeaderString(&index_data_pos_);
00328     header[CONNECTION_COUNT_FIELD_NAME] = toHeaderString(&connection_count_);
00329     header[CHUNK_COUNT_FIELD_NAME]      = toHeaderString(&chunk_count_);
00330 
00331     boost::shared_array<uint8_t> header_buffer;
00332     uint32_t header_len;
00333     ros::Header::write(header, header_buffer, header_len);
00334     uint32_t data_len = 0;
00335     if (header_len < FILE_HEADER_LENGTH)
00336         data_len = FILE_HEADER_LENGTH - header_len;
00337     write((char*) &header_len, 4);
00338     write((char*) header_buffer.get(), header_len);
00339     write((char*) &data_len, 4);
00340     
00341     // Pad the file header record out
00342     if (data_len > 0) {
00343         string padding;
00344         padding.resize(data_len, ' ');
00345         write(padding);
00346     }
00347 }
00348 
00349 void Bag::readFileHeaderRecord() {
00350     ros::Header header;
00351     uint32_t data_size;
00352     if (!readHeader(header) || !readDataLength(data_size))
00353         throw BagFormatException("Error reading FILE_HEADER record");
00354 
00355     M_string& fields = *header.getValues();
00356 
00357     if (!isOp(fields, OP_FILE_HEADER))
00358         throw BagFormatException("Expected FILE_HEADER op not found");
00359 
00360     // Read index position
00361     readField(fields, INDEX_POS_FIELD_NAME, true, (uint64_t*) &index_data_pos_);
00362 
00363     if (index_data_pos_ == 0)
00364         throw BagUnindexedException();
00365 
00366     // Read topic and chunks count
00367     if (version_ >= 200) {
00368         readField(fields, CONNECTION_COUNT_FIELD_NAME, true, &connection_count_);
00369         readField(fields, CHUNK_COUNT_FIELD_NAME,      true, &chunk_count_);
00370     }
00371 
00372     ROS_DEBUG("Read FILE_HEADER: index_pos=%llu connection_count=%d chunk_count=%d",
00373               (unsigned long long) index_data_pos_, connection_count_, chunk_count_);
00374 
00375     // Skip the data section (just padding)
00376     seek(data_size, std::ios::cur);
00377 }
00378 
00379 uint32_t Bag::getChunkOffset() const {
00380     if (compression_ == compression::Uncompressed)
00381         return file_.getOffset() - curr_chunk_data_pos_;
00382     else
00383         return file_.getCompressedBytesIn();
00384 }
00385 
00386 void Bag::startWritingChunk(Time time) {
00387     // Initialize chunk info
00388     curr_chunk_info_.pos        = file_.getOffset();
00389     curr_chunk_info_.start_time = time;
00390     curr_chunk_info_.end_time   = time;
00391 
00392     // Write the chunk header, with a place-holder for the data sizes (we'll fill in when the chunk is finished)
00393     writeChunkHeader(compression_, 0, 0);
00394 
00395     // Turn on compressed writing
00396     file_.setWriteMode(compression_);
00397     
00398     // Record where the data section of this chunk started
00399     curr_chunk_data_pos_ = file_.getOffset();
00400 
00401     chunk_open_ = true;
00402 }
00403 
00404 void Bag::stopWritingChunk() {
00405     // Add this chunk to the index
00406     chunks_.push_back(curr_chunk_info_);
00407     
00408     // Get the uncompressed and compressed sizes
00409     uint32_t uncompressed_size = getChunkOffset();
00410     file_.setWriteMode(compression::Uncompressed);
00411     uint32_t compressed_size = file_.getOffset() - curr_chunk_data_pos_;
00412 
00413     // Rewrite the chunk header with the size of the chunk (remembering current offset)
00414     uint64_t end_of_chunk_pos = file_.getOffset();
00415 
00416     seek(curr_chunk_info_.pos);
00417     writeChunkHeader(compression_, compressed_size, uncompressed_size);
00418 
00419     // Write out the indexes and clear them
00420     seek(end_of_chunk_pos);
00421     writeIndexRecords();
00422     curr_chunk_connection_indexes_.clear();
00423 
00424     // Clear the connection counts
00425     curr_chunk_info_.connection_counts.clear();
00426     
00427     // Flag that we're starting a new chunk
00428     chunk_open_ = false;
00429 }
00430 
00431 void Bag::writeChunkHeader(CompressionType compression, uint32_t compressed_size, uint32_t uncompressed_size) {
00432     ChunkHeader chunk_header;
00433     switch (compression) {
00434     case compression::Uncompressed: chunk_header.compression = COMPRESSION_NONE; break;
00435     case compression::BZ2:          chunk_header.compression = COMPRESSION_BZ2;  break;
00436     //case compression::ZLIB:         chunk_header.compression = COMPRESSION_ZLIB; break;
00437     }
00438     chunk_header.compressed_size   = compressed_size;
00439     chunk_header.uncompressed_size = uncompressed_size;
00440 
00441     ROS_DEBUG("Writing CHUNK [%llu]: compression=%s compressed=%d uncompressed=%d",
00442               (unsigned long long) file_.getOffset(), chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size);
00443 
00444     M_string header;
00445     header[OP_FIELD_NAME]          = toHeaderString(&OP_CHUNK);
00446     header[COMPRESSION_FIELD_NAME] = chunk_header.compression;
00447     header[SIZE_FIELD_NAME]        = toHeaderString(&chunk_header.uncompressed_size);
00448     writeHeader(header);
00449 
00450     writeDataLength(chunk_header.compressed_size);
00451 }
00452 
00453 void Bag::readChunkHeader(ChunkHeader& chunk_header) const {
00454     ros::Header header;
00455     if (!readHeader(header) || !readDataLength(chunk_header.compressed_size))
00456         throw BagFormatException("Error reading CHUNK record");
00457         
00458     M_string& fields = *header.getValues();
00459 
00460     if (!isOp(fields, OP_CHUNK))
00461         throw BagFormatException("Expected CHUNK op not found");
00462 
00463     readField(fields, COMPRESSION_FIELD_NAME, true, chunk_header.compression);
00464     readField(fields, SIZE_FIELD_NAME,        true, &chunk_header.uncompressed_size);
00465 
00466     ROS_DEBUG("Read CHUNK: compression=%s size=%d uncompressed=%d (%f)", chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size, 100 * ((double) chunk_header.compressed_size) / chunk_header.uncompressed_size);
00467 }
00468 
00469 // Index records
00470 
00471 void Bag::writeIndexRecords() {
00472     for (map<uint32_t, multiset<IndexEntry> >::const_iterator i = curr_chunk_connection_indexes_.begin(); i != curr_chunk_connection_indexes_.end(); i++) {
00473         uint32_t                    connection_id = i->first;
00474         multiset<IndexEntry> const& index         = i->second;
00475 
00476         // Write the index record header
00477         uint32_t index_size = index.size();
00478         M_string header;
00479         header[OP_FIELD_NAME]         = toHeaderString(&OP_INDEX_DATA);
00480         header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_id);
00481         header[VER_FIELD_NAME]        = toHeaderString(&INDEX_VERSION);
00482         header[COUNT_FIELD_NAME]      = toHeaderString(&index_size);
00483         writeHeader(header);
00484 
00485         writeDataLength(index_size * 12);
00486 
00487         ROS_DEBUG("Writing INDEX_DATA: connection=%d ver=%d count=%d", connection_id, INDEX_VERSION, index_size);
00488 
00489         // Write the index record data (pairs of timestamp and position in file)
00490         foreach(IndexEntry const& e, index) {
00491             write((char*) &e.time.sec,  4);
00492             write((char*) &e.time.nsec, 4);
00493             write((char*) &e.offset,    4);
00494 
00495             ROS_DEBUG("  - %d.%d: %d", e.time.sec, e.time.nsec, e.offset);
00496         }
00497     }
00498 }
00499 
00500 void Bag::readTopicIndexRecord102() {
00501     ros::Header header;
00502     uint32_t data_size;
00503     if (!readHeader(header) || !readDataLength(data_size))
00504         throw BagFormatException("Error reading INDEX_DATA header");
00505     M_string& fields = *header.getValues();
00506 
00507     if (!isOp(fields, OP_INDEX_DATA))
00508         throw BagFormatException("Expected INDEX_DATA record");
00509 
00510     uint32_t index_version;
00511     string topic;
00512     uint32_t count = 0;
00513     readField(fields, VER_FIELD_NAME,   true, &index_version);
00514     readField(fields, TOPIC_FIELD_NAME, true, topic);
00515     readField(fields, COUNT_FIELD_NAME, true, &count);
00516 
00517     ROS_DEBUG("Read INDEX_DATA: ver=%d topic=%s count=%d", index_version, topic.c_str(), count);
00518 
00519     if (index_version != 0)
00520         throw BagFormatException((format("Unsupported INDEX_DATA version: %1%") % index_version).str());
00521 
00522     uint32_t connection_id;
00523     map<string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
00524     if (topic_conn_id_iter == topic_connection_ids_.end()) {
00525         connection_id = connections_.size();
00526 
00527         ROS_DEBUG("Creating connection: id=%d topic=%s", connection_id, topic.c_str());
00528         ConnectionInfo* connection_info = new ConnectionInfo();
00529         connection_info->id       = connection_id;
00530         connection_info->topic    = topic;
00531         connections_[connection_id] = connection_info;
00532 
00533         topic_connection_ids_[topic] = connection_id;
00534     }
00535     else
00536         connection_id = topic_conn_id_iter->second;
00537 
00538     multiset<IndexEntry>& connection_index = connection_indexes_[connection_id];
00539 
00540     for (uint32_t i = 0; i < count; i++) {
00541         IndexEntry index_entry;
00542         uint32_t sec;
00543         uint32_t nsec;
00544         read((char*) &sec,                   4);
00545         read((char*) &nsec,                  4);
00546         read((char*) &index_entry.chunk_pos, 8);   //<! store position of the message in the chunk_pos field as it's 64 bits
00547         index_entry.time = Time(sec, nsec);
00548         index_entry.offset = 0;
00549 
00550         ROS_DEBUG("  - %d.%d: %llu", sec, nsec, (unsigned long long) index_entry.chunk_pos);
00551 
00552         if (index_entry.time < ros::TIME_MIN || index_entry.time > ros::TIME_MAX)
00553         {
00554           ROS_ERROR("Index entry for topic %s contains invalid time.", topic.c_str());
00555         } else
00556         {
00557           connection_index.insert(connection_index.end(), index_entry);
00558         }
00559     }
00560 }
00561 
00562 void Bag::readConnectionIndexRecord200() {
00563     ros::Header header;
00564     uint32_t data_size;
00565     if (!readHeader(header) || !readDataLength(data_size))
00566         throw BagFormatException("Error reading INDEX_DATA header");
00567     M_string& fields = *header.getValues();
00568     
00569     if (!isOp(fields, OP_INDEX_DATA))
00570         throw BagFormatException("Expected INDEX_DATA record");
00571     
00572     uint32_t index_version;
00573     uint32_t connection_id;
00574     uint32_t count = 0;
00575     readField(fields, VER_FIELD_NAME,        true, &index_version);
00576     readField(fields, CONNECTION_FIELD_NAME, true, &connection_id);
00577     readField(fields, COUNT_FIELD_NAME,      true, &count);
00578 
00579     ROS_DEBUG("Read INDEX_DATA: ver=%d connection=%d count=%d", index_version, connection_id, count);
00580 
00581     if (index_version != 1)
00582         throw BagFormatException((format("Unsupported INDEX_DATA version: %1%") % index_version).str());
00583 
00584     uint64_t chunk_pos = curr_chunk_info_.pos;
00585 
00586     multiset<IndexEntry>& connection_index = connection_indexes_[connection_id];
00587 
00588     for (uint32_t i = 0; i < count; i++) {
00589         IndexEntry index_entry;
00590         index_entry.chunk_pos = chunk_pos;
00591         uint32_t sec;
00592         uint32_t nsec;
00593         read((char*) &sec,                4);
00594         read((char*) &nsec,               4);
00595         read((char*) &index_entry.offset, 4);
00596         index_entry.time = Time(sec, nsec);
00597 
00598         ROS_DEBUG("  - %d.%d: %llu+%d", sec, nsec, (unsigned long long) index_entry.chunk_pos, index_entry.offset);
00599 
00600         if (index_entry.time < ros::TIME_MIN || index_entry.time > ros::TIME_MAX)
00601         {
00602           ROS_ERROR("Index entry for topic %s contains invalid time.  This message will not be loaded.", connections_[connection_id]->topic.c_str());
00603         } else
00604         {
00605           connection_index.insert(connection_index.end(), index_entry);
00606         }
00607     }
00608 }
00609 
00610 // Connection records
00611 
00612 void Bag::writeConnectionRecords() {
00613     for (map<uint32_t, ConnectionInfo*>::const_iterator i = connections_.begin(); i != connections_.end(); i++) {
00614         ConnectionInfo const* connection_info = i->second;
00615         writeConnectionRecord(connection_info);
00616     }
00617 }
00618 
00619 void Bag::writeConnectionRecord(ConnectionInfo const* connection_info) {
00620     ROS_DEBUG("Writing CONNECTION [%llu:%d]: topic=%s id=%d",
00621               (unsigned long long) file_.getOffset(), getChunkOffset(), connection_info->topic.c_str(), connection_info->id);
00622 
00623     M_string header;
00624     header[OP_FIELD_NAME]         = toHeaderString(&OP_CONNECTION);
00625     header[TOPIC_FIELD_NAME]      = connection_info->topic;
00626     header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_info->id);
00627     writeHeader(header);
00628 
00629     writeHeader(*connection_info->header);
00630 }
00631 
00632 void Bag::appendConnectionRecordToBuffer(Buffer& buf, ConnectionInfo const* connection_info) {
00633     M_string header;
00634     header[OP_FIELD_NAME]         = toHeaderString(&OP_CONNECTION);
00635     header[TOPIC_FIELD_NAME]      = connection_info->topic;
00636     header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_info->id);
00637     appendHeaderToBuffer(buf, header);
00638 
00639     appendHeaderToBuffer(buf, *connection_info->header);
00640 }
00641 
00642 void Bag::readConnectionRecord() {
00643     ros::Header header;
00644     if (!readHeader(header))
00645         throw BagFormatException("Error reading CONNECTION header");
00646     M_string& fields = *header.getValues();
00647 
00648     if (!isOp(fields, OP_CONNECTION))
00649         throw BagFormatException("Expected CONNECTION op not found");
00650 
00651     uint32_t id;
00652     readField(fields, CONNECTION_FIELD_NAME, true, &id);
00653     string topic;
00654     readField(fields, TOPIC_FIELD_NAME,      true, topic);
00655 
00656     ros::Header connection_header;
00657     if (!readHeader(connection_header))
00658         throw BagFormatException("Error reading connection header");
00659 
00660     // If this is a new connection, update connections
00661     map<uint32_t, ConnectionInfo*>::iterator key = connections_.find(id);
00662     if (key == connections_.end()) {
00663         ConnectionInfo* connection_info = new ConnectionInfo();
00664         connection_info->id       = id;
00665         connection_info->topic    = topic;
00666         connection_info->header = shared_ptr<M_string>(new M_string);
00667         for (M_string::const_iterator i = connection_header.getValues()->begin(); i != connection_header.getValues()->end(); i++)
00668             (*connection_info->header)[i->first] = i->second;
00669         connection_info->msg_def  = (*connection_info->header)["message_definition"];
00670         connection_info->datatype = (*connection_info->header)["type"];
00671         connection_info->md5sum   = (*connection_info->header)["md5sum"];
00672         connections_[id] = connection_info;
00673 
00674         ROS_DEBUG("Read CONNECTION: topic=%s id=%d", topic.c_str(), id);
00675     }
00676 }
00677 
00678 void Bag::readMessageDefinitionRecord102() {
00679     ros::Header header;
00680     uint32_t data_size;
00681     if (!readHeader(header) || !readDataLength(data_size))
00682         throw BagFormatException("Error reading message definition header");
00683     M_string& fields = *header.getValues();
00684 
00685     if (!isOp(fields, OP_MSG_DEF))
00686         throw BagFormatException("Expected MSG_DEF op not found");
00687 
00688     string topic, md5sum, datatype, message_definition;
00689     readField(fields, TOPIC_FIELD_NAME,               true, topic);
00690     readField(fields, MD5_FIELD_NAME,   32,       32, true, md5sum);
00691     readField(fields, TYPE_FIELD_NAME,                true, datatype);
00692     readField(fields, DEF_FIELD_NAME,    0, UINT_MAX, true, message_definition);
00693 
00694     ConnectionInfo* connection_info;
00695 
00696     map<string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
00697     if (topic_conn_id_iter == topic_connection_ids_.end()) {
00698         uint32_t id = connections_.size();
00699 
00700         ROS_DEBUG("Creating connection: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str());
00701         connection_info = new ConnectionInfo();
00702         connection_info->id       = id;
00703         connection_info->topic    = topic;
00704 
00705         connections_[id] = connection_info;
00706         topic_connection_ids_[topic] = id;
00707     }
00708     else
00709         connection_info = connections_[topic_conn_id_iter->second];
00710 
00711     connection_info->msg_def  = message_definition;
00712     connection_info->datatype = datatype;
00713     connection_info->md5sum   = md5sum;
00714     connection_info->header = boost::shared_ptr<ros::M_string>(new ros::M_string);
00715     (*connection_info->header)["type"]               = connection_info->datatype;
00716     (*connection_info->header)["md5sum"]             = connection_info->md5sum;
00717     (*connection_info->header)["message_definition"] = connection_info->msg_def;
00718 
00719     ROS_DEBUG("Read MSG_DEF: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str());
00720 }
00721 
00722 void Bag::decompressChunk(uint64_t chunk_pos) const {
00723     if (curr_chunk_info_.pos == chunk_pos) {
00724         current_buffer_ = &outgoing_chunk_buffer_;
00725         return;
00726     }
00727 
00728     current_buffer_ = &decompress_buffer_;
00729 
00730     if (decompressed_chunk_ == chunk_pos)
00731         return;
00732 
00733     // Seek to the start of the chunk
00734     seek(chunk_pos);
00735 
00736     // Read the chunk header
00737     ChunkHeader chunk_header;
00738     readChunkHeader(chunk_header);
00739 
00740     // Read and decompress the chunk.  These assume we are at the right place in the stream already
00741     if (chunk_header.compression == COMPRESSION_NONE)
00742         decompressRawChunk(chunk_header);
00743     else if (chunk_header.compression == COMPRESSION_BZ2)
00744         decompressBz2Chunk(chunk_header);
00745     else
00746         throw BagFormatException("Unknown compression: " + chunk_header.compression);
00747     
00748     decompressed_chunk_ = chunk_pos;
00749 }
00750 
00751 void Bag::readMessageDataRecord102(uint64_t offset, ros::Header& header) const {
00752     ROS_DEBUG("readMessageDataRecord: offset=%llu", (unsigned long long) offset);
00753 
00754     seek(offset);
00755 
00756     uint32_t data_size;
00757     uint8_t op;
00758     do {
00759         if (!readHeader(header) || !readDataLength(data_size))
00760             throw BagFormatException("Error reading header");
00761 
00762         readField(*header.getValues(), OP_FIELD_NAME, true, &op);
00763     }
00764     while (op == OP_MSG_DEF);
00765 
00766     if (op != OP_MSG_DATA)
00767         throw BagFormatException((format("Expected MSG_DATA op, got %d") % op).str());
00768 
00769     record_buffer_.setSize(data_size);
00770     file_.read((char*) record_buffer_.getData(), data_size);
00771 }
00772 
00773 // Reading this into a buffer isn't completely necessary, but we do it anyways for now
00774 void Bag::decompressRawChunk(ChunkHeader const& chunk_header) const {
00775     assert(chunk_header.compression == COMPRESSION_NONE);
00776     assert(chunk_header.compressed_size == chunk_header.uncompressed_size);
00777 
00778     ROS_DEBUG("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size);
00779 
00780     decompress_buffer_.setSize(chunk_header.compressed_size);
00781     file_.read((char*) decompress_buffer_.getData(), chunk_header.compressed_size);
00782 
00783     // todo check read was successful
00784 }
00785 
00786 void Bag::decompressBz2Chunk(ChunkHeader const& chunk_header) const {
00787     assert(chunk_header.compression == COMPRESSION_BZ2);
00788 
00789     CompressionType compression = compression::BZ2;
00790 
00791     ROS_DEBUG("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size);
00792 
00793     chunk_buffer_.setSize(chunk_header.compressed_size);
00794     file_.read((char*) chunk_buffer_.getData(), chunk_header.compressed_size);
00795 
00796     decompress_buffer_.setSize(chunk_header.uncompressed_size);
00797     file_.decompress(compression, decompress_buffer_.getData(), decompress_buffer_.getSize(), chunk_buffer_.getData(), chunk_buffer_.getSize());
00798 
00799     // todo check read was successful
00800 }
00801 
00802 ros::Header Bag::readMessageDataHeader(IndexEntry const& index_entry) {
00803     ros::Header header;
00804     uint32_t data_size;
00805     uint32_t bytes_read;
00806     switch (version_)
00807     {
00808     case 200:
00809         decompressChunk(index_entry.chunk_pos);
00810         readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
00811         return header;
00812     case 102:
00813         readMessageDataRecord102(index_entry.chunk_pos, header);
00814         return header;
00815     default:
00816         throw BagFormatException((format("Unhandled version: %1%") % version_).str());
00817     }
00818 }
00819 
00820 // NOTE: this loads the header, which is unnecessary
00821 uint32_t Bag::readMessageDataSize(IndexEntry const& index_entry) const {
00822     ros::Header header;
00823     uint32_t data_size;
00824     uint32_t bytes_read;
00825     switch (version_)
00826     {
00827     case 200:
00828         decompressChunk(index_entry.chunk_pos);
00829         readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
00830         return data_size;
00831     case 102:
00832         readMessageDataRecord102(index_entry.chunk_pos, header);
00833         return record_buffer_.getSize();
00834     default:
00835         throw BagFormatException((format("Unhandled version: %1%") % version_).str());
00836     }
00837 }
00838 
00839 void Bag::writeChunkInfoRecords() {
00840     foreach(ChunkInfo const& chunk_info, chunks_) {
00841         // Write the chunk info header
00842         M_string header;
00843         uint32_t chunk_connection_count = chunk_info.connection_counts.size();
00844         header[OP_FIELD_NAME]         = toHeaderString(&OP_CHUNK_INFO);
00845         header[VER_FIELD_NAME]        = toHeaderString(&CHUNK_INFO_VERSION);
00846         header[CHUNK_POS_FIELD_NAME]  = toHeaderString(&chunk_info.pos);
00847         header[START_TIME_FIELD_NAME] = toHeaderString(&chunk_info.start_time);
00848         header[END_TIME_FIELD_NAME]   = toHeaderString(&chunk_info.end_time);
00849         header[COUNT_FIELD_NAME]      = toHeaderString(&chunk_connection_count);
00850 
00851         ROS_DEBUG("Writing CHUNK_INFO [%llu]: ver=%d pos=%llu start=%d.%d end=%d.%d",
00852                   (unsigned long long) file_.getOffset(), CHUNK_INFO_VERSION, (unsigned long long) chunk_info.pos,
00853                   chunk_info.start_time.sec, chunk_info.start_time.nsec,
00854                   chunk_info.end_time.sec, chunk_info.end_time.nsec);
00855 
00856         writeHeader(header);
00857 
00858         writeDataLength(8 * chunk_connection_count);
00859 
00860         // Write the topic names and counts
00861         for (map<uint32_t, uint32_t>::const_iterator i = chunk_info.connection_counts.begin(); i != chunk_info.connection_counts.end(); i++) {
00862             uint32_t connection_id = i->first;
00863             uint32_t count         = i->second;
00864 
00865             write((char*) &connection_id, 4);
00866             write((char*) &count, 4);
00867 
00868             ROS_DEBUG("  - %d: %d", connection_id, count);
00869         }
00870     }
00871 }
00872 
00873 void Bag::readChunkInfoRecord() {
00874     // Read a CHUNK_INFO header
00875     ros::Header header;
00876     uint32_t data_size;
00877     if (!readHeader(header) || !readDataLength(data_size))
00878         throw BagFormatException("Error reading CHUNK_INFO record header");
00879     M_string& fields = *header.getValues();
00880     if (!isOp(fields, OP_CHUNK_INFO))
00881         throw BagFormatException("Expected CHUNK_INFO op not found");
00882 
00883     // Check that the chunk info version is current
00884     uint32_t chunk_info_version;
00885     readField(fields, VER_FIELD_NAME, true, &chunk_info_version);
00886     if (chunk_info_version != CHUNK_INFO_VERSION)
00887         throw BagFormatException((format("Expected CHUNK_INFO version %1%, read %2%") % CHUNK_INFO_VERSION % chunk_info_version).str());
00888 
00889     // Read the chunk position, timestamp, and topic count fields
00890     ChunkInfo chunk_info;
00891     readField(fields, CHUNK_POS_FIELD_NAME,  true, &chunk_info.pos);
00892     readField(fields, START_TIME_FIELD_NAME, true,  chunk_info.start_time);
00893     readField(fields, END_TIME_FIELD_NAME,   true,  chunk_info.end_time);
00894     uint32_t chunk_connection_count = 0;
00895     readField(fields, COUNT_FIELD_NAME,      true, &chunk_connection_count);
00896 
00897     ROS_DEBUG("Read CHUNK_INFO: chunk_pos=%llu connection_count=%d start=%d.%d end=%d.%d",
00898               (unsigned long long) chunk_info.pos, chunk_connection_count,
00899               chunk_info.start_time.sec, chunk_info.start_time.nsec,
00900               chunk_info.end_time.sec, chunk_info.end_time.nsec);
00901 
00902     // Read the topic count entries
00903     for (uint32_t i = 0; i < chunk_connection_count; i ++) {
00904         uint32_t connection_id, connection_count;
00905         read((char*) &connection_id,    4);
00906         read((char*) &connection_count, 4);
00907 
00908         ROS_DEBUG("  %d: %d messages", connection_id, connection_count);
00909 
00910         chunk_info.connection_counts[connection_id] = connection_count;
00911     }
00912 
00913     chunks_.push_back(chunk_info);
00914 }
00915 
00916 // Record I/O
00917 
00918 bool Bag::isOp(M_string& fields, uint8_t reqOp) const {
00919     uint8_t op = 0xFF; // nonexistent op
00920     readField(fields, OP_FIELD_NAME, true, &op);
00921     return op == reqOp;
00922 }
00923 
00924 void Bag::writeHeader(M_string const& fields) {
00925     boost::shared_array<uint8_t> header_buffer;
00926     uint32_t header_len;
00927     ros::Header::write(fields, header_buffer, header_len);
00928     write((char*) &header_len, 4);
00929     write((char*) header_buffer.get(), header_len);
00930 }
00931 
00932 void Bag::writeDataLength(uint32_t data_len) {
00933     write((char*) &data_len, 4);
00934 }
00935 
00936 void Bag::appendHeaderToBuffer(Buffer& buf, M_string const& fields) {
00937     boost::shared_array<uint8_t> header_buffer;
00938     uint32_t header_len;
00939     ros::Header::write(fields, header_buffer, header_len);
00940 
00941     uint32_t offset = buf.getSize();
00942 
00943     buf.setSize(buf.getSize() + 4 + header_len);
00944 
00945     memcpy(buf.getData() + offset, &header_len, 4);
00946     offset += 4;
00947     memcpy(buf.getData() + offset, header_buffer.get(), header_len);
00948 }
00949 
00950 void Bag::appendDataLengthToBuffer(Buffer& buf, uint32_t data_len) {
00951     uint32_t offset = buf.getSize();
00952 
00953     buf.setSize(buf.getSize() + 4);
00954 
00955     memcpy(buf.getData() + offset, &data_len, 4);
00956 }
00957 
00959 void Bag::readHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read) const {
00960     ROS_ASSERT(buffer.getSize() > 8);
00961 
00962     uint8_t* start = (uint8_t*) buffer.getData() + offset;
00963 
00964     uint8_t* ptr = start;
00965 
00966     // Read the header length
00967     uint32_t header_len;
00968     memcpy(&header_len, ptr, 4);
00969     ptr += 4;
00970 
00971     // Parse the header
00972     string error_msg;
00973     bool parsed = header.parse(ptr, header_len, error_msg);
00974     if (!parsed)
00975         throw BagFormatException("Error parsing header");
00976     ptr += header_len;
00977 
00978     // Read the data size
00979     memcpy(&data_size, ptr, 4);
00980     ptr += 4;
00981 
00982     bytes_read = ptr - start;
00983 }
00984 
00985 void Bag::readMessageDataHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& total_bytes_read) const {
00986     total_bytes_read = 0;
00987     uint8_t op = 0xFF;
00988     do {
00989         ROS_DEBUG("reading header from buffer: offset=%d", offset);
00990         uint32_t bytes_read;
00991         readHeaderFromBuffer(*current_buffer_, offset, header, data_size, bytes_read);
00992 
00993         offset += bytes_read;
00994         total_bytes_read += bytes_read;
00995         
00996         readField(*header.getValues(), OP_FIELD_NAME, true, &op);
00997     }
00998     while (op == OP_MSG_DEF || op == OP_CONNECTION);
00999 
01000     if (op != OP_MSG_DATA)
01001         throw BagFormatException("Expected MSG_DATA op not found");
01002 }
01003 
01004 bool Bag::readHeader(ros::Header& header) const {
01005     // Read the header length
01006     uint32_t header_len;
01007     read((char*) &header_len, 4);
01008 
01009     // Read the header
01010     header_buffer_.setSize(header_len);
01011     read((char*) header_buffer_.getData(), header_len);
01012 
01013     // Parse the header
01014     string error_msg;
01015     bool parsed = header.parse(header_buffer_.getData(), header_len, error_msg);
01016     if (!parsed)
01017         return false;
01018 
01019     return true;
01020 }
01021 
01022 bool Bag::readDataLength(uint32_t& data_size) const {
01023     read((char*) &data_size, 4);
01024     return true;
01025 }
01026 
01027 M_string::const_iterator Bag::checkField(M_string const& fields, string const& field, unsigned int min_len, unsigned int max_len, bool required) const {
01028     M_string::const_iterator fitr = fields.find(field);
01029     if (fitr == fields.end()) {
01030         if (required)
01031             throw BagFormatException("Required '" + field + "' field missing");
01032     }
01033     else if ((fitr->second.size() < min_len) || (fitr->second.size() > max_len))
01034         throw BagFormatException((format("Field '%1%' is wrong size (%2% bytes)") % field % (uint32_t) fitr->second.size()).str());
01035 
01036     return fitr;
01037 }
01038 
01039 bool Bag::readField(M_string const& fields, string const& field_name, bool required, string& data) const {
01040     return readField(fields, field_name, 1, UINT_MAX, required, data);
01041 }
01042 
01043 bool Bag::readField(M_string const& fields, string const& field_name, unsigned int min_len, unsigned int max_len, bool required, string& data) const {
01044     M_string::const_iterator fitr = checkField(fields, field_name, min_len, max_len, required);
01045     if (fitr == fields.end())
01046         return false;
01047 
01048     data = fitr->second;
01049     return true;
01050 }
01051 
01052 bool Bag::readField(M_string const& fields, string const& field_name, bool required, Time& data) const {
01053     uint64_t packed_time;
01054     if (!readField(fields, field_name, required, &packed_time))
01055         return false;
01056 
01057     uint64_t bitmask = (1LL << 33) - 1;
01058     data.sec  = (uint32_t) (packed_time & bitmask);
01059     data.nsec = (uint32_t) (packed_time >> 32);
01060 
01061     return true;
01062 }
01063 
01064 std::string Bag::toHeaderString(Time const* field) const {
01065     uint64_t packed_time = (((uint64_t) field->nsec) << 32) + field->sec;
01066     return toHeaderString(&packed_time);
01067 }
01068 
01069 ros::AdvertiseOptions createAdvertiseOptions(const ConnectionInfo* c, uint32_t queue_size) {
01070     ros::AdvertiseOptions opts(c->topic, queue_size, c->md5sum, c->datatype, c->msg_def);
01071     ros::M_string::const_iterator header_iter = c->header->find("latching");
01072     opts.latch = (header_iter != c->header->end() && header_iter->second == "1");
01073     return opts;
01074 }
01075 
01076 // Low-level I/O
01077 
01078 void Bag::write(string const& s)                  { write(s.c_str(), s.length()); }
01079 void Bag::write(char const* s, std::streamsize n) { file_.write((char*) s, n);    }
01080 
01081 void Bag::read(char* b, std::streamsize n) const  { file_.read(b, n);             }
01082 void Bag::seek(uint64_t pos, int origin) const    { file_.seek(pos, origin);      }
01083 
01084 } // namespace rosbag


rosbag
Author(s): Tim Field (tfield@willowgarage.com), Jeremy Leibs (leibs@willowgarage.com), and James Bowman (jamesb@willowgarage.com)
autogenerated on Sat Dec 28 2013 17:43:05