bag.cpp
Go to the documentation of this file.
00001 // Copyright (c) 2009, Willow Garage, Inc.
00002 // All rights reserved.
00003 // 
00004 // Redistribution and use in source and binary forms, with or without
00005 // modification, are permitted provided that the following conditions are met:
00006 // 
00007 //     * Redistributions of source code must retain the above copyright
00008 //       notice, this list of conditions and the following disclaimer.
00009 //     * Redistributions in binary form must reproduce the above copyright
00010 //       notice, this list of conditions and the following disclaimer in the
00011 //       documentation and/or other materials provided with the distribution.
00012 //     * Neither the name of Willow Garage, Inc. nor the names of its
00013 //       contributors may be used to endorse or promote products derived from
00014 //       this software without specific prior written permission.
00015 // 
00016 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00017 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00018 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00019 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00020 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00021 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00022 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00023 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00024 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00025 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00026 // POSSIBILITY OF SUCH DAMAGE.
00027 
00028 #include "rosbag/bag.h"
00029 #include "rosbag/message_instance.h"
00030 #include "rosbag/query.h"
00031 #include "rosbag/view.h"
00032 
00033 #if defined(_MSC_VER)
00034   #include <stdint.h> // only on v2010 and later -> is this enough for msvc and linux?
00035 #else
00036   #include <inttypes.h>
00037 #endif
00038 #include <signal.h>
00039 #include <assert.h>
00040 #include <iomanip>
00041 
00042 #include <boost/foreach.hpp>
00043 
00044 #include "console_bridge/console.h"
00045 
00046 #define foreach BOOST_FOREACH
00047 
00048 using std::map;
00049 using std::priority_queue;
00050 using std::string;
00051 using std::vector;
00052 using std::multiset;
00053 using boost::format;
00054 using boost::shared_ptr;
00055 using ros::M_string;
00056 using ros::Time;
00057 
00058 namespace rosbag {
00059 
00060 Bag::Bag() :
00061     mode_(bagmode::Write),
00062     version_(0),
00063     compression_(compression::Uncompressed),
00064     chunk_threshold_(768 * 1024),  // 768KB chunks
00065     bag_revision_(0),
00066     file_size_(0),
00067     file_header_pos_(0),
00068     index_data_pos_(0),
00069     connection_count_(0),
00070     chunk_count_(0),
00071     chunk_open_(false),
00072     curr_chunk_data_pos_(0),
00073     current_buffer_(0),
00074     decompressed_chunk_(0)
00075 {
00076 }
00077 
00078 Bag::Bag(string const& filename, uint32_t mode) :
00079     compression_(compression::Uncompressed),
00080     chunk_threshold_(768 * 1024),  // 768KB chunks
00081     bag_revision_(0),
00082     file_size_(0),
00083     file_header_pos_(0),
00084     index_data_pos_(0),
00085     connection_count_(0),
00086     chunk_count_(0),
00087     chunk_open_(false),
00088     curr_chunk_data_pos_(0),
00089     current_buffer_(0),
00090     decompressed_chunk_(0)
00091 {
00092     open(filename, mode);
00093 }
00094 
00095 Bag::~Bag() {
00096     close();
00097 }
00098 
00099 void Bag::open(string const& filename, uint32_t mode) {
00100     mode_ = (BagMode) mode;
00101 
00102     if (mode_ & bagmode::Append)
00103         openAppend(filename);
00104     else if (mode_ & bagmode::Write)
00105         openWrite(filename);
00106     else if (mode_ & bagmode::Read)
00107         openRead(filename);
00108     else
00109         throw BagException((format("Unknown mode: %1%") % (int) mode).str());
00110 
00111     // Determine file size
00112     uint64_t offset = file_.getOffset();
00113     seek(0, std::ios::end);
00114     file_size_ = file_.getOffset();
00115     seek(offset);
00116 }
00117 
00118 void Bag::openRead(string const& filename) {
00119     file_.openRead(filename);
00120 
00121     readVersion();
00122 
00123     switch (version_) {
00124     case 102: startReadingVersion102(); break;
00125     case 200: startReadingVersion200(); break;
00126     default:
00127         throw BagException((format("Unsupported bag file version: %1%.%2%") % getMajorVersion() % getMinorVersion()).str());
00128     }
00129 }
00130 
00131 void Bag::openWrite(string const& filename) {
00132     file_.openWrite(filename);
00133 
00134     startWriting();
00135 }
00136 
00137 void Bag::openAppend(string const& filename) {
00138     file_.openReadWrite(filename);
00139 
00140     readVersion();
00141 
00142     if (version_ != 200)
00143         throw BagException((format("Bag file version %1%.%2% is unsupported for appending") % getMajorVersion() % getMinorVersion()).str());
00144 
00145     startReadingVersion200();
00146 
00147     // Truncate the file to chop off the index
00148     file_.truncate(index_data_pos_);
00149     index_data_pos_ = 0;
00150 
00151     // Rewrite the file header, clearing the index position (so we know if the index is invalid)
00152     seek(file_header_pos_);
00153     writeFileHeaderRecord();
00154 
00155     // Seek to the end of the file
00156     seek(0, std::ios::end);
00157 }
00158 
00159 void Bag::close() {
00160     if (!file_.isOpen())
00161         return;
00162 
00163     if (mode_ & bagmode::Write || mode_ & bagmode::Append)
00164         closeWrite();
00165     
00166     file_.close();
00167 
00168     topic_connection_ids_.clear();
00169     header_connection_ids_.clear();
00170     for (map<uint32_t, ConnectionInfo*>::iterator i = connections_.begin(); i != connections_.end(); i++)
00171         delete i->second;
00172     connections_.clear();
00173     chunks_.clear();
00174     connection_indexes_.clear();
00175     curr_chunk_connection_indexes_.clear();
00176 }
00177 
00178 void Bag::closeWrite() {
00179     stopWriting();
00180 }
00181 
00182 string   Bag::getFileName() const { return file_.getFileName(); }
00183 BagMode  Bag::getMode()     const { return mode_;               }
00184 uint64_t Bag::getSize()     const { return file_size_;          }
00185 
00186 uint32_t Bag::getChunkThreshold() const { return chunk_threshold_; }
00187 
00188 void Bag::setChunkThreshold(uint32_t chunk_threshold) {
00189     if (file_.isOpen() && chunk_open_)
00190         stopWritingChunk();
00191 
00192     chunk_threshold_ = chunk_threshold;
00193 }
00194 
00195 CompressionType Bag::getCompression() const { return compression_; }
00196 
00197 void Bag::setCompression(CompressionType compression) {
00198     if (file_.isOpen() && chunk_open_)
00199         stopWritingChunk();
00200 
00201     if (!(compression == compression::Uncompressed ||
00202           compression == compression::BZ2 ||
00203           compression == compression::LZ4)) {
00204         throw BagException(
00205             (format("Unknown compression type: %i")  % compression).str());
00206     }
00207 
00208     compression_ = compression;
00209 }
00210 
00211 // Version
00212 
00213 void Bag::writeVersion() {
00214     string version = string("#ROSBAG V") + VERSION + string("\n");
00215 
00216     logDebug("Writing VERSION [%llu]: %s", (unsigned long long) file_.getOffset(), version.c_str());
00217 
00218     version_ = 200;
00219 
00220     write(version);
00221 }
00222 
00223 void Bag::readVersion() {
00224     string version_line = file_.getline();
00225 
00226     file_header_pos_ = file_.getOffset();
00227 
00228     char logtypename[100];
00229     int version_major, version_minor;
00230 #if defined(_MSC_VER)
00231     if (sscanf_s(version_line.c_str(), "#ROS%s V%d.%d", logtypename, sizeof(logtypename), &version_major, &version_minor) != 3)
00232 #else
00233     if (sscanf(version_line.c_str(), "#ROS%s V%d.%d", logtypename, &version_major, &version_minor) != 3)
00234 #endif
00235         throw BagIOException("Error reading version line");
00236 
00237     version_ = version_major * 100 + version_minor;
00238 
00239     logDebug("Read VERSION: version=%d", version_);
00240 }
00241 
00242 uint32_t Bag::getMajorVersion() const { return version_ / 100; }
00243 uint32_t Bag::getMinorVersion() const { return version_ % 100; }
00244 
00245 //
00246 
00247 void Bag::startWriting() {
00248     writeVersion();
00249     file_header_pos_ = file_.getOffset();
00250     writeFileHeaderRecord();
00251 }
00252 
00253 void Bag::stopWriting() {
00254     if (chunk_open_)
00255         stopWritingChunk();
00256 
00257     seek(0, std::ios::end);
00258 
00259     index_data_pos_ = file_.getOffset();
00260     writeConnectionRecords();
00261     writeChunkInfoRecords();
00262 
00263     seek(file_header_pos_);
00264     writeFileHeaderRecord();
00265 }
00266 
00267 void Bag::startReadingVersion200() {
00268     // Read the file header record, which points to the end of the chunks
00269     readFileHeaderRecord();
00270 
00271     // Seek to the end of the chunks
00272     seek(index_data_pos_);
00273 
00274     // Read the connection records (one for each connection)
00275     for (uint32_t i = 0; i < connection_count_; i++)
00276         readConnectionRecord();
00277 
00278     // Read the chunk info records
00279     for (uint32_t i = 0; i < chunk_count_; i++)
00280         readChunkInfoRecord();
00281 
00282     // Read the connection indexes for each chunk
00283     foreach(ChunkInfo const& chunk_info, chunks_) {
00284         curr_chunk_info_ = chunk_info;
00285 
00286         seek(curr_chunk_info_.pos);
00287 
00288         // Skip over the chunk data
00289         ChunkHeader chunk_header;
00290         readChunkHeader(chunk_header);
00291         seek(chunk_header.compressed_size, std::ios::cur);
00292 
00293         // Read the index records after the chunk
00294         for (unsigned int i = 0; i < chunk_info.connection_counts.size(); i++)
00295             readConnectionIndexRecord200();
00296     }
00297 
00298     // At this point we don't have a curr_chunk_info anymore so we reset it
00299     curr_chunk_info_ = ChunkInfo();
00300 }
00301 
00302 void Bag::startReadingVersion102() {
00303     try
00304     {
00305         // Read the file header record, which points to the start of the topic indexes
00306         readFileHeaderRecord();
00307     }
00308     catch (BagFormatException ex) {
00309         throw BagUnindexedException();
00310     }
00311 
00312     // Get the length of the file
00313     seek(0, std::ios::end);
00314     uint64_t filelength = file_.getOffset();
00315 
00316     // Seek to the beginning of the topic index records
00317     seek(index_data_pos_);
00318 
00319     // Read the topic index records, which point to the offsets of each message in the file
00320     while (file_.getOffset() < filelength)
00321         readTopicIndexRecord102();
00322 
00323     // Read the message definition records (which are the first entry in the topic indexes)
00324     for (map<uint32_t, multiset<IndexEntry> >::const_iterator i = connection_indexes_.begin(); i != connection_indexes_.end(); i++) {
00325         multiset<IndexEntry> const& index       = i->second;
00326         IndexEntry const&           first_entry = *index.begin();
00327 
00328         logDebug("Reading message definition for connection %d at %llu", i->first, (unsigned long long) first_entry.chunk_pos);
00329 
00330         seek(first_entry.chunk_pos);
00331 
00332         readMessageDefinitionRecord102();
00333     }
00334 }
00335 
00336 // File header record
00337 
00338 void Bag::writeFileHeaderRecord() {
00339     connection_count_ = connections_.size();
00340     chunk_count_      = chunks_.size();
00341 
00342     logDebug("Writing FILE_HEADER [%llu]: index_pos=%llu connection_count=%d chunk_count=%d",
00343               (unsigned long long) file_.getOffset(), (unsigned long long) index_data_pos_, connection_count_, chunk_count_);
00344     
00345     // Write file header record
00346     M_string header;
00347     header[OP_FIELD_NAME]               = toHeaderString(&OP_FILE_HEADER);
00348     header[INDEX_POS_FIELD_NAME]        = toHeaderString(&index_data_pos_);
00349     header[CONNECTION_COUNT_FIELD_NAME] = toHeaderString(&connection_count_);
00350     header[CHUNK_COUNT_FIELD_NAME]      = toHeaderString(&chunk_count_);
00351 
00352     boost::shared_array<uint8_t> header_buffer;
00353     uint32_t header_len;
00354     ros::Header::write(header, header_buffer, header_len);
00355     uint32_t data_len = 0;
00356     if (header_len < FILE_HEADER_LENGTH)
00357         data_len = FILE_HEADER_LENGTH - header_len;
00358     write((char*) &header_len, 4);
00359     write((char*) header_buffer.get(), header_len);
00360     write((char*) &data_len, 4);
00361     
00362     // Pad the file header record out
00363     if (data_len > 0) {
00364         string padding;
00365         padding.resize(data_len, ' ');
00366         write(padding);
00367     }
00368 }
00369 
00370 void Bag::readFileHeaderRecord() {
00371     ros::Header header;
00372     uint32_t data_size;
00373     if (!readHeader(header) || !readDataLength(data_size))
00374         throw BagFormatException("Error reading FILE_HEADER record");
00375 
00376     M_string& fields = *header.getValues();
00377 
00378     if (!isOp(fields, OP_FILE_HEADER))
00379         throw BagFormatException("Expected FILE_HEADER op not found");
00380 
00381     // Read index position
00382     readField(fields, INDEX_POS_FIELD_NAME, true, (uint64_t*) &index_data_pos_);
00383 
00384     if (index_data_pos_ == 0)
00385         throw BagUnindexedException();
00386 
00387     // Read topic and chunks count
00388     if (version_ >= 200) {
00389         readField(fields, CONNECTION_COUNT_FIELD_NAME, true, &connection_count_);
00390         readField(fields, CHUNK_COUNT_FIELD_NAME,      true, &chunk_count_);
00391     }
00392 
00393     logDebug("Read FILE_HEADER: index_pos=%llu connection_count=%d chunk_count=%d",
00394               (unsigned long long) index_data_pos_, connection_count_, chunk_count_);
00395 
00396     // Skip the data section (just padding)
00397     seek(data_size, std::ios::cur);
00398 }
00399 
00400 uint32_t Bag::getChunkOffset() const {
00401     if (compression_ == compression::Uncompressed)
00402         return file_.getOffset() - curr_chunk_data_pos_;
00403     else
00404         return file_.getCompressedBytesIn();
00405 }
00406 
00407 void Bag::startWritingChunk(Time time) {
00408     // Initialize chunk info
00409     curr_chunk_info_.pos        = file_.getOffset();
00410     curr_chunk_info_.start_time = time;
00411     curr_chunk_info_.end_time   = time;
00412 
00413     // Write the chunk header, with a place-holder for the data sizes (we'll fill in when the chunk is finished)
00414     writeChunkHeader(compression_, 0, 0);
00415 
00416     // Turn on compressed writing
00417     file_.setWriteMode(compression_);
00418     
00419     // Record where the data section of this chunk started
00420     curr_chunk_data_pos_ = file_.getOffset();
00421 
00422     chunk_open_ = true;
00423 }
00424 
00425 void Bag::stopWritingChunk() {
00426     // Add this chunk to the index
00427     chunks_.push_back(curr_chunk_info_);
00428     
00429     // Get the uncompressed and compressed sizes
00430     uint32_t uncompressed_size = getChunkOffset();
00431     file_.setWriteMode(compression::Uncompressed);
00432     uint32_t compressed_size = file_.getOffset() - curr_chunk_data_pos_;
00433 
00434     // Rewrite the chunk header with the size of the chunk (remembering current offset)
00435     uint64_t end_of_chunk_pos = file_.getOffset();
00436 
00437     seek(curr_chunk_info_.pos);
00438     writeChunkHeader(compression_, compressed_size, uncompressed_size);
00439 
00440     // Write out the indexes and clear them
00441     seek(end_of_chunk_pos);
00442     writeIndexRecords();
00443     curr_chunk_connection_indexes_.clear();
00444 
00445     // Clear the connection counts
00446     curr_chunk_info_.connection_counts.clear();
00447     
00448     // Flag that we're starting a new chunk
00449     chunk_open_ = false;
00450 }
00451 
00452 void Bag::writeChunkHeader(CompressionType compression, uint32_t compressed_size, uint32_t uncompressed_size) {
00453     ChunkHeader chunk_header;
00454     switch (compression) {
00455     case compression::Uncompressed: chunk_header.compression = COMPRESSION_NONE; break;
00456     case compression::BZ2:          chunk_header.compression = COMPRESSION_BZ2;  break;
00457     case compression::LZ4:          chunk_header.compression = COMPRESSION_LZ4;
00458     //case compression::ZLIB:         chunk_header.compression = COMPRESSION_ZLIB; break;
00459     }
00460     chunk_header.compressed_size   = compressed_size;
00461     chunk_header.uncompressed_size = uncompressed_size;
00462 
00463     logDebug("Writing CHUNK [%llu]: compression=%s compressed=%d uncompressed=%d",
00464               (unsigned long long) file_.getOffset(), chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size);
00465 
00466     M_string header;
00467     header[OP_FIELD_NAME]          = toHeaderString(&OP_CHUNK);
00468     header[COMPRESSION_FIELD_NAME] = chunk_header.compression;
00469     header[SIZE_FIELD_NAME]        = toHeaderString(&chunk_header.uncompressed_size);
00470     writeHeader(header);
00471 
00472     writeDataLength(chunk_header.compressed_size);
00473 }
00474 
00475 void Bag::readChunkHeader(ChunkHeader& chunk_header) const {
00476     ros::Header header;
00477     if (!readHeader(header) || !readDataLength(chunk_header.compressed_size))
00478         throw BagFormatException("Error reading CHUNK record");
00479         
00480     M_string& fields = *header.getValues();
00481 
00482     if (!isOp(fields, OP_CHUNK))
00483         throw BagFormatException("Expected CHUNK op not found");
00484 
00485     readField(fields, COMPRESSION_FIELD_NAME, true, chunk_header.compression);
00486     readField(fields, SIZE_FIELD_NAME,        true, &chunk_header.uncompressed_size);
00487 
00488     logDebug("Read CHUNK: compression=%s size=%d uncompressed=%d (%f)", chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size, 100 * ((double) chunk_header.compressed_size) / chunk_header.uncompressed_size);
00489 }
00490 
00491 // Index records
00492 
00493 void Bag::writeIndexRecords() {
00494     for (map<uint32_t, multiset<IndexEntry> >::const_iterator i = curr_chunk_connection_indexes_.begin(); i != curr_chunk_connection_indexes_.end(); i++) {
00495         uint32_t                    connection_id = i->first;
00496         multiset<IndexEntry> const& index         = i->second;
00497 
00498         // Write the index record header
00499         uint32_t index_size = index.size();
00500         M_string header;
00501         header[OP_FIELD_NAME]         = toHeaderString(&OP_INDEX_DATA);
00502         header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_id);
00503         header[VER_FIELD_NAME]        = toHeaderString(&INDEX_VERSION);
00504         header[COUNT_FIELD_NAME]      = toHeaderString(&index_size);
00505         writeHeader(header);
00506 
00507         writeDataLength(index_size * 12);
00508 
00509         logDebug("Writing INDEX_DATA: connection=%d ver=%d count=%d", connection_id, INDEX_VERSION, index_size);
00510 
00511         // Write the index record data (pairs of timestamp and position in file)
00512         foreach(IndexEntry const& e, index) {
00513             write((char*) &e.time.sec,  4);
00514             write((char*) &e.time.nsec, 4);
00515             write((char*) &e.offset,    4);
00516 
00517             logDebug("  - %d.%d: %d", e.time.sec, e.time.nsec, e.offset);
00518         }
00519     }
00520 }
00521 
00522 void Bag::readTopicIndexRecord102() {
00523     ros::Header header;
00524     uint32_t data_size;
00525     if (!readHeader(header) || !readDataLength(data_size))
00526         throw BagFormatException("Error reading INDEX_DATA header");
00527     M_string& fields = *header.getValues();
00528 
00529     if (!isOp(fields, OP_INDEX_DATA))
00530         throw BagFormatException("Expected INDEX_DATA record");
00531 
00532     uint32_t index_version;
00533     string topic;
00534     uint32_t count = 0;
00535     readField(fields, VER_FIELD_NAME,   true, &index_version);
00536     readField(fields, TOPIC_FIELD_NAME, true, topic);
00537     readField(fields, COUNT_FIELD_NAME, true, &count);
00538 
00539     logDebug("Read INDEX_DATA: ver=%d topic=%s count=%d", index_version, topic.c_str(), count);
00540 
00541     if (index_version != 0)
00542         throw BagFormatException((format("Unsupported INDEX_DATA version: %1%") % index_version).str());
00543 
00544     uint32_t connection_id;
00545     map<string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
00546     if (topic_conn_id_iter == topic_connection_ids_.end()) {
00547         connection_id = connections_.size();
00548 
00549         logDebug("Creating connection: id=%d topic=%s", connection_id, topic.c_str());
00550         ConnectionInfo* connection_info = new ConnectionInfo();
00551         connection_info->id       = connection_id;
00552         connection_info->topic    = topic;
00553         connections_[connection_id] = connection_info;
00554 
00555         topic_connection_ids_[topic] = connection_id;
00556     }
00557     else
00558         connection_id = topic_conn_id_iter->second;
00559 
00560     multiset<IndexEntry>& connection_index = connection_indexes_[connection_id];
00561 
00562     for (uint32_t i = 0; i < count; i++) {
00563         IndexEntry index_entry;
00564         uint32_t sec;
00565         uint32_t nsec;
00566         read((char*) &sec,                   4);
00567         read((char*) &nsec,                  4);
00568         read((char*) &index_entry.chunk_pos, 8);   //<! store position of the message in the chunk_pos field as it's 64 bits
00569         index_entry.time = Time(sec, nsec);
00570         index_entry.offset = 0;
00571 
00572         logDebug("  - %d.%d: %llu", sec, nsec, (unsigned long long) index_entry.chunk_pos);
00573 
00574         if (index_entry.time < ros::TIME_MIN || index_entry.time > ros::TIME_MAX)
00575         {
00576           logError("Index entry for topic %s contains invalid time.", topic.c_str());
00577         } else
00578         {
00579           connection_index.insert(connection_index.end(), index_entry);
00580         }
00581     }
00582 }
00583 
00584 void Bag::readConnectionIndexRecord200() {
00585     ros::Header header;
00586     uint32_t data_size;
00587     if (!readHeader(header) || !readDataLength(data_size))
00588         throw BagFormatException("Error reading INDEX_DATA header");
00589     M_string& fields = *header.getValues();
00590     
00591     if (!isOp(fields, OP_INDEX_DATA))
00592         throw BagFormatException("Expected INDEX_DATA record");
00593     
00594     uint32_t index_version;
00595     uint32_t connection_id;
00596     uint32_t count = 0;
00597     readField(fields, VER_FIELD_NAME,        true, &index_version);
00598     readField(fields, CONNECTION_FIELD_NAME, true, &connection_id);
00599     readField(fields, COUNT_FIELD_NAME,      true, &count);
00600 
00601     logDebug("Read INDEX_DATA: ver=%d connection=%d count=%d", index_version, connection_id, count);
00602 
00603     if (index_version != 1)
00604         throw BagFormatException((format("Unsupported INDEX_DATA version: %1%") % index_version).str());
00605 
00606     uint64_t chunk_pos = curr_chunk_info_.pos;
00607 
00608     multiset<IndexEntry>& connection_index = connection_indexes_[connection_id];
00609 
00610     for (uint32_t i = 0; i < count; i++) {
00611         IndexEntry index_entry;
00612         index_entry.chunk_pos = chunk_pos;
00613         uint32_t sec;
00614         uint32_t nsec;
00615         read((char*) &sec,                4);
00616         read((char*) &nsec,               4);
00617         read((char*) &index_entry.offset, 4);
00618         index_entry.time = Time(sec, nsec);
00619 
00620         logDebug("  - %d.%d: %llu+%d", sec, nsec, (unsigned long long) index_entry.chunk_pos, index_entry.offset);
00621 
00622         if (index_entry.time < ros::TIME_MIN || index_entry.time > ros::TIME_MAX)
00623         {
00624           logError("Index entry for topic %s contains invalid time.  This message will not be loaded.", connections_[connection_id]->topic.c_str());
00625         } else
00626         {
00627           connection_index.insert(connection_index.end(), index_entry);
00628         }
00629     }
00630 }
00631 
00632 // Connection records
00633 
00634 void Bag::writeConnectionRecords() {
00635     for (map<uint32_t, ConnectionInfo*>::const_iterator i = connections_.begin(); i != connections_.end(); i++) {
00636         ConnectionInfo const* connection_info = i->second;
00637         writeConnectionRecord(connection_info);
00638     }
00639 }
00640 
00641 void Bag::writeConnectionRecord(ConnectionInfo const* connection_info) {
00642     logDebug("Writing CONNECTION [%llu:%d]: topic=%s id=%d",
00643               (unsigned long long) file_.getOffset(), getChunkOffset(), connection_info->topic.c_str(), connection_info->id);
00644 
00645     M_string header;
00646     header[OP_FIELD_NAME]         = toHeaderString(&OP_CONNECTION);
00647     header[TOPIC_FIELD_NAME]      = connection_info->topic;
00648     header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_info->id);
00649     writeHeader(header);
00650 
00651     writeHeader(*connection_info->header);
00652 }
00653 
00654 void Bag::appendConnectionRecordToBuffer(Buffer& buf, ConnectionInfo const* connection_info) {
00655     M_string header;
00656     header[OP_FIELD_NAME]         = toHeaderString(&OP_CONNECTION);
00657     header[TOPIC_FIELD_NAME]      = connection_info->topic;
00658     header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_info->id);
00659     appendHeaderToBuffer(buf, header);
00660 
00661     appendHeaderToBuffer(buf, *connection_info->header);
00662 }
00663 
00664 void Bag::readConnectionRecord() {
00665     ros::Header header;
00666     if (!readHeader(header))
00667         throw BagFormatException("Error reading CONNECTION header");
00668     M_string& fields = *header.getValues();
00669 
00670     if (!isOp(fields, OP_CONNECTION))
00671         throw BagFormatException("Expected CONNECTION op not found");
00672 
00673     uint32_t id;
00674     readField(fields, CONNECTION_FIELD_NAME, true, &id);
00675     string topic;
00676     readField(fields, TOPIC_FIELD_NAME,      true, topic);
00677 
00678     ros::Header connection_header;
00679     if (!readHeader(connection_header))
00680         throw BagFormatException("Error reading connection header");
00681 
00682     // If this is a new connection, update connections
00683     map<uint32_t, ConnectionInfo*>::iterator key = connections_.find(id);
00684     if (key == connections_.end()) {
00685         ConnectionInfo* connection_info = new ConnectionInfo();
00686         connection_info->id       = id;
00687         connection_info->topic    = topic;
00688         connection_info->header = boost::make_shared<M_string>();
00689         for (M_string::const_iterator i = connection_header.getValues()->begin(); i != connection_header.getValues()->end(); i++)
00690             (*connection_info->header)[i->first] = i->second;
00691         connection_info->msg_def  = (*connection_info->header)["message_definition"];
00692         connection_info->datatype = (*connection_info->header)["type"];
00693         connection_info->md5sum   = (*connection_info->header)["md5sum"];
00694         connections_[id] = connection_info;
00695 
00696         logDebug("Read CONNECTION: topic=%s id=%d", topic.c_str(), id);
00697     }
00698 }
00699 
00700 void Bag::readMessageDefinitionRecord102() {
00701     ros::Header header;
00702     uint32_t data_size;
00703     if (!readHeader(header) || !readDataLength(data_size))
00704         throw BagFormatException("Error reading message definition header");
00705     M_string& fields = *header.getValues();
00706 
00707     if (!isOp(fields, OP_MSG_DEF))
00708         throw BagFormatException("Expected MSG_DEF op not found");
00709 
00710     string topic, md5sum, datatype, message_definition;
00711     readField(fields, TOPIC_FIELD_NAME,               true, topic);
00712     readField(fields, MD5_FIELD_NAME,   32,       32, true, md5sum);
00713     readField(fields, TYPE_FIELD_NAME,                true, datatype);
00714     readField(fields, DEF_FIELD_NAME,    0, UINT_MAX, true, message_definition);
00715 
00716     ConnectionInfo* connection_info;
00717 
00718     map<string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
00719     if (topic_conn_id_iter == topic_connection_ids_.end()) {
00720         uint32_t id = connections_.size();
00721 
00722         logDebug("Creating connection: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str());
00723         connection_info = new ConnectionInfo();
00724         connection_info->id       = id;
00725         connection_info->topic    = topic;
00726 
00727         connections_[id] = connection_info;
00728         topic_connection_ids_[topic] = id;
00729     }
00730     else
00731         connection_info = connections_[topic_conn_id_iter->second];
00732 
00733     connection_info->msg_def  = message_definition;
00734     connection_info->datatype = datatype;
00735     connection_info->md5sum   = md5sum;
00736     connection_info->header = boost::make_shared<ros::M_string>();
00737     (*connection_info->header)["type"]               = connection_info->datatype;
00738     (*connection_info->header)["md5sum"]             = connection_info->md5sum;
00739     (*connection_info->header)["message_definition"] = connection_info->msg_def;
00740 
00741     logDebug("Read MSG_DEF: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str());
00742 }
00743 
00744 void Bag::decompressChunk(uint64_t chunk_pos) const {
00745     if (curr_chunk_info_.pos == chunk_pos) {
00746         current_buffer_ = &outgoing_chunk_buffer_;
00747         return;
00748     }
00749 
00750     current_buffer_ = &decompress_buffer_;
00751 
00752     if (decompressed_chunk_ == chunk_pos)
00753         return;
00754 
00755     // Seek to the start of the chunk
00756     seek(chunk_pos);
00757 
00758     // Read the chunk header
00759     ChunkHeader chunk_header;
00760     readChunkHeader(chunk_header);
00761 
00762     // Read and decompress the chunk.  These assume we are at the right place in the stream already
00763     if (chunk_header.compression == COMPRESSION_NONE)
00764         decompressRawChunk(chunk_header);
00765     else if (chunk_header.compression == COMPRESSION_BZ2)
00766         decompressBz2Chunk(chunk_header);
00767     else if (chunk_header.compression == COMPRESSION_LZ4)
00768         decompressLz4Chunk(chunk_header);
00769     else
00770         throw BagFormatException("Unknown compression: " + chunk_header.compression);
00771     
00772     decompressed_chunk_ = chunk_pos;
00773 }
00774 
00775 void Bag::readMessageDataRecord102(uint64_t offset, ros::Header& header) const {
00776     logDebug("readMessageDataRecord: offset=%llu", (unsigned long long) offset);
00777 
00778     seek(offset);
00779 
00780     uint32_t data_size;
00781     uint8_t op;
00782     do {
00783         if (!readHeader(header) || !readDataLength(data_size))
00784             throw BagFormatException("Error reading header");
00785 
00786         readField(*header.getValues(), OP_FIELD_NAME, true, &op);
00787     }
00788     while (op == OP_MSG_DEF);
00789 
00790     if (op != OP_MSG_DATA)
00791         throw BagFormatException((format("Expected MSG_DATA op, got %d") % op).str());
00792 
00793     record_buffer_.setSize(data_size);
00794     file_.read((char*) record_buffer_.getData(), data_size);
00795 }
00796 
00797 // Reading this into a buffer isn't completely necessary, but we do it anyways for now
00798 void Bag::decompressRawChunk(ChunkHeader const& chunk_header) const {
00799     assert(chunk_header.compression == COMPRESSION_NONE);
00800     assert(chunk_header.compressed_size == chunk_header.uncompressed_size);
00801 
00802     logDebug("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size);
00803 
00804     decompress_buffer_.setSize(chunk_header.compressed_size);
00805     file_.read((char*) decompress_buffer_.getData(), chunk_header.compressed_size);
00806 
00807     // todo check read was successful
00808 }
00809 
00810 void Bag::decompressBz2Chunk(ChunkHeader const& chunk_header) const {
00811     assert(chunk_header.compression == COMPRESSION_BZ2);
00812 
00813     CompressionType compression = compression::BZ2;
00814 
00815     logDebug("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size);
00816 
00817     chunk_buffer_.setSize(chunk_header.compressed_size);
00818     file_.read((char*) chunk_buffer_.getData(), chunk_header.compressed_size);
00819 
00820     decompress_buffer_.setSize(chunk_header.uncompressed_size);
00821     file_.decompress(compression, decompress_buffer_.getData(), decompress_buffer_.getSize(), chunk_buffer_.getData(), chunk_buffer_.getSize());
00822 
00823     // todo check read was successful
00824 }
00825 
00826 void Bag::decompressLz4Chunk(ChunkHeader const& chunk_header) const {
00827     assert(chunk_header.compression == COMPRESSION_LZ4);
00828 
00829     CompressionType compression = compression::LZ4;
00830 
00831     logDebug("lz4 compressed_size: %d uncompressed_size: %d",
00832              chunk_header.compressed_size, chunk_header.uncompressed_size);
00833 
00834     chunk_buffer_.setSize(chunk_header.compressed_size);
00835     file_.read((char*) chunk_buffer_.getData(), chunk_header.compressed_size);
00836 
00837     decompress_buffer_.setSize(chunk_header.uncompressed_size);
00838     file_.decompress(compression, decompress_buffer_.getData(), decompress_buffer_.getSize(), chunk_buffer_.getData(), chunk_buffer_.getSize());
00839 
00840     // todo check read was successful
00841 }
00842 
00843 ros::Header Bag::readMessageDataHeader(IndexEntry const& index_entry) {
00844     ros::Header header;
00845     uint32_t data_size;
00846     uint32_t bytes_read;
00847     switch (version_)
00848     {
00849     case 200:
00850         decompressChunk(index_entry.chunk_pos);
00851         readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
00852         return header;
00853     case 102:
00854         readMessageDataRecord102(index_entry.chunk_pos, header);
00855         return header;
00856     default:
00857         throw BagFormatException((format("Unhandled version: %1%") % version_).str());
00858     }
00859 }
00860 
00861 // NOTE: this loads the header, which is unnecessary
00862 uint32_t Bag::readMessageDataSize(IndexEntry const& index_entry) const {
00863     ros::Header header;
00864     uint32_t data_size;
00865     uint32_t bytes_read;
00866     switch (version_)
00867     {
00868     case 200:
00869         decompressChunk(index_entry.chunk_pos);
00870         readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
00871         return data_size;
00872     case 102:
00873         readMessageDataRecord102(index_entry.chunk_pos, header);
00874         return record_buffer_.getSize();
00875     default:
00876         throw BagFormatException((format("Unhandled version: %1%") % version_).str());
00877     }
00878 }
00879 
00880 void Bag::writeChunkInfoRecords() {
00881     foreach(ChunkInfo const& chunk_info, chunks_) {
00882         // Write the chunk info header
00883         M_string header;
00884         uint32_t chunk_connection_count = chunk_info.connection_counts.size();
00885         header[OP_FIELD_NAME]         = toHeaderString(&OP_CHUNK_INFO);
00886         header[VER_FIELD_NAME]        = toHeaderString(&CHUNK_INFO_VERSION);
00887         header[CHUNK_POS_FIELD_NAME]  = toHeaderString(&chunk_info.pos);
00888         header[START_TIME_FIELD_NAME] = toHeaderString(&chunk_info.start_time);
00889         header[END_TIME_FIELD_NAME]   = toHeaderString(&chunk_info.end_time);
00890         header[COUNT_FIELD_NAME]      = toHeaderString(&chunk_connection_count);
00891 
00892         logDebug("Writing CHUNK_INFO [%llu]: ver=%d pos=%llu start=%d.%d end=%d.%d",
00893                   (unsigned long long) file_.getOffset(), CHUNK_INFO_VERSION, (unsigned long long) chunk_info.pos,
00894                   chunk_info.start_time.sec, chunk_info.start_time.nsec,
00895                   chunk_info.end_time.sec, chunk_info.end_time.nsec);
00896 
00897         writeHeader(header);
00898 
00899         writeDataLength(8 * chunk_connection_count);
00900 
00901         // Write the topic names and counts
00902         for (map<uint32_t, uint32_t>::const_iterator i = chunk_info.connection_counts.begin(); i != chunk_info.connection_counts.end(); i++) {
00903             uint32_t connection_id = i->first;
00904             uint32_t count         = i->second;
00905 
00906             write((char*) &connection_id, 4);
00907             write((char*) &count, 4);
00908 
00909             logDebug("  - %d: %d", connection_id, count);
00910         }
00911     }
00912 }
00913 
00914 void Bag::readChunkInfoRecord() {
00915     // Read a CHUNK_INFO header
00916     ros::Header header;
00917     uint32_t data_size;
00918     if (!readHeader(header) || !readDataLength(data_size))
00919         throw BagFormatException("Error reading CHUNK_INFO record header");
00920     M_string& fields = *header.getValues();
00921     if (!isOp(fields, OP_CHUNK_INFO))
00922         throw BagFormatException("Expected CHUNK_INFO op not found");
00923 
00924     // Check that the chunk info version is current
00925     uint32_t chunk_info_version;
00926     readField(fields, VER_FIELD_NAME, true, &chunk_info_version);
00927     if (chunk_info_version != CHUNK_INFO_VERSION)
00928         throw BagFormatException((format("Expected CHUNK_INFO version %1%, read %2%") % CHUNK_INFO_VERSION % chunk_info_version).str());
00929 
00930     // Read the chunk position, timestamp, and topic count fields
00931     ChunkInfo chunk_info;
00932     readField(fields, CHUNK_POS_FIELD_NAME,  true, &chunk_info.pos);
00933     readField(fields, START_TIME_FIELD_NAME, true,  chunk_info.start_time);
00934     readField(fields, END_TIME_FIELD_NAME,   true,  chunk_info.end_time);
00935     uint32_t chunk_connection_count = 0;
00936     readField(fields, COUNT_FIELD_NAME,      true, &chunk_connection_count);
00937 
00938     logDebug("Read CHUNK_INFO: chunk_pos=%llu connection_count=%d start=%d.%d end=%d.%d",
00939               (unsigned long long) chunk_info.pos, chunk_connection_count,
00940               chunk_info.start_time.sec, chunk_info.start_time.nsec,
00941               chunk_info.end_time.sec, chunk_info.end_time.nsec);
00942 
00943     // Read the topic count entries
00944     for (uint32_t i = 0; i < chunk_connection_count; i ++) {
00945         uint32_t connection_id, connection_count;
00946         read((char*) &connection_id,    4);
00947         read((char*) &connection_count, 4);
00948 
00949         logDebug("  %d: %d messages", connection_id, connection_count);
00950 
00951         chunk_info.connection_counts[connection_id] = connection_count;
00952     }
00953 
00954     chunks_.push_back(chunk_info);
00955 }
00956 
00957 // Record I/O
00958 
00959 bool Bag::isOp(M_string& fields, uint8_t reqOp) const {
00960     uint8_t op = 0xFF; // nonexistent op
00961     readField(fields, OP_FIELD_NAME, true, &op);
00962     return op == reqOp;
00963 }
00964 
00965 void Bag::writeHeader(M_string const& fields) {
00966     boost::shared_array<uint8_t> header_buffer;
00967     uint32_t header_len;
00968     ros::Header::write(fields, header_buffer, header_len);
00969     write((char*) &header_len, 4);
00970     write((char*) header_buffer.get(), header_len);
00971 }
00972 
00973 void Bag::writeDataLength(uint32_t data_len) {
00974     write((char*) &data_len, 4);
00975 }
00976 
00977 void Bag::appendHeaderToBuffer(Buffer& buf, M_string const& fields) {
00978     boost::shared_array<uint8_t> header_buffer;
00979     uint32_t header_len;
00980     ros::Header::write(fields, header_buffer, header_len);
00981 
00982     uint32_t offset = buf.getSize();
00983 
00984     buf.setSize(buf.getSize() + 4 + header_len);
00985 
00986     memcpy(buf.getData() + offset, &header_len, 4);
00987     offset += 4;
00988     memcpy(buf.getData() + offset, header_buffer.get(), header_len);
00989 }
00990 
00991 void Bag::appendDataLengthToBuffer(Buffer& buf, uint32_t data_len) {
00992     uint32_t offset = buf.getSize();
00993 
00994     buf.setSize(buf.getSize() + 4);
00995 
00996     memcpy(buf.getData() + offset, &data_len, 4);
00997 }
00998 
01000 void Bag::readHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read) const {
01001     assert(buffer.getSize() > 8);
01002 
01003     uint8_t* start = (uint8_t*) buffer.getData() + offset;
01004 
01005     uint8_t* ptr = start;
01006 
01007     // Read the header length
01008     uint32_t header_len;
01009     memcpy(&header_len, ptr, 4);
01010     ptr += 4;
01011 
01012     // Parse the header
01013     string error_msg;
01014     bool parsed = header.parse(ptr, header_len, error_msg);
01015     if (!parsed)
01016         throw BagFormatException("Error parsing header");
01017     ptr += header_len;
01018 
01019     // Read the data size
01020     memcpy(&data_size, ptr, 4);
01021     ptr += 4;
01022 
01023     bytes_read = ptr - start;
01024 }
01025 
01026 void Bag::readMessageDataHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& total_bytes_read) const {
01027     (void)buffer;
01028     total_bytes_read = 0;
01029     uint8_t op = 0xFF;
01030     do {
01031         logDebug("reading header from buffer: offset=%d", offset);
01032         uint32_t bytes_read;
01033         readHeaderFromBuffer(*current_buffer_, offset, header, data_size, bytes_read);
01034 
01035         offset += bytes_read;
01036         total_bytes_read += bytes_read;
01037         
01038         readField(*header.getValues(), OP_FIELD_NAME, true, &op);
01039     }
01040     while (op == OP_MSG_DEF || op == OP_CONNECTION);
01041 
01042     if (op != OP_MSG_DATA)
01043         throw BagFormatException("Expected MSG_DATA op not found");
01044 }
01045 
01046 bool Bag::readHeader(ros::Header& header) const {
01047     // Read the header length
01048     uint32_t header_len;
01049     read((char*) &header_len, 4);
01050 
01051     // Read the header
01052     header_buffer_.setSize(header_len);
01053     read((char*) header_buffer_.getData(), header_len);
01054 
01055     // Parse the header
01056     string error_msg;
01057     bool parsed = header.parse(header_buffer_.getData(), header_len, error_msg);
01058     if (!parsed)
01059         return false;
01060 
01061     return true;
01062 }
01063 
01064 bool Bag::readDataLength(uint32_t& data_size) const {
01065     read((char*) &data_size, 4);
01066     return true;
01067 }
01068 
01069 M_string::const_iterator Bag::checkField(M_string const& fields, string const& field, unsigned int min_len, unsigned int max_len, bool required) const {
01070     M_string::const_iterator fitr = fields.find(field);
01071     if (fitr == fields.end()) {
01072         if (required)
01073             throw BagFormatException("Required '" + field + "' field missing");
01074     }
01075     else if ((fitr->second.size() < min_len) || (fitr->second.size() > max_len))
01076         throw BagFormatException((format("Field '%1%' is wrong size (%2% bytes)") % field % (uint32_t) fitr->second.size()).str());
01077 
01078     return fitr;
01079 }
01080 
01081 bool Bag::readField(M_string const& fields, string const& field_name, bool required, string& data) const {
01082     return readField(fields, field_name, 1, UINT_MAX, required, data);
01083 }
01084 
01085 bool Bag::readField(M_string const& fields, string const& field_name, unsigned int min_len, unsigned int max_len, bool required, string& data) const {
01086     M_string::const_iterator fitr = checkField(fields, field_name, min_len, max_len, required);
01087     if (fitr == fields.end())
01088         return false;
01089 
01090     data = fitr->second;
01091     return true;
01092 }
01093 
01094 bool Bag::readField(M_string const& fields, string const& field_name, bool required, Time& data) const {
01095     uint64_t packed_time;
01096     if (!readField(fields, field_name, required, &packed_time))
01097         return false;
01098 
01099     uint64_t bitmask = (1LL << 33) - 1;
01100     data.sec  = (uint32_t) (packed_time & bitmask);
01101     data.nsec = (uint32_t) (packed_time >> 32);
01102 
01103     return true;
01104 }
01105 
01106 std::string Bag::toHeaderString(Time const* field) const {
01107     uint64_t packed_time = (((uint64_t) field->nsec) << 32) + field->sec;
01108     return toHeaderString(&packed_time);
01109 }
01110 
01111 
01112 // Low-level I/O
01113 
01114 void Bag::write(string const& s)                  { write(s.c_str(), s.length()); }
01115 void Bag::write(char const* s, std::streamsize n) { file_.write((char*) s, n);    }
01116 
01117 void Bag::read(char* b, std::streamsize n) const  { file_.read(b, n);             }
01118 void Bag::seek(uint64_t pos, int origin) const    { file_.seek(pos, origin);      }
01119 
01120 } // namespace rosbag


rosbag_storage
Author(s):
autogenerated on Tue Mar 7 2017 03:44:35