bag.cpp
Go to the documentation of this file.
00001 // Copyright (c) 2009, Willow Garage, Inc.
00002 // All rights reserved.
00003 // 
00004 // Redistribution and use in source and binary forms, with or without
00005 // modification, are permitted provided that the following conditions are met:
00006 // 
00007 //     * Redistributions of source code must retain the above copyright
00008 //       notice, this list of conditions and the following disclaimer.
00009 //     * Redistributions in binary form must reproduce the above copyright
00010 //       notice, this list of conditions and the following disclaimer in the
00011 //       documentation and/or other materials provided with the distribution.
00012 //     * Neither the name of Willow Garage, Inc. nor the names of its
00013 //       contributors may be used to endorse or promote products derived from
00014 //       this software without specific prior written permission.
00015 // 
00016 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00017 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00018 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00019 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00020 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00021 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00022 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00023 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00024 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00025 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00026 // POSSIBILITY OF SUCH DAMAGE.
00027 
00028 #include "rosbag/bag.h"
00029 #include "rosbag/message_instance.h"
00030 #include "rosbag/query.h"
00031 #include "rosbag/view.h"
00032 
00033 #if defined(_MSC_VER)
00034   #include <stdint.h> // only on v2010 and later -> is this enough for msvc and linux?
00035 #else
00036   #include <inttypes.h>
00037 #endif
00038 #include <signal.h>
00039 #include <assert.h>
00040 #include <iomanip>
00041 
00042 #include <boost/foreach.hpp>
00043 
00044 #include "console_bridge/console.h"
00045 
00046 #define foreach BOOST_FOREACH
00047 
00048 using std::map;
00049 using std::priority_queue;
00050 using std::string;
00051 using std::vector;
00052 using std::multiset;
00053 using boost::format;
00054 using boost::shared_ptr;
00055 using ros::M_string;
00056 using ros::Time;
00057 
00058 namespace rosbag {
00059 
00060 Bag::Bag() :
00061     compression_(compression::Uncompressed),
00062     chunk_threshold_(768 * 1024),  // 768KB chunks
00063     bag_revision_(0),
00064     file_size_(0),
00065     file_header_pos_(0),
00066     index_data_pos_(0),
00067     connection_count_(0),
00068     chunk_count_(0),
00069     chunk_open_(false),
00070     curr_chunk_data_pos_(0),
00071     decompressed_chunk_(0)
00072 {
00073 }
00074 
00075 Bag::Bag(string const& filename, uint32_t mode) :
00076     compression_(compression::Uncompressed),
00077     chunk_threshold_(768 * 1024),  // 768KB chunks
00078     bag_revision_(0),
00079     file_size_(0),
00080     file_header_pos_(0),
00081     index_data_pos_(0),
00082     connection_count_(0),
00083     chunk_count_(0),
00084     chunk_open_(false),
00085     curr_chunk_data_pos_(0),
00086     decompressed_chunk_(0)
00087 {
00088     open(filename, mode);
00089 }
00090 
00091 Bag::~Bag() {
00092     close();
00093 }
00094 
00095 void Bag::open(string const& filename, uint32_t mode) {
00096     mode_ = (BagMode) mode;
00097 
00098     if (mode_ & bagmode::Append)
00099         openAppend(filename);
00100     else if (mode_ & bagmode::Write)
00101         openWrite(filename);
00102     else if (mode_ & bagmode::Read)
00103         openRead(filename);
00104     else
00105         throw BagException((format("Unknown mode: %1%") % (int) mode).str());
00106 
00107     // Determine file size
00108     uint64_t offset = file_.getOffset();
00109     seek(0, std::ios::end);
00110     file_size_ = file_.getOffset();
00111     seek(offset);
00112 }
00113 
00114 void Bag::openRead(string const& filename) {
00115     file_.openRead(filename);
00116 
00117     readVersion();
00118 
00119     switch (version_) {
00120     case 102: startReadingVersion102(); break;
00121     case 200: startReadingVersion200(); break;
00122     default:
00123         throw BagException((format("Unsupported bag file version: %1%.%2%") % getMajorVersion() % getMinorVersion()).str());
00124     }
00125 }
00126 
00127 void Bag::openWrite(string const& filename) {
00128     file_.openWrite(filename);
00129 
00130     startWriting();
00131 }
00132 
00133 void Bag::openAppend(string const& filename) {
00134     file_.openReadWrite(filename);
00135 
00136     readVersion();
00137 
00138     if (version_ != 200)
00139         throw BagException((format("Bag file version %1%.%2% is unsupported for appending") % getMajorVersion() % getMinorVersion()).str());
00140 
00141     startReadingVersion200();
00142 
00143     // Truncate the file to chop off the index
00144     file_.truncate(index_data_pos_);
00145     index_data_pos_ = 0;
00146 
00147     // Rewrite the file header, clearing the index position (so we know if the index is invalid)
00148     seek(file_header_pos_);
00149     writeFileHeaderRecord();
00150 
00151     // Seek to the end of the file
00152     seek(0, std::ios::end);
00153 }
00154 
00155 void Bag::close() {
00156     if (!file_.isOpen())
00157         return;
00158 
00159     if (mode_ & bagmode::Write || mode_ & bagmode::Append)
00160         closeWrite();
00161     
00162     file_.close();
00163 
00164     topic_connection_ids_.clear();
00165     header_connection_ids_.clear();
00166     for (map<uint32_t, ConnectionInfo*>::iterator i = connections_.begin(); i != connections_.end(); i++)
00167         delete i->second;
00168     connections_.clear();
00169     chunks_.clear();
00170     connection_indexes_.clear();
00171     curr_chunk_connection_indexes_.clear();
00172 }
00173 
00174 void Bag::closeWrite() {
00175     stopWriting();
00176 }
00177 
00178 string   Bag::getFileName() const { return file_.getFileName(); }
00179 BagMode  Bag::getMode()     const { return mode_;               }
00180 uint64_t Bag::getSize()     const { return file_size_;          }
00181 
00182 uint32_t Bag::getChunkThreshold() const { return chunk_threshold_; }
00183 
00184 void Bag::setChunkThreshold(uint32_t chunk_threshold) {
00185     if (file_.isOpen() && chunk_open_)
00186         stopWritingChunk();
00187 
00188     chunk_threshold_ = chunk_threshold;
00189 }
00190 
00191 CompressionType Bag::getCompression() const { return compression_; }
00192 
00193 void Bag::setCompression(CompressionType compression) {
00194     if (file_.isOpen() && chunk_open_)
00195         stopWritingChunk();
00196 
00197     compression_ = compression;
00198 }
00199 
00200 // Version
00201 
00202 void Bag::writeVersion() {
00203     string version = string("#ROSBAG V") + VERSION + string("\n");
00204 
00205     logDebug("Writing VERSION [%llu]: %s", (unsigned long long) file_.getOffset(), version.c_str());
00206 
00207     version_ = 200;
00208 
00209     write(version);
00210 }
00211 
00212 void Bag::readVersion() {
00213     string version_line = file_.getline();
00214 
00215     file_header_pos_ = file_.getOffset();
00216 
00217     char logtypename[100];
00218     int version_major, version_minor;
00219 #if defined(_MSC_VER)
00220     if (sscanf_s(version_line.c_str(), "#ROS%s V%d.%d", logtypename, sizeof(logtypename), &version_major, &version_minor) != 3)
00221 #else
00222     if (sscanf(version_line.c_str(), "#ROS%s V%d.%d", logtypename, &version_major, &version_minor) != 3)
00223 #endif
00224         throw BagIOException("Error reading version line");
00225 
00226     version_ = version_major * 100 + version_minor;
00227 
00228     logDebug("Read VERSION: version=%d", version_);
00229 }
00230 
00231 uint32_t Bag::getMajorVersion() const { return version_ / 100; }
00232 uint32_t Bag::getMinorVersion() const { return version_ % 100; }
00233 
00234 //
00235 
00236 void Bag::startWriting() {
00237     writeVersion();
00238     file_header_pos_ = file_.getOffset();
00239     writeFileHeaderRecord();
00240 }
00241 
00242 void Bag::stopWriting() {
00243     if (chunk_open_)
00244         stopWritingChunk();
00245 
00246     seek(0, std::ios::end);
00247 
00248     index_data_pos_ = file_.getOffset();
00249     writeConnectionRecords();
00250     writeChunkInfoRecords();
00251 
00252     seek(file_header_pos_);
00253     writeFileHeaderRecord();
00254 }
00255 
00256 void Bag::startReadingVersion200() {
00257     // Read the file header record, which points to the end of the chunks
00258     readFileHeaderRecord();
00259 
00260     // Seek to the end of the chunks
00261     seek(index_data_pos_);
00262 
00263     // Read the connection records (one for each connection)
00264     for (uint32_t i = 0; i < connection_count_; i++)
00265         readConnectionRecord();
00266 
00267     // Read the chunk info records
00268     for (uint32_t i = 0; i < chunk_count_; i++)
00269         readChunkInfoRecord();
00270 
00271     // Read the connection indexes for each chunk
00272     foreach(ChunkInfo const& chunk_info, chunks_) {
00273         curr_chunk_info_ = chunk_info;
00274 
00275         seek(curr_chunk_info_.pos);
00276 
00277         // Skip over the chunk data
00278         ChunkHeader chunk_header;
00279         readChunkHeader(chunk_header);
00280         seek(chunk_header.compressed_size, std::ios::cur);
00281 
00282         // Read the index records after the chunk
00283         for (unsigned int i = 0; i < chunk_info.connection_counts.size(); i++)
00284             readConnectionIndexRecord200();
00285     }
00286 
00287     // At this point we don't have a curr_chunk_info anymore so we reset it
00288     curr_chunk_info_ = ChunkInfo();
00289 }
00290 
00291 void Bag::startReadingVersion102() {
00292     try
00293     {
00294         // Read the file header record, which points to the start of the topic indexes
00295         readFileHeaderRecord();
00296     }
00297     catch (BagFormatException ex) {
00298         throw BagUnindexedException();
00299     }
00300 
00301     // Get the length of the file
00302     seek(0, std::ios::end);
00303     uint64_t filelength = file_.getOffset();
00304 
00305     // Seek to the beginning of the topic index records
00306     seek(index_data_pos_);
00307 
00308     // Read the topic index records, which point to the offsets of each message in the file
00309     while (file_.getOffset() < filelength)
00310         readTopicIndexRecord102();
00311 
00312     // Read the message definition records (which are the first entry in the topic indexes)
00313     for (map<uint32_t, multiset<IndexEntry> >::const_iterator i = connection_indexes_.begin(); i != connection_indexes_.end(); i++) {
00314         multiset<IndexEntry> const& index       = i->second;
00315         IndexEntry const&           first_entry = *index.begin();
00316 
00317         logDebug("Reading message definition for connection %d at %llu", i->first, (unsigned long long) first_entry.chunk_pos);
00318 
00319         seek(first_entry.chunk_pos);
00320 
00321         readMessageDefinitionRecord102();
00322     }
00323 }
00324 
00325 // File header record
00326 
00327 void Bag::writeFileHeaderRecord() {
00328     connection_count_ = connections_.size();
00329     chunk_count_      = chunks_.size();
00330 
00331     logDebug("Writing FILE_HEADER [%llu]: index_pos=%llu connection_count=%d chunk_count=%d",
00332               (unsigned long long) file_.getOffset(), (unsigned long long) index_data_pos_, connection_count_, chunk_count_);
00333     
00334     // Write file header record
00335     M_string header;
00336     header[OP_FIELD_NAME]               = toHeaderString(&OP_FILE_HEADER);
00337     header[INDEX_POS_FIELD_NAME]        = toHeaderString(&index_data_pos_);
00338     header[CONNECTION_COUNT_FIELD_NAME] = toHeaderString(&connection_count_);
00339     header[CHUNK_COUNT_FIELD_NAME]      = toHeaderString(&chunk_count_);
00340 
00341     boost::shared_array<uint8_t> header_buffer;
00342     uint32_t header_len;
00343     ros::Header::write(header, header_buffer, header_len);
00344     uint32_t data_len = 0;
00345     if (header_len < FILE_HEADER_LENGTH)
00346         data_len = FILE_HEADER_LENGTH - header_len;
00347     write((char*) &header_len, 4);
00348     write((char*) header_buffer.get(), header_len);
00349     write((char*) &data_len, 4);
00350     
00351     // Pad the file header record out
00352     if (data_len > 0) {
00353         string padding;
00354         padding.resize(data_len, ' ');
00355         write(padding);
00356     }
00357 }
00358 
00359 void Bag::readFileHeaderRecord() {
00360     ros::Header header;
00361     uint32_t data_size;
00362     if (!readHeader(header) || !readDataLength(data_size))
00363         throw BagFormatException("Error reading FILE_HEADER record");
00364 
00365     M_string& fields = *header.getValues();
00366 
00367     if (!isOp(fields, OP_FILE_HEADER))
00368         throw BagFormatException("Expected FILE_HEADER op not found");
00369 
00370     // Read index position
00371     readField(fields, INDEX_POS_FIELD_NAME, true, (uint64_t*) &index_data_pos_);
00372 
00373     if (index_data_pos_ == 0)
00374         throw BagUnindexedException();
00375 
00376     // Read topic and chunks count
00377     if (version_ >= 200) {
00378         readField(fields, CONNECTION_COUNT_FIELD_NAME, true, &connection_count_);
00379         readField(fields, CHUNK_COUNT_FIELD_NAME,      true, &chunk_count_);
00380     }
00381 
00382     logDebug("Read FILE_HEADER: index_pos=%llu connection_count=%d chunk_count=%d",
00383               (unsigned long long) index_data_pos_, connection_count_, chunk_count_);
00384 
00385     // Skip the data section (just padding)
00386     seek(data_size, std::ios::cur);
00387 }
00388 
00389 uint32_t Bag::getChunkOffset() const {
00390     if (compression_ == compression::Uncompressed)
00391         return file_.getOffset() - curr_chunk_data_pos_;
00392     else
00393         return file_.getCompressedBytesIn();
00394 }
00395 
00396 void Bag::startWritingChunk(Time time) {
00397     // Initialize chunk info
00398     curr_chunk_info_.pos        = file_.getOffset();
00399     curr_chunk_info_.start_time = time;
00400     curr_chunk_info_.end_time   = time;
00401 
00402     // Write the chunk header, with a place-holder for the data sizes (we'll fill in when the chunk is finished)
00403     writeChunkHeader(compression_, 0, 0);
00404 
00405     // Turn on compressed writing
00406     file_.setWriteMode(compression_);
00407     
00408     // Record where the data section of this chunk started
00409     curr_chunk_data_pos_ = file_.getOffset();
00410 
00411     chunk_open_ = true;
00412 }
00413 
00414 void Bag::stopWritingChunk() {
00415     // Add this chunk to the index
00416     chunks_.push_back(curr_chunk_info_);
00417     
00418     // Get the uncompressed and compressed sizes
00419     uint32_t uncompressed_size = getChunkOffset();
00420     file_.setWriteMode(compression::Uncompressed);
00421     uint32_t compressed_size = file_.getOffset() - curr_chunk_data_pos_;
00422 
00423     // Rewrite the chunk header with the size of the chunk (remembering current offset)
00424     uint64_t end_of_chunk_pos = file_.getOffset();
00425 
00426     seek(curr_chunk_info_.pos);
00427     writeChunkHeader(compression_, compressed_size, uncompressed_size);
00428 
00429     // Write out the indexes and clear them
00430     seek(end_of_chunk_pos);
00431     writeIndexRecords();
00432     curr_chunk_connection_indexes_.clear();
00433 
00434     // Clear the connection counts
00435     curr_chunk_info_.connection_counts.clear();
00436     
00437     // Flag that we're starting a new chunk
00438     chunk_open_ = false;
00439 }
00440 
00441 void Bag::writeChunkHeader(CompressionType compression, uint32_t compressed_size, uint32_t uncompressed_size) {
00442     ChunkHeader chunk_header;
00443     switch (compression) {
00444     case compression::Uncompressed: chunk_header.compression = COMPRESSION_NONE; break;
00445     case compression::BZ2:          chunk_header.compression = COMPRESSION_BZ2;  break;
00446     //case compression::ZLIB:         chunk_header.compression = COMPRESSION_ZLIB; break;
00447     }
00448     chunk_header.compressed_size   = compressed_size;
00449     chunk_header.uncompressed_size = uncompressed_size;
00450 
00451     logDebug("Writing CHUNK [%llu]: compression=%s compressed=%d uncompressed=%d",
00452               (unsigned long long) file_.getOffset(), chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size);
00453 
00454     M_string header;
00455     header[OP_FIELD_NAME]          = toHeaderString(&OP_CHUNK);
00456     header[COMPRESSION_FIELD_NAME] = chunk_header.compression;
00457     header[SIZE_FIELD_NAME]        = toHeaderString(&chunk_header.uncompressed_size);
00458     writeHeader(header);
00459 
00460     writeDataLength(chunk_header.compressed_size);
00461 }
00462 
00463 void Bag::readChunkHeader(ChunkHeader& chunk_header) const {
00464     ros::Header header;
00465     if (!readHeader(header) || !readDataLength(chunk_header.compressed_size))
00466         throw BagFormatException("Error reading CHUNK record");
00467         
00468     M_string& fields = *header.getValues();
00469 
00470     if (!isOp(fields, OP_CHUNK))
00471         throw BagFormatException("Expected CHUNK op not found");
00472 
00473     readField(fields, COMPRESSION_FIELD_NAME, true, chunk_header.compression);
00474     readField(fields, SIZE_FIELD_NAME,        true, &chunk_header.uncompressed_size);
00475 
00476     logDebug("Read CHUNK: compression=%s size=%d uncompressed=%d (%f)", chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size, 100 * ((double) chunk_header.compressed_size) / chunk_header.uncompressed_size);
00477 }
00478 
00479 // Index records
00480 
00481 void Bag::writeIndexRecords() {
00482     for (map<uint32_t, multiset<IndexEntry> >::const_iterator i = curr_chunk_connection_indexes_.begin(); i != curr_chunk_connection_indexes_.end(); i++) {
00483         uint32_t                    connection_id = i->first;
00484         multiset<IndexEntry> const& index         = i->second;
00485 
00486         // Write the index record header
00487         uint32_t index_size = index.size();
00488         M_string header;
00489         header[OP_FIELD_NAME]         = toHeaderString(&OP_INDEX_DATA);
00490         header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_id);
00491         header[VER_FIELD_NAME]        = toHeaderString(&INDEX_VERSION);
00492         header[COUNT_FIELD_NAME]      = toHeaderString(&index_size);
00493         writeHeader(header);
00494 
00495         writeDataLength(index_size * 12);
00496 
00497         logDebug("Writing INDEX_DATA: connection=%d ver=%d count=%d", connection_id, INDEX_VERSION, index_size);
00498 
00499         // Write the index record data (pairs of timestamp and position in file)
00500         foreach(IndexEntry const& e, index) {
00501             write((char*) &e.time.sec,  4);
00502             write((char*) &e.time.nsec, 4);
00503             write((char*) &e.offset,    4);
00504 
00505             logDebug("  - %d.%d: %d", e.time.sec, e.time.nsec, e.offset);
00506         }
00507     }
00508 }
00509 
00510 void Bag::readTopicIndexRecord102() {
00511     ros::Header header;
00512     uint32_t data_size;
00513     if (!readHeader(header) || !readDataLength(data_size))
00514         throw BagFormatException("Error reading INDEX_DATA header");
00515     M_string& fields = *header.getValues();
00516 
00517     if (!isOp(fields, OP_INDEX_DATA))
00518         throw BagFormatException("Expected INDEX_DATA record");
00519 
00520     uint32_t index_version;
00521     string topic;
00522     uint32_t count = 0;
00523     readField(fields, VER_FIELD_NAME,   true, &index_version);
00524     readField(fields, TOPIC_FIELD_NAME, true, topic);
00525     readField(fields, COUNT_FIELD_NAME, true, &count);
00526 
00527     logDebug("Read INDEX_DATA: ver=%d topic=%s count=%d", index_version, topic.c_str(), count);
00528 
00529     if (index_version != 0)
00530         throw BagFormatException((format("Unsupported INDEX_DATA version: %1%") % index_version).str());
00531 
00532     uint32_t connection_id;
00533     map<string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
00534     if (topic_conn_id_iter == topic_connection_ids_.end()) {
00535         connection_id = connections_.size();
00536 
00537         logDebug("Creating connection: id=%d topic=%s", connection_id, topic.c_str());
00538         ConnectionInfo* connection_info = new ConnectionInfo();
00539         connection_info->id       = connection_id;
00540         connection_info->topic    = topic;
00541         connections_[connection_id] = connection_info;
00542 
00543         topic_connection_ids_[topic] = connection_id;
00544     }
00545     else
00546         connection_id = topic_conn_id_iter->second;
00547 
00548     multiset<IndexEntry>& connection_index = connection_indexes_[connection_id];
00549 
00550     for (uint32_t i = 0; i < count; i++) {
00551         IndexEntry index_entry;
00552         uint32_t sec;
00553         uint32_t nsec;
00554         read((char*) &sec,                   4);
00555         read((char*) &nsec,                  4);
00556         read((char*) &index_entry.chunk_pos, 8);   //<! store position of the message in the chunk_pos field as it's 64 bits
00557         index_entry.time = Time(sec, nsec);
00558         index_entry.offset = 0;
00559 
00560         logDebug("  - %d.%d: %llu", sec, nsec, (unsigned long long) index_entry.chunk_pos);
00561 
00562         if (index_entry.time < ros::TIME_MIN || index_entry.time > ros::TIME_MAX)
00563         {
00564           logError("Index entry for topic %s contains invalid time.", topic.c_str());
00565         } else
00566         {
00567           connection_index.insert(connection_index.end(), index_entry);
00568         }
00569     }
00570 }
00571 
00572 void Bag::readConnectionIndexRecord200() {
00573     ros::Header header;
00574     uint32_t data_size;
00575     if (!readHeader(header) || !readDataLength(data_size))
00576         throw BagFormatException("Error reading INDEX_DATA header");
00577     M_string& fields = *header.getValues();
00578     
00579     if (!isOp(fields, OP_INDEX_DATA))
00580         throw BagFormatException("Expected INDEX_DATA record");
00581     
00582     uint32_t index_version;
00583     uint32_t connection_id;
00584     uint32_t count = 0;
00585     readField(fields, VER_FIELD_NAME,        true, &index_version);
00586     readField(fields, CONNECTION_FIELD_NAME, true, &connection_id);
00587     readField(fields, COUNT_FIELD_NAME,      true, &count);
00588 
00589     logDebug("Read INDEX_DATA: ver=%d connection=%d count=%d", index_version, connection_id, count);
00590 
00591     if (index_version != 1)
00592         throw BagFormatException((format("Unsupported INDEX_DATA version: %1%") % index_version).str());
00593 
00594     uint64_t chunk_pos = curr_chunk_info_.pos;
00595 
00596     multiset<IndexEntry>& connection_index = connection_indexes_[connection_id];
00597 
00598     for (uint32_t i = 0; i < count; i++) {
00599         IndexEntry index_entry;
00600         index_entry.chunk_pos = chunk_pos;
00601         uint32_t sec;
00602         uint32_t nsec;
00603         read((char*) &sec,                4);
00604         read((char*) &nsec,               4);
00605         read((char*) &index_entry.offset, 4);
00606         index_entry.time = Time(sec, nsec);
00607 
00608         logDebug("  - %d.%d: %llu+%d", sec, nsec, (unsigned long long) index_entry.chunk_pos, index_entry.offset);
00609 
00610         if (index_entry.time < ros::TIME_MIN || index_entry.time > ros::TIME_MAX)
00611         {
00612           logError("Index entry for topic %s contains invalid time.  This message will not be loaded.", connections_[connection_id]->topic.c_str());
00613         } else
00614         {
00615           connection_index.insert(connection_index.end(), index_entry);
00616         }
00617     }
00618 }
00619 
00620 // Connection records
00621 
00622 void Bag::writeConnectionRecords() {
00623     for (map<uint32_t, ConnectionInfo*>::const_iterator i = connections_.begin(); i != connections_.end(); i++) {
00624         ConnectionInfo const* connection_info = i->second;
00625         writeConnectionRecord(connection_info);
00626     }
00627 }
00628 
00629 void Bag::writeConnectionRecord(ConnectionInfo const* connection_info) {
00630     logDebug("Writing CONNECTION [%llu:%d]: topic=%s id=%d",
00631               (unsigned long long) file_.getOffset(), getChunkOffset(), connection_info->topic.c_str(), connection_info->id);
00632 
00633     M_string header;
00634     header[OP_FIELD_NAME]         = toHeaderString(&OP_CONNECTION);
00635     header[TOPIC_FIELD_NAME]      = connection_info->topic;
00636     header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_info->id);
00637     writeHeader(header);
00638 
00639     writeHeader(*connection_info->header);
00640 }
00641 
00642 void Bag::appendConnectionRecordToBuffer(Buffer& buf, ConnectionInfo const* connection_info) {
00643     M_string header;
00644     header[OP_FIELD_NAME]         = toHeaderString(&OP_CONNECTION);
00645     header[TOPIC_FIELD_NAME]      = connection_info->topic;
00646     header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_info->id);
00647     appendHeaderToBuffer(buf, header);
00648 
00649     appendHeaderToBuffer(buf, *connection_info->header);
00650 }
00651 
00652 void Bag::readConnectionRecord() {
00653     ros::Header header;
00654     if (!readHeader(header))
00655         throw BagFormatException("Error reading CONNECTION header");
00656     M_string& fields = *header.getValues();
00657 
00658     if (!isOp(fields, OP_CONNECTION))
00659         throw BagFormatException("Expected CONNECTION op not found");
00660 
00661     uint32_t id;
00662     readField(fields, CONNECTION_FIELD_NAME, true, &id);
00663     string topic;
00664     readField(fields, TOPIC_FIELD_NAME,      true, topic);
00665 
00666     ros::Header connection_header;
00667     if (!readHeader(connection_header))
00668         throw BagFormatException("Error reading connection header");
00669 
00670     // If this is a new connection, update connections
00671     map<uint32_t, ConnectionInfo*>::iterator key = connections_.find(id);
00672     if (key == connections_.end()) {
00673         ConnectionInfo* connection_info = new ConnectionInfo();
00674         connection_info->id       = id;
00675         connection_info->topic    = topic;
00676         connection_info->header = shared_ptr<M_string>(new M_string);
00677         for (M_string::const_iterator i = connection_header.getValues()->begin(); i != connection_header.getValues()->end(); i++)
00678             (*connection_info->header)[i->first] = i->second;
00679         connection_info->msg_def  = (*connection_info->header)["message_definition"];
00680         connection_info->datatype = (*connection_info->header)["type"];
00681         connection_info->md5sum   = (*connection_info->header)["md5sum"];
00682         connections_[id] = connection_info;
00683 
00684         logDebug("Read CONNECTION: topic=%s id=%d", topic.c_str(), id);
00685     }
00686 }
00687 
00688 void Bag::readMessageDefinitionRecord102() {
00689     ros::Header header;
00690     uint32_t data_size;
00691     if (!readHeader(header) || !readDataLength(data_size))
00692         throw BagFormatException("Error reading message definition header");
00693     M_string& fields = *header.getValues();
00694 
00695     if (!isOp(fields, OP_MSG_DEF))
00696         throw BagFormatException("Expected MSG_DEF op not found");
00697 
00698     string topic, md5sum, datatype, message_definition;
00699     readField(fields, TOPIC_FIELD_NAME,               true, topic);
00700     readField(fields, MD5_FIELD_NAME,   32,       32, true, md5sum);
00701     readField(fields, TYPE_FIELD_NAME,                true, datatype);
00702     readField(fields, DEF_FIELD_NAME,    0, UINT_MAX, true, message_definition);
00703 
00704     ConnectionInfo* connection_info;
00705 
00706     map<string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
00707     if (topic_conn_id_iter == topic_connection_ids_.end()) {
00708         uint32_t id = connections_.size();
00709 
00710         logDebug("Creating connection: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str());
00711         connection_info = new ConnectionInfo();
00712         connection_info->id       = id;
00713         connection_info->topic    = topic;
00714 
00715         connections_[id] = connection_info;
00716         topic_connection_ids_[topic] = id;
00717     }
00718     else
00719         connection_info = connections_[topic_conn_id_iter->second];
00720 
00721     connection_info->msg_def  = message_definition;
00722     connection_info->datatype = datatype;
00723     connection_info->md5sum   = md5sum;
00724     connection_info->header = boost::shared_ptr<ros::M_string>(new ros::M_string);
00725     (*connection_info->header)["type"]               = connection_info->datatype;
00726     (*connection_info->header)["md5sum"]             = connection_info->md5sum;
00727     (*connection_info->header)["message_definition"] = connection_info->msg_def;
00728 
00729     logDebug("Read MSG_DEF: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str());
00730 }
00731 
00732 void Bag::decompressChunk(uint64_t chunk_pos) const {
00733     if (curr_chunk_info_.pos == chunk_pos) {
00734         current_buffer_ = &outgoing_chunk_buffer_;
00735         return;
00736     }
00737 
00738     current_buffer_ = &decompress_buffer_;
00739 
00740     if (decompressed_chunk_ == chunk_pos)
00741         return;
00742 
00743     // Seek to the start of the chunk
00744     seek(chunk_pos);
00745 
00746     // Read the chunk header
00747     ChunkHeader chunk_header;
00748     readChunkHeader(chunk_header);
00749 
00750     // Read and decompress the chunk.  These assume we are at the right place in the stream already
00751     if (chunk_header.compression == COMPRESSION_NONE)
00752         decompressRawChunk(chunk_header);
00753     else if (chunk_header.compression == COMPRESSION_BZ2)
00754         decompressBz2Chunk(chunk_header);
00755     else
00756         throw BagFormatException("Unknown compression: " + chunk_header.compression);
00757     
00758     decompressed_chunk_ = chunk_pos;
00759 }
00760 
00761 void Bag::readMessageDataRecord102(uint64_t offset, ros::Header& header) const {
00762     logDebug("readMessageDataRecord: offset=%llu", (unsigned long long) offset);
00763 
00764     seek(offset);
00765 
00766     uint32_t data_size;
00767     uint8_t op;
00768     do {
00769         if (!readHeader(header) || !readDataLength(data_size))
00770             throw BagFormatException("Error reading header");
00771 
00772         readField(*header.getValues(), OP_FIELD_NAME, true, &op);
00773     }
00774     while (op == OP_MSG_DEF);
00775 
00776     if (op != OP_MSG_DATA)
00777         throw BagFormatException((format("Expected MSG_DATA op, got %d") % op).str());
00778 
00779     record_buffer_.setSize(data_size);
00780     file_.read((char*) record_buffer_.getData(), data_size);
00781 }
00782 
00783 // Reading this into a buffer isn't completely necessary, but we do it anyways for now
00784 void Bag::decompressRawChunk(ChunkHeader const& chunk_header) const {
00785     assert(chunk_header.compression == COMPRESSION_NONE);
00786     assert(chunk_header.compressed_size == chunk_header.uncompressed_size);
00787 
00788     logDebug("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size);
00789 
00790     decompress_buffer_.setSize(chunk_header.compressed_size);
00791     file_.read((char*) decompress_buffer_.getData(), chunk_header.compressed_size);
00792 
00793     // todo check read was successful
00794 }
00795 
00796 void Bag::decompressBz2Chunk(ChunkHeader const& chunk_header) const {
00797     assert(chunk_header.compression == COMPRESSION_BZ2);
00798 
00799     CompressionType compression = compression::BZ2;
00800 
00801     logDebug("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size);
00802 
00803     chunk_buffer_.setSize(chunk_header.compressed_size);
00804     file_.read((char*) chunk_buffer_.getData(), chunk_header.compressed_size);
00805 
00806     decompress_buffer_.setSize(chunk_header.uncompressed_size);
00807     file_.decompress(compression, decompress_buffer_.getData(), decompress_buffer_.getSize(), chunk_buffer_.getData(), chunk_buffer_.getSize());
00808 
00809     // todo check read was successful
00810 }
00811 
00812 ros::Header Bag::readMessageDataHeader(IndexEntry const& index_entry) {
00813     ros::Header header;
00814     uint32_t data_size;
00815     uint32_t bytes_read;
00816     switch (version_)
00817     {
00818     case 200:
00819         decompressChunk(index_entry.chunk_pos);
00820         readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
00821         return header;
00822     case 102:
00823         readMessageDataRecord102(index_entry.chunk_pos, header);
00824         return header;
00825     default:
00826         throw BagFormatException((format("Unhandled version: %1%") % version_).str());
00827     }
00828 }
00829 
00830 // NOTE: this loads the header, which is unnecessary
00831 uint32_t Bag::readMessageDataSize(IndexEntry const& index_entry) const {
00832     ros::Header header;
00833     uint32_t data_size;
00834     uint32_t bytes_read;
00835     switch (version_)
00836     {
00837     case 200:
00838         decompressChunk(index_entry.chunk_pos);
00839         readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
00840         return data_size;
00841     case 102:
00842         readMessageDataRecord102(index_entry.chunk_pos, header);
00843         return record_buffer_.getSize();
00844     default:
00845         throw BagFormatException((format("Unhandled version: %1%") % version_).str());
00846     }
00847 }
00848 
00849 void Bag::writeChunkInfoRecords() {
00850     foreach(ChunkInfo const& chunk_info, chunks_) {
00851         // Write the chunk info header
00852         M_string header;
00853         uint32_t chunk_connection_count = chunk_info.connection_counts.size();
00854         header[OP_FIELD_NAME]         = toHeaderString(&OP_CHUNK_INFO);
00855         header[VER_FIELD_NAME]        = toHeaderString(&CHUNK_INFO_VERSION);
00856         header[CHUNK_POS_FIELD_NAME]  = toHeaderString(&chunk_info.pos);
00857         header[START_TIME_FIELD_NAME] = toHeaderString(&chunk_info.start_time);
00858         header[END_TIME_FIELD_NAME]   = toHeaderString(&chunk_info.end_time);
00859         header[COUNT_FIELD_NAME]      = toHeaderString(&chunk_connection_count);
00860 
00861         logDebug("Writing CHUNK_INFO [%llu]: ver=%d pos=%llu start=%d.%d end=%d.%d",
00862                   (unsigned long long) file_.getOffset(), CHUNK_INFO_VERSION, (unsigned long long) chunk_info.pos,
00863                   chunk_info.start_time.sec, chunk_info.start_time.nsec,
00864                   chunk_info.end_time.sec, chunk_info.end_time.nsec);
00865 
00866         writeHeader(header);
00867 
00868         writeDataLength(8 * chunk_connection_count);
00869 
00870         // Write the topic names and counts
00871         for (map<uint32_t, uint32_t>::const_iterator i = chunk_info.connection_counts.begin(); i != chunk_info.connection_counts.end(); i++) {
00872             uint32_t connection_id = i->first;
00873             uint32_t count         = i->second;
00874 
00875             write((char*) &connection_id, 4);
00876             write((char*) &count, 4);
00877 
00878             logDebug("  - %d: %d", connection_id, count);
00879         }
00880     }
00881 }
00882 
00883 void Bag::readChunkInfoRecord() {
00884     // Read a CHUNK_INFO header
00885     ros::Header header;
00886     uint32_t data_size;
00887     if (!readHeader(header) || !readDataLength(data_size))
00888         throw BagFormatException("Error reading CHUNK_INFO record header");
00889     M_string& fields = *header.getValues();
00890     if (!isOp(fields, OP_CHUNK_INFO))
00891         throw BagFormatException("Expected CHUNK_INFO op not found");
00892 
00893     // Check that the chunk info version is current
00894     uint32_t chunk_info_version;
00895     readField(fields, VER_FIELD_NAME, true, &chunk_info_version);
00896     if (chunk_info_version != CHUNK_INFO_VERSION)
00897         throw BagFormatException((format("Expected CHUNK_INFO version %1%, read %2%") % CHUNK_INFO_VERSION % chunk_info_version).str());
00898 
00899     // Read the chunk position, timestamp, and topic count fields
00900     ChunkInfo chunk_info;
00901     readField(fields, CHUNK_POS_FIELD_NAME,  true, &chunk_info.pos);
00902     readField(fields, START_TIME_FIELD_NAME, true,  chunk_info.start_time);
00903     readField(fields, END_TIME_FIELD_NAME,   true,  chunk_info.end_time);
00904     uint32_t chunk_connection_count = 0;
00905     readField(fields, COUNT_FIELD_NAME,      true, &chunk_connection_count);
00906 
00907     logDebug("Read CHUNK_INFO: chunk_pos=%llu connection_count=%d start=%d.%d end=%d.%d",
00908               (unsigned long long) chunk_info.pos, chunk_connection_count,
00909               chunk_info.start_time.sec, chunk_info.start_time.nsec,
00910               chunk_info.end_time.sec, chunk_info.end_time.nsec);
00911 
00912     // Read the topic count entries
00913     for (uint32_t i = 0; i < chunk_connection_count; i ++) {
00914         uint32_t connection_id, connection_count;
00915         read((char*) &connection_id,    4);
00916         read((char*) &connection_count, 4);
00917 
00918         logDebug("  %d: %d messages", connection_id, connection_count);
00919 
00920         chunk_info.connection_counts[connection_id] = connection_count;
00921     }
00922 
00923     chunks_.push_back(chunk_info);
00924 }
00925 
00926 // Record I/O
00927 
00928 bool Bag::isOp(M_string& fields, uint8_t reqOp) const {
00929     uint8_t op = 0xFF; // nonexistent op
00930     readField(fields, OP_FIELD_NAME, true, &op);
00931     return op == reqOp;
00932 }
00933 
00934 void Bag::writeHeader(M_string const& fields) {
00935     boost::shared_array<uint8_t> header_buffer;
00936     uint32_t header_len;
00937     ros::Header::write(fields, header_buffer, header_len);
00938     write((char*) &header_len, 4);
00939     write((char*) header_buffer.get(), header_len);
00940 }
00941 
00942 void Bag::writeDataLength(uint32_t data_len) {
00943     write((char*) &data_len, 4);
00944 }
00945 
00946 void Bag::appendHeaderToBuffer(Buffer& buf, M_string const& fields) {
00947     boost::shared_array<uint8_t> header_buffer;
00948     uint32_t header_len;
00949     ros::Header::write(fields, header_buffer, header_len);
00950 
00951     uint32_t offset = buf.getSize();
00952 
00953     buf.setSize(buf.getSize() + 4 + header_len);
00954 
00955     memcpy(buf.getData() + offset, &header_len, 4);
00956     offset += 4;
00957     memcpy(buf.getData() + offset, header_buffer.get(), header_len);
00958 }
00959 
00960 void Bag::appendDataLengthToBuffer(Buffer& buf, uint32_t data_len) {
00961     uint32_t offset = buf.getSize();
00962 
00963     buf.setSize(buf.getSize() + 4);
00964 
00965     memcpy(buf.getData() + offset, &data_len, 4);
00966 }
00967 
00969 void Bag::readHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read) const {
00970     assert(buffer.getSize() > 8);
00971 
00972     uint8_t* start = (uint8_t*) buffer.getData() + offset;
00973 
00974     uint8_t* ptr = start;
00975 
00976     // Read the header length
00977     uint32_t header_len;
00978     memcpy(&header_len, ptr, 4);
00979     ptr += 4;
00980 
00981     // Parse the header
00982     string error_msg;
00983     bool parsed = header.parse(ptr, header_len, error_msg);
00984     if (!parsed)
00985         throw BagFormatException("Error parsing header");
00986     ptr += header_len;
00987 
00988     // Read the data size
00989     memcpy(&data_size, ptr, 4);
00990     ptr += 4;
00991 
00992     bytes_read = ptr - start;
00993 }
00994 
00995 void Bag::readMessageDataHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& total_bytes_read) const {
00996     total_bytes_read = 0;
00997     uint8_t op = 0xFF;
00998     do {
00999         logDebug("reading header from buffer: offset=%d", offset);
01000         uint32_t bytes_read;
01001         readHeaderFromBuffer(*current_buffer_, offset, header, data_size, bytes_read);
01002 
01003         offset += bytes_read;
01004         total_bytes_read += bytes_read;
01005         
01006         readField(*header.getValues(), OP_FIELD_NAME, true, &op);
01007     }
01008     while (op == OP_MSG_DEF || op == OP_CONNECTION);
01009 
01010     if (op != OP_MSG_DATA)
01011         throw BagFormatException("Expected MSG_DATA op not found");
01012 }
01013 
01014 bool Bag::readHeader(ros::Header& header) const {
01015     // Read the header length
01016     uint32_t header_len;
01017     read((char*) &header_len, 4);
01018 
01019     // Read the header
01020     header_buffer_.setSize(header_len);
01021     read((char*) header_buffer_.getData(), header_len);
01022 
01023     // Parse the header
01024     string error_msg;
01025     bool parsed = header.parse(header_buffer_.getData(), header_len, error_msg);
01026     if (!parsed)
01027         return false;
01028 
01029     return true;
01030 }
01031 
01032 bool Bag::readDataLength(uint32_t& data_size) const {
01033     read((char*) &data_size, 4);
01034     return true;
01035 }
01036 
01037 M_string::const_iterator Bag::checkField(M_string const& fields, string const& field, unsigned int min_len, unsigned int max_len, bool required) const {
01038     M_string::const_iterator fitr = fields.find(field);
01039     if (fitr == fields.end()) {
01040         if (required)
01041             throw BagFormatException("Required '" + field + "' field missing");
01042     }
01043     else if ((fitr->second.size() < min_len) || (fitr->second.size() > max_len))
01044         throw BagFormatException((format("Field '%1%' is wrong size (%2% bytes)") % field % (uint32_t) fitr->second.size()).str());
01045 
01046     return fitr;
01047 }
01048 
01049 bool Bag::readField(M_string const& fields, string const& field_name, bool required, string& data) const {
01050     return readField(fields, field_name, 1, UINT_MAX, required, data);
01051 }
01052 
01053 bool Bag::readField(M_string const& fields, string const& field_name, unsigned int min_len, unsigned int max_len, bool required, string& data) const {
01054     M_string::const_iterator fitr = checkField(fields, field_name, min_len, max_len, required);
01055     if (fitr == fields.end())
01056         return false;
01057 
01058     data = fitr->second;
01059     return true;
01060 }
01061 
01062 bool Bag::readField(M_string const& fields, string const& field_name, bool required, Time& data) const {
01063     uint64_t packed_time;
01064     if (!readField(fields, field_name, required, &packed_time))
01065         return false;
01066 
01067     uint64_t bitmask = (1LL << 33) - 1;
01068     data.sec  = (uint32_t) (packed_time & bitmask);
01069     data.nsec = (uint32_t) (packed_time >> 32);
01070 
01071     return true;
01072 }
01073 
01074 std::string Bag::toHeaderString(Time const* field) const {
01075     uint64_t packed_time = (((uint64_t) field->nsec) << 32) + field->sec;
01076     return toHeaderString(&packed_time);
01077 }
01078 
01079 
01080 // Low-level I/O
01081 
01082 void Bag::write(string const& s)                  { write(s.c_str(), s.length()); }
01083 void Bag::write(char const* s, std::streamsize n) { file_.write((char*) s, n);    }
01084 
01085 void Bag::read(char* b, std::streamsize n) const  { file_.read(b, n);             }
01086 void Bag::seek(uint64_t pos, int origin) const    { file_.seek(pos, origin);      }
01087 
01088 } // namespace rosbag


rosbag_storage
Author(s):
autogenerated on Fri Aug 28 2015 12:33:40