bag.cpp
Go to the documentation of this file.
00001 // Copyright (c) 2009, Willow Garage, Inc.
00002 // All rights reserved.
00003 // 
00004 // Redistribution and use in source and binary forms, with or without
00005 // modification, are permitted provided that the following conditions are met:
00006 // 
00007 //     * Redistributions of source code must retain the above copyright
00008 //       notice, this list of conditions and the following disclaimer.
00009 //     * Redistributions in binary form must reproduce the above copyright
00010 //       notice, this list of conditions and the following disclaimer in the
00011 //       documentation and/or other materials provided with the distribution.
00012 //     * Neither the name of Willow Garage, Inc. nor the names of its
00013 //       contributors may be used to endorse or promote products derived from
00014 //       this software without specific prior written permission.
00015 // 
00016 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00017 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00018 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00019 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00020 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00021 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00022 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00023 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00024 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00025 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00026 // POSSIBILITY OF SUCH DAMAGE.
00027 
00028 #include "rosbag/bag.h"
00029 #include "rosbag/message_instance.h"
00030 #include "rosbag/query.h"
00031 #include "rosbag/view.h"
00032 
00033 #if defined(_MSC_VER)
00034   #include <stdint.h> // only on v2010 and later -> is this enough for msvc and linux?
00035 #else
00036   #include <inttypes.h>
00037 #endif
00038 #include <signal.h>
00039 
00040 #include <iomanip>
00041 
00042 #include <boost/foreach.hpp>
00043 
00044 #define foreach BOOST_FOREACH
00045 
00046 using std::map;
00047 using std::priority_queue;
00048 using std::string;
00049 using std::vector;
00050 using std::multiset;
00051 using boost::format;
00052 using boost::shared_ptr;
00053 using ros::M_string;
00054 using ros::Time;
00055 
00056 namespace rosbag {
00057 
00058 Bag::Bag() :
00059     compression_(compression::Uncompressed),
00060     chunk_threshold_(768 * 1024),  // 768KB chunks
00061     bag_revision_(0),
00062     file_size_(0),
00063     file_header_pos_(0),
00064     index_data_pos_(0),
00065     connection_count_(0),
00066     chunk_count_(0),
00067     chunk_open_(false),
00068     curr_chunk_data_pos_(0),
00069     decompressed_chunk_(0)
00070 {
00071 }
00072 
00073 Bag::Bag(string const& filename, uint32_t mode) :
00074     compression_(compression::Uncompressed),
00075     chunk_threshold_(768 * 1024),  // 768KB chunks
00076     bag_revision_(0),
00077     file_size_(0),
00078     file_header_pos_(0),
00079     index_data_pos_(0),
00080     connection_count_(0),
00081     chunk_count_(0),
00082     chunk_open_(false),
00083     curr_chunk_data_pos_(0),
00084     decompressed_chunk_(0)
00085 {
00086     open(filename, mode);
00087 }
00088 
00089 Bag::~Bag() {
00090     close();
00091 }
00092 
00093 void Bag::open(string const& filename, uint32_t mode) {
00094     mode_ = (BagMode) mode;
00095 
00096     if (mode_ & bagmode::Append)
00097         openAppend(filename);
00098     else if (mode_ & bagmode::Write)
00099         openWrite(filename);
00100     else if (mode_ & bagmode::Read)
00101         openRead(filename);
00102     else
00103         throw BagException((format("Unknown mode: %1%") % (int) mode).str());
00104 
00105     // Determine file size
00106     uint64_t offset = file_.getOffset();
00107     seek(0, std::ios::end);
00108     file_size_ = file_.getOffset();
00109     seek(offset);
00110 }
00111 
00112 void Bag::openRead(string const& filename) {
00113     file_.openRead(filename);
00114 
00115     readVersion();
00116 
00117     switch (version_) {
00118     case 102: startReadingVersion102(); break;
00119     case 200: startReadingVersion200(); break;
00120     default:
00121         throw BagException((format("Unsupported bag file version: %1%.%2%") % getMajorVersion() % getMinorVersion()).str());
00122     }
00123 }
00124 
00125 void Bag::openWrite(string const& filename) {
00126     file_.openWrite(filename);
00127 
00128     startWriting();
00129 }
00130 
00131 void Bag::openAppend(string const& filename) {
00132     file_.openReadWrite(filename);
00133 
00134     readVersion();
00135 
00136     if (version_ != 200)
00137         throw BagException((format("Bag file version %1%.%2% is unsupported for appending") % getMajorVersion() % getMinorVersion()).str());
00138 
00139     startReadingVersion200();
00140 
00141     // Truncate the file to chop off the index
00142     file_.truncate(index_data_pos_);
00143     index_data_pos_ = 0;
00144 
00145     // Rewrite the file header, clearing the index position (so we know if the index is invalid)
00146     seek(file_header_pos_);
00147     writeFileHeaderRecord();
00148 
00149     // Seek to the end of the file
00150     seek(0, std::ios::end);
00151 }
00152 
00153 void Bag::close() {
00154     if (!file_.isOpen())
00155         return;
00156 
00157     if (mode_ & bagmode::Write || mode_ & bagmode::Append)
00158         closeWrite();
00159     
00160     file_.close();
00161 
00162     topic_connection_ids_.clear();
00163     header_connection_ids_.clear();
00164     for (map<uint32_t, ConnectionInfo*>::iterator i = connections_.begin(); i != connections_.end(); i++)
00165         delete i->second;
00166     connections_.clear();
00167     chunks_.clear();
00168     connection_indexes_.clear();
00169     curr_chunk_connection_indexes_.clear();
00170 }
00171 
00172 void Bag::closeWrite() {
00173     stopWriting();
00174 }
00175 
00176 string   Bag::getFileName() const { return file_.getFileName(); }
00177 BagMode  Bag::getMode()     const { return mode_;               }
00178 uint64_t Bag::getSize()     const { return file_size_;          }
00179 
00180 uint32_t Bag::getChunkThreshold() const { return chunk_threshold_; }
00181 
00182 void Bag::setChunkThreshold(uint32_t chunk_threshold) {
00183     if (file_.isOpen() && chunk_open_)
00184         stopWritingChunk();
00185 
00186     chunk_threshold_ = chunk_threshold;
00187 }
00188 
00189 CompressionType Bag::getCompression() const { return compression_; }
00190 
00191 void Bag::setCompression(CompressionType compression) {
00192     if (file_.isOpen() && chunk_open_)
00193         stopWritingChunk();
00194 
00195     compression_ = compression;
00196 }
00197 
00198 // Version
00199 
00200 void Bag::writeVersion() {
00201     string version = string("#ROSBAG V") + VERSION + string("\n");
00202 
00203     ROS_DEBUG("Writing VERSION [%llu]: %s", (unsigned long long) file_.getOffset(), version.c_str());
00204 
00205     version_ = 200;
00206 
00207     write(version);
00208 }
00209 
00210 void Bag::readVersion() {
00211     string version_line = file_.getline();
00212 
00213     file_header_pos_ = file_.getOffset();
00214 
00215     char logtypename[100];
00216     int version_major, version_minor;
00217 #if defined(_MSC_VER)
00218     if (sscanf_s(version_line.c_str(), "#ROS%s V%d.%d", logtypename, sizeof(logtypename), &version_major, &version_minor) != 3)
00219 #else
00220     if (sscanf(version_line.c_str(), "#ROS%s V%d.%d", logtypename, &version_major, &version_minor) != 3)
00221 #endif
00222         throw BagIOException("Error reading version line");
00223 
00224     version_ = version_major * 100 + version_minor;
00225 
00226     ROS_DEBUG("Read VERSION: version=%d", version_);
00227 }
00228 
00229 uint32_t Bag::getMajorVersion() const { return version_ / 100; }
00230 uint32_t Bag::getMinorVersion() const { return version_ % 100; }
00231 
00232 //
00233 
00234 void Bag::startWriting() {
00235     writeVersion();
00236     file_header_pos_ = file_.getOffset();
00237     writeFileHeaderRecord();
00238 }
00239 
00240 void Bag::stopWriting() {
00241     if (chunk_open_)
00242         stopWritingChunk();
00243 
00244     seek(0, std::ios::end);
00245 
00246     index_data_pos_ = file_.getOffset();
00247     writeConnectionRecords();
00248     writeChunkInfoRecords();
00249 
00250     seek(file_header_pos_);
00251     writeFileHeaderRecord();
00252 }
00253 
00254 void Bag::startReadingVersion200() {
00255     // Read the file header record, which points to the end of the chunks
00256     readFileHeaderRecord();
00257 
00258     // Seek to the end of the chunks
00259     seek(index_data_pos_);
00260 
00261     // Read the connection records (one for each connection)
00262     for (uint32_t i = 0; i < connection_count_; i++)
00263         readConnectionRecord();
00264 
00265     // Read the chunk info records
00266     for (uint32_t i = 0; i < chunk_count_; i++)
00267         readChunkInfoRecord();
00268 
00269     // Read the connection indexes for each chunk
00270     foreach(ChunkInfo const& chunk_info, chunks_) {
00271         curr_chunk_info_ = chunk_info;
00272 
00273         seek(curr_chunk_info_.pos);
00274 
00275         // Skip over the chunk data
00276         ChunkHeader chunk_header;
00277         readChunkHeader(chunk_header);
00278         seek(chunk_header.compressed_size, std::ios::cur);
00279 
00280         // Read the index records after the chunk
00281         for (unsigned int i = 0; i < chunk_info.connection_counts.size(); i++)
00282             readConnectionIndexRecord200();
00283     }
00284 
00285     // At this point we don't have a curr_chunk_info anymore so we reset it
00286     curr_chunk_info_ = ChunkInfo();
00287 }
00288 
00289 void Bag::startReadingVersion102() {
00290     try
00291     {
00292         // Read the file header record, which points to the start of the topic indexes
00293         readFileHeaderRecord();
00294     }
00295     catch (BagFormatException ex) {
00296         throw BagUnindexedException();
00297     }
00298 
00299     // Get the length of the file
00300     seek(0, std::ios::end);
00301     uint64_t filelength = file_.getOffset();
00302 
00303     // Seek to the beginning of the topic index records
00304     seek(index_data_pos_);
00305 
00306     // Read the topic index records, which point to the offsets of each message in the file
00307     while (file_.getOffset() < filelength)
00308         readTopicIndexRecord102();
00309 
00310     // Read the message definition records (which are the first entry in the topic indexes)
00311     for (map<uint32_t, multiset<IndexEntry> >::const_iterator i = connection_indexes_.begin(); i != connection_indexes_.end(); i++) {
00312         multiset<IndexEntry> const& index       = i->second;
00313         IndexEntry const&           first_entry = *index.begin();
00314 
00315         ROS_DEBUG("Reading message definition for connection %d at %llu", i->first, (unsigned long long) first_entry.chunk_pos);
00316 
00317         seek(first_entry.chunk_pos);
00318 
00319         readMessageDefinitionRecord102();
00320     }
00321 }
00322 
00323 // File header record
00324 
00325 void Bag::writeFileHeaderRecord() {
00326     connection_count_ = connections_.size();
00327     chunk_count_      = chunks_.size();
00328 
00329     ROS_DEBUG("Writing FILE_HEADER [%llu]: index_pos=%llu connection_count=%d chunk_count=%d",
00330               (unsigned long long) file_.getOffset(), (unsigned long long) index_data_pos_, connection_count_, chunk_count_);
00331     
00332     // Write file header record
00333     M_string header;
00334     header[OP_FIELD_NAME]               = toHeaderString(&OP_FILE_HEADER);
00335     header[INDEX_POS_FIELD_NAME]        = toHeaderString(&index_data_pos_);
00336     header[CONNECTION_COUNT_FIELD_NAME] = toHeaderString(&connection_count_);
00337     header[CHUNK_COUNT_FIELD_NAME]      = toHeaderString(&chunk_count_);
00338 
00339     boost::shared_array<uint8_t> header_buffer;
00340     uint32_t header_len;
00341     ros::Header::write(header, header_buffer, header_len);
00342     uint32_t data_len = 0;
00343     if (header_len < FILE_HEADER_LENGTH)
00344         data_len = FILE_HEADER_LENGTH - header_len;
00345     write((char*) &header_len, 4);
00346     write((char*) header_buffer.get(), header_len);
00347     write((char*) &data_len, 4);
00348     
00349     // Pad the file header record out
00350     if (data_len > 0) {
00351         string padding;
00352         padding.resize(data_len, ' ');
00353         write(padding);
00354     }
00355 }
00356 
00357 void Bag::readFileHeaderRecord() {
00358     ros::Header header;
00359     uint32_t data_size;
00360     if (!readHeader(header) || !readDataLength(data_size))
00361         throw BagFormatException("Error reading FILE_HEADER record");
00362 
00363     M_string& fields = *header.getValues();
00364 
00365     if (!isOp(fields, OP_FILE_HEADER))
00366         throw BagFormatException("Expected FILE_HEADER op not found");
00367 
00368     // Read index position
00369     readField(fields, INDEX_POS_FIELD_NAME, true, (uint64_t*) &index_data_pos_);
00370 
00371     if (index_data_pos_ == 0)
00372         throw BagUnindexedException();
00373 
00374     // Read topic and chunks count
00375     if (version_ >= 200) {
00376         readField(fields, CONNECTION_COUNT_FIELD_NAME, true, &connection_count_);
00377         readField(fields, CHUNK_COUNT_FIELD_NAME,      true, &chunk_count_);
00378     }
00379 
00380     ROS_DEBUG("Read FILE_HEADER: index_pos=%llu connection_count=%d chunk_count=%d",
00381               (unsigned long long) index_data_pos_, connection_count_, chunk_count_);
00382 
00383     // Skip the data section (just padding)
00384     seek(data_size, std::ios::cur);
00385 }
00386 
00387 uint32_t Bag::getChunkOffset() const {
00388     if (compression_ == compression::Uncompressed)
00389         return file_.getOffset() - curr_chunk_data_pos_;
00390     else
00391         return file_.getCompressedBytesIn();
00392 }
00393 
00394 void Bag::startWritingChunk(Time time) {
00395     // Initialize chunk info
00396     curr_chunk_info_.pos        = file_.getOffset();
00397     curr_chunk_info_.start_time = time;
00398     curr_chunk_info_.end_time   = time;
00399 
00400     // Write the chunk header, with a place-holder for the data sizes (we'll fill in when the chunk is finished)
00401     writeChunkHeader(compression_, 0, 0);
00402 
00403     // Turn on compressed writing
00404     file_.setWriteMode(compression_);
00405     
00406     // Record where the data section of this chunk started
00407     curr_chunk_data_pos_ = file_.getOffset();
00408 
00409     chunk_open_ = true;
00410 }
00411 
00412 void Bag::stopWritingChunk() {
00413     // Add this chunk to the index
00414     chunks_.push_back(curr_chunk_info_);
00415     
00416     // Get the uncompressed and compressed sizes
00417     uint32_t uncompressed_size = getChunkOffset();
00418     file_.setWriteMode(compression::Uncompressed);
00419     uint32_t compressed_size = file_.getOffset() - curr_chunk_data_pos_;
00420 
00421     // Rewrite the chunk header with the size of the chunk (remembering current offset)
00422     uint64_t end_of_chunk_pos = file_.getOffset();
00423 
00424     seek(curr_chunk_info_.pos);
00425     writeChunkHeader(compression_, compressed_size, uncompressed_size);
00426 
00427     // Write out the indexes and clear them
00428     seek(end_of_chunk_pos);
00429     writeIndexRecords();
00430     curr_chunk_connection_indexes_.clear();
00431 
00432     // Clear the connection counts
00433     curr_chunk_info_.connection_counts.clear();
00434     
00435     // Flag that we're starting a new chunk
00436     chunk_open_ = false;
00437 }
00438 
00439 void Bag::writeChunkHeader(CompressionType compression, uint32_t compressed_size, uint32_t uncompressed_size) {
00440     ChunkHeader chunk_header;
00441     switch (compression) {
00442     case compression::Uncompressed: chunk_header.compression = COMPRESSION_NONE; break;
00443     case compression::BZ2:          chunk_header.compression = COMPRESSION_BZ2;  break;
00444     //case compression::ZLIB:         chunk_header.compression = COMPRESSION_ZLIB; break;
00445     }
00446     chunk_header.compressed_size   = compressed_size;
00447     chunk_header.uncompressed_size = uncompressed_size;
00448 
00449     ROS_DEBUG("Writing CHUNK [%llu]: compression=%s compressed=%d uncompressed=%d",
00450               (unsigned long long) file_.getOffset(), chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size);
00451 
00452     M_string header;
00453     header[OP_FIELD_NAME]          = toHeaderString(&OP_CHUNK);
00454     header[COMPRESSION_FIELD_NAME] = chunk_header.compression;
00455     header[SIZE_FIELD_NAME]        = toHeaderString(&chunk_header.uncompressed_size);
00456     writeHeader(header);
00457 
00458     writeDataLength(chunk_header.compressed_size);
00459 }
00460 
00461 void Bag::readChunkHeader(ChunkHeader& chunk_header) const {
00462     ros::Header header;
00463     if (!readHeader(header) || !readDataLength(chunk_header.compressed_size))
00464         throw BagFormatException("Error reading CHUNK record");
00465         
00466     M_string& fields = *header.getValues();
00467 
00468     if (!isOp(fields, OP_CHUNK))
00469         throw BagFormatException("Expected CHUNK op not found");
00470 
00471     readField(fields, COMPRESSION_FIELD_NAME, true, chunk_header.compression);
00472     readField(fields, SIZE_FIELD_NAME,        true, &chunk_header.uncompressed_size);
00473 
00474     ROS_DEBUG("Read CHUNK: compression=%s size=%d uncompressed=%d (%f)", chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size, 100 * ((double) chunk_header.compressed_size) / chunk_header.uncompressed_size);
00475 }
00476 
00477 // Index records
00478 
00479 void Bag::writeIndexRecords() {
00480     for (map<uint32_t, multiset<IndexEntry> >::const_iterator i = curr_chunk_connection_indexes_.begin(); i != curr_chunk_connection_indexes_.end(); i++) {
00481         uint32_t                    connection_id = i->first;
00482         multiset<IndexEntry> const& index         = i->second;
00483 
00484         // Write the index record header
00485         uint32_t index_size = index.size();
00486         M_string header;
00487         header[OP_FIELD_NAME]         = toHeaderString(&OP_INDEX_DATA);
00488         header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_id);
00489         header[VER_FIELD_NAME]        = toHeaderString(&INDEX_VERSION);
00490         header[COUNT_FIELD_NAME]      = toHeaderString(&index_size);
00491         writeHeader(header);
00492 
00493         writeDataLength(index_size * 12);
00494 
00495         ROS_DEBUG("Writing INDEX_DATA: connection=%d ver=%d count=%d", connection_id, INDEX_VERSION, index_size);
00496 
00497         // Write the index record data (pairs of timestamp and position in file)
00498         foreach(IndexEntry const& e, index) {
00499             write((char*) &e.time.sec,  4);
00500             write((char*) &e.time.nsec, 4);
00501             write((char*) &e.offset,    4);
00502 
00503             ROS_DEBUG("  - %d.%d: %d", e.time.sec, e.time.nsec, e.offset);
00504         }
00505     }
00506 }
00507 
00508 void Bag::readTopicIndexRecord102() {
00509     ros::Header header;
00510     uint32_t data_size;
00511     if (!readHeader(header) || !readDataLength(data_size))
00512         throw BagFormatException("Error reading INDEX_DATA header");
00513     M_string& fields = *header.getValues();
00514 
00515     if (!isOp(fields, OP_INDEX_DATA))
00516         throw BagFormatException("Expected INDEX_DATA record");
00517 
00518     uint32_t index_version;
00519     string topic;
00520     uint32_t count = 0;
00521     readField(fields, VER_FIELD_NAME,   true, &index_version);
00522     readField(fields, TOPIC_FIELD_NAME, true, topic);
00523     readField(fields, COUNT_FIELD_NAME, true, &count);
00524 
00525     ROS_DEBUG("Read INDEX_DATA: ver=%d topic=%s count=%d", index_version, topic.c_str(), count);
00526 
00527     if (index_version != 0)
00528         throw BagFormatException((format("Unsupported INDEX_DATA version: %1%") % index_version).str());
00529 
00530     uint32_t connection_id;
00531     map<string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
00532     if (topic_conn_id_iter == topic_connection_ids_.end()) {
00533         connection_id = connections_.size();
00534 
00535         ROS_DEBUG("Creating connection: id=%d topic=%s", connection_id, topic.c_str());
00536         ConnectionInfo* connection_info = new ConnectionInfo();
00537         connection_info->id       = connection_id;
00538         connection_info->topic    = topic;
00539         connections_[connection_id] = connection_info;
00540 
00541         topic_connection_ids_[topic] = connection_id;
00542     }
00543     else
00544         connection_id = topic_conn_id_iter->second;
00545 
00546     multiset<IndexEntry>& connection_index = connection_indexes_[connection_id];
00547 
00548     for (uint32_t i = 0; i < count; i++) {
00549         IndexEntry index_entry;
00550         uint32_t sec;
00551         uint32_t nsec;
00552         read((char*) &sec,                   4);
00553         read((char*) &nsec,                  4);
00554         read((char*) &index_entry.chunk_pos, 8);   //<! store position of the message in the chunk_pos field as it's 64 bits
00555         index_entry.time = Time(sec, nsec);
00556         index_entry.offset = 0;
00557 
00558         ROS_DEBUG("  - %d.%d: %llu", sec, nsec, (unsigned long long) index_entry.chunk_pos);
00559 
00560         if (index_entry.time < ros::TIME_MIN || index_entry.time > ros::TIME_MAX)
00561         {
00562           ROS_ERROR("Index entry for topic %s contains invalid time.", topic.c_str());
00563         } else
00564         {
00565           connection_index.insert(connection_index.end(), index_entry);
00566         }
00567     }
00568 }
00569 
00570 void Bag::readConnectionIndexRecord200() {
00571     ros::Header header;
00572     uint32_t data_size;
00573     if (!readHeader(header) || !readDataLength(data_size))
00574         throw BagFormatException("Error reading INDEX_DATA header");
00575     M_string& fields = *header.getValues();
00576     
00577     if (!isOp(fields, OP_INDEX_DATA))
00578         throw BagFormatException("Expected INDEX_DATA record");
00579     
00580     uint32_t index_version;
00581     uint32_t connection_id;
00582     uint32_t count = 0;
00583     readField(fields, VER_FIELD_NAME,        true, &index_version);
00584     readField(fields, CONNECTION_FIELD_NAME, true, &connection_id);
00585     readField(fields, COUNT_FIELD_NAME,      true, &count);
00586 
00587     ROS_DEBUG("Read INDEX_DATA: ver=%d connection=%d count=%d", index_version, connection_id, count);
00588 
00589     if (index_version != 1)
00590         throw BagFormatException((format("Unsupported INDEX_DATA version: %1%") % index_version).str());
00591 
00592     uint64_t chunk_pos = curr_chunk_info_.pos;
00593 
00594     multiset<IndexEntry>& connection_index = connection_indexes_[connection_id];
00595 
00596     for (uint32_t i = 0; i < count; i++) {
00597         IndexEntry index_entry;
00598         index_entry.chunk_pos = chunk_pos;
00599         uint32_t sec;
00600         uint32_t nsec;
00601         read((char*) &sec,                4);
00602         read((char*) &nsec,               4);
00603         read((char*) &index_entry.offset, 4);
00604         index_entry.time = Time(sec, nsec);
00605 
00606         ROS_DEBUG("  - %d.%d: %llu+%d", sec, nsec, (unsigned long long) index_entry.chunk_pos, index_entry.offset);
00607 
00608         if (index_entry.time < ros::TIME_MIN || index_entry.time > ros::TIME_MAX)
00609         {
00610           ROS_ERROR("Index entry for topic %s contains invalid time.  This message will not be loaded.", connections_[connection_id]->topic.c_str());
00611         } else
00612         {
00613           connection_index.insert(connection_index.end(), index_entry);
00614         }
00615     }
00616 }
00617 
00618 // Connection records
00619 
00620 void Bag::writeConnectionRecords() {
00621     for (map<uint32_t, ConnectionInfo*>::const_iterator i = connections_.begin(); i != connections_.end(); i++) {
00622         ConnectionInfo const* connection_info = i->second;
00623         writeConnectionRecord(connection_info);
00624     }
00625 }
00626 
00627 void Bag::writeConnectionRecord(ConnectionInfo const* connection_info) {
00628     ROS_DEBUG("Writing CONNECTION [%llu:%d]: topic=%s id=%d",
00629               (unsigned long long) file_.getOffset(), getChunkOffset(), connection_info->topic.c_str(), connection_info->id);
00630 
00631     M_string header;
00632     header[OP_FIELD_NAME]         = toHeaderString(&OP_CONNECTION);
00633     header[TOPIC_FIELD_NAME]      = connection_info->topic;
00634     header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_info->id);
00635     writeHeader(header);
00636 
00637     writeHeader(*connection_info->header);
00638 }
00639 
00640 void Bag::appendConnectionRecordToBuffer(Buffer& buf, ConnectionInfo const* connection_info) {
00641     M_string header;
00642     header[OP_FIELD_NAME]         = toHeaderString(&OP_CONNECTION);
00643     header[TOPIC_FIELD_NAME]      = connection_info->topic;
00644     header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_info->id);
00645     appendHeaderToBuffer(buf, header);
00646 
00647     appendHeaderToBuffer(buf, *connection_info->header);
00648 }
00649 
00650 void Bag::readConnectionRecord() {
00651     ros::Header header;
00652     if (!readHeader(header))
00653         throw BagFormatException("Error reading CONNECTION header");
00654     M_string& fields = *header.getValues();
00655 
00656     if (!isOp(fields, OP_CONNECTION))
00657         throw BagFormatException("Expected CONNECTION op not found");
00658 
00659     uint32_t id;
00660     readField(fields, CONNECTION_FIELD_NAME, true, &id);
00661     string topic;
00662     readField(fields, TOPIC_FIELD_NAME,      true, topic);
00663 
00664     ros::Header connection_header;
00665     if (!readHeader(connection_header))
00666         throw BagFormatException("Error reading connection header");
00667 
00668     // If this is a new connection, update connections
00669     map<uint32_t, ConnectionInfo*>::iterator key = connections_.find(id);
00670     if (key == connections_.end()) {
00671         ConnectionInfo* connection_info = new ConnectionInfo();
00672         connection_info->id       = id;
00673         connection_info->topic    = topic;
00674         connection_info->header = shared_ptr<M_string>(new M_string);
00675         for (M_string::const_iterator i = connection_header.getValues()->begin(); i != connection_header.getValues()->end(); i++)
00676             (*connection_info->header)[i->first] = i->second;
00677         connection_info->msg_def  = (*connection_info->header)["message_definition"];
00678         connection_info->datatype = (*connection_info->header)["type"];
00679         connection_info->md5sum   = (*connection_info->header)["md5sum"];
00680         connections_[id] = connection_info;
00681 
00682         ROS_DEBUG("Read CONNECTION: topic=%s id=%d", topic.c_str(), id);
00683     }
00684 }
00685 
00686 void Bag::readMessageDefinitionRecord102() {
00687     ros::Header header;
00688     uint32_t data_size;
00689     if (!readHeader(header) || !readDataLength(data_size))
00690         throw BagFormatException("Error reading message definition header");
00691     M_string& fields = *header.getValues();
00692 
00693     if (!isOp(fields, OP_MSG_DEF))
00694         throw BagFormatException("Expected MSG_DEF op not found");
00695 
00696     string topic, md5sum, datatype, message_definition;
00697     readField(fields, TOPIC_FIELD_NAME,               true, topic);
00698     readField(fields, MD5_FIELD_NAME,   32,       32, true, md5sum);
00699     readField(fields, TYPE_FIELD_NAME,                true, datatype);
00700     readField(fields, DEF_FIELD_NAME,    0, UINT_MAX, true, message_definition);
00701 
00702     ConnectionInfo* connection_info;
00703 
00704     map<string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
00705     if (topic_conn_id_iter == topic_connection_ids_.end()) {
00706         uint32_t id = connections_.size();
00707 
00708         ROS_DEBUG("Creating connection: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str());
00709         connection_info = new ConnectionInfo();
00710         connection_info->id       = id;
00711         connection_info->topic    = topic;
00712 
00713         connections_[id] = connection_info;
00714         topic_connection_ids_[topic] = id;
00715     }
00716     else
00717         connection_info = connections_[topic_conn_id_iter->second];
00718 
00719     connection_info->msg_def  = message_definition;
00720     connection_info->datatype = datatype;
00721     connection_info->md5sum   = md5sum;
00722     connection_info->header = boost::shared_ptr<ros::M_string>(new ros::M_string);
00723     (*connection_info->header)["type"]               = connection_info->datatype;
00724     (*connection_info->header)["md5sum"]             = connection_info->md5sum;
00725     (*connection_info->header)["message_definition"] = connection_info->msg_def;
00726 
00727     ROS_DEBUG("Read MSG_DEF: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str());
00728 }
00729 
00730 void Bag::decompressChunk(uint64_t chunk_pos) const {
00731     if (curr_chunk_info_.pos == chunk_pos) {
00732         current_buffer_ = &outgoing_chunk_buffer_;
00733         return;
00734     }
00735 
00736     current_buffer_ = &decompress_buffer_;
00737 
00738     if (decompressed_chunk_ == chunk_pos)
00739         return;
00740 
00741     // Seek to the start of the chunk
00742     seek(chunk_pos);
00743 
00744     // Read the chunk header
00745     ChunkHeader chunk_header;
00746     readChunkHeader(chunk_header);
00747 
00748     // Read and decompress the chunk.  These assume we are at the right place in the stream already
00749     if (chunk_header.compression == COMPRESSION_NONE)
00750         decompressRawChunk(chunk_header);
00751     else if (chunk_header.compression == COMPRESSION_BZ2)
00752         decompressBz2Chunk(chunk_header);
00753     else
00754         throw BagFormatException("Unknown compression: " + chunk_header.compression);
00755     
00756     decompressed_chunk_ = chunk_pos;
00757 }
00758 
00759 void Bag::readMessageDataRecord102(uint64_t offset, ros::Header& header) const {
00760     ROS_DEBUG("readMessageDataRecord: offset=%llu", (unsigned long long) offset);
00761 
00762     seek(offset);
00763 
00764     uint32_t data_size;
00765     uint8_t op;
00766     do {
00767         if (!readHeader(header) || !readDataLength(data_size))
00768             throw BagFormatException("Error reading header");
00769 
00770         readField(*header.getValues(), OP_FIELD_NAME, true, &op);
00771     }
00772     while (op == OP_MSG_DEF);
00773 
00774     if (op != OP_MSG_DATA)
00775         throw BagFormatException((format("Expected MSG_DATA op, got %d") % op).str());
00776 
00777     record_buffer_.setSize(data_size);
00778     file_.read((char*) record_buffer_.getData(), data_size);
00779 }
00780 
00781 // Reading this into a buffer isn't completely necessary, but we do it anyways for now
00782 void Bag::decompressRawChunk(ChunkHeader const& chunk_header) const {
00783     assert(chunk_header.compression == COMPRESSION_NONE);
00784     assert(chunk_header.compressed_size == chunk_header.uncompressed_size);
00785 
00786     ROS_DEBUG("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size);
00787 
00788     decompress_buffer_.setSize(chunk_header.compressed_size);
00789     file_.read((char*) decompress_buffer_.getData(), chunk_header.compressed_size);
00790 
00791     // todo check read was successful
00792 }
00793 
00794 void Bag::decompressBz2Chunk(ChunkHeader const& chunk_header) const {
00795     assert(chunk_header.compression == COMPRESSION_BZ2);
00796 
00797     CompressionType compression = compression::BZ2;
00798 
00799     ROS_DEBUG("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size);
00800 
00801     chunk_buffer_.setSize(chunk_header.compressed_size);
00802     file_.read((char*) chunk_buffer_.getData(), chunk_header.compressed_size);
00803 
00804     decompress_buffer_.setSize(chunk_header.uncompressed_size);
00805     file_.decompress(compression, decompress_buffer_.getData(), decompress_buffer_.getSize(), chunk_buffer_.getData(), chunk_buffer_.getSize());
00806 
00807     // todo check read was successful
00808 }
00809 
00810 ros::Header Bag::readMessageDataHeader(IndexEntry const& index_entry) {
00811     ros::Header header;
00812     uint32_t data_size;
00813     uint32_t bytes_read;
00814     switch (version_)
00815     {
00816     case 200:
00817         decompressChunk(index_entry.chunk_pos);
00818         readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
00819         return header;
00820     case 102:
00821         readMessageDataRecord102(index_entry.chunk_pos, header);
00822         return header;
00823     default:
00824         throw BagFormatException((format("Unhandled version: %1%") % version_).str());
00825     }
00826 }
00827 
00828 // NOTE: this loads the header, which is unnecessary
00829 uint32_t Bag::readMessageDataSize(IndexEntry const& index_entry) const {
00830     ros::Header header;
00831     uint32_t data_size;
00832     uint32_t bytes_read;
00833     switch (version_)
00834     {
00835     case 200:
00836         decompressChunk(index_entry.chunk_pos);
00837         readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
00838         return data_size;
00839     case 102:
00840         readMessageDataRecord102(index_entry.chunk_pos, header);
00841         return record_buffer_.getSize();
00842     default:
00843         throw BagFormatException((format("Unhandled version: %1%") % version_).str());
00844     }
00845 }
00846 
00847 void Bag::writeChunkInfoRecords() {
00848     foreach(ChunkInfo const& chunk_info, chunks_) {
00849         // Write the chunk info header
00850         M_string header;
00851         uint32_t chunk_connection_count = chunk_info.connection_counts.size();
00852         header[OP_FIELD_NAME]         = toHeaderString(&OP_CHUNK_INFO);
00853         header[VER_FIELD_NAME]        = toHeaderString(&CHUNK_INFO_VERSION);
00854         header[CHUNK_POS_FIELD_NAME]  = toHeaderString(&chunk_info.pos);
00855         header[START_TIME_FIELD_NAME] = toHeaderString(&chunk_info.start_time);
00856         header[END_TIME_FIELD_NAME]   = toHeaderString(&chunk_info.end_time);
00857         header[COUNT_FIELD_NAME]      = toHeaderString(&chunk_connection_count);
00858 
00859         ROS_DEBUG("Writing CHUNK_INFO [%llu]: ver=%d pos=%llu start=%d.%d end=%d.%d",
00860                   (unsigned long long) file_.getOffset(), CHUNK_INFO_VERSION, (unsigned long long) chunk_info.pos,
00861                   chunk_info.start_time.sec, chunk_info.start_time.nsec,
00862                   chunk_info.end_time.sec, chunk_info.end_time.nsec);
00863 
00864         writeHeader(header);
00865 
00866         writeDataLength(8 * chunk_connection_count);
00867 
00868         // Write the topic names and counts
00869         for (map<uint32_t, uint32_t>::const_iterator i = chunk_info.connection_counts.begin(); i != chunk_info.connection_counts.end(); i++) {
00870             uint32_t connection_id = i->first;
00871             uint32_t count         = i->second;
00872 
00873             write((char*) &connection_id, 4);
00874             write((char*) &count, 4);
00875 
00876             ROS_DEBUG("  - %d: %d", connection_id, count);
00877         }
00878     }
00879 }
00880 
00881 void Bag::readChunkInfoRecord() {
00882     // Read a CHUNK_INFO header
00883     ros::Header header;
00884     uint32_t data_size;
00885     if (!readHeader(header) || !readDataLength(data_size))
00886         throw BagFormatException("Error reading CHUNK_INFO record header");
00887     M_string& fields = *header.getValues();
00888     if (!isOp(fields, OP_CHUNK_INFO))
00889         throw BagFormatException("Expected CHUNK_INFO op not found");
00890 
00891     // Check that the chunk info version is current
00892     uint32_t chunk_info_version;
00893     readField(fields, VER_FIELD_NAME, true, &chunk_info_version);
00894     if (chunk_info_version != CHUNK_INFO_VERSION)
00895         throw BagFormatException((format("Expected CHUNK_INFO version %1%, read %2%") % CHUNK_INFO_VERSION % chunk_info_version).str());
00896 
00897     // Read the chunk position, timestamp, and topic count fields
00898     ChunkInfo chunk_info;
00899     readField(fields, CHUNK_POS_FIELD_NAME,  true, &chunk_info.pos);
00900     readField(fields, START_TIME_FIELD_NAME, true,  chunk_info.start_time);
00901     readField(fields, END_TIME_FIELD_NAME,   true,  chunk_info.end_time);
00902     uint32_t chunk_connection_count = 0;
00903     readField(fields, COUNT_FIELD_NAME,      true, &chunk_connection_count);
00904 
00905     ROS_DEBUG("Read CHUNK_INFO: chunk_pos=%llu connection_count=%d start=%d.%d end=%d.%d",
00906               (unsigned long long) chunk_info.pos, chunk_connection_count,
00907               chunk_info.start_time.sec, chunk_info.start_time.nsec,
00908               chunk_info.end_time.sec, chunk_info.end_time.nsec);
00909 
00910     // Read the topic count entries
00911     for (uint32_t i = 0; i < chunk_connection_count; i ++) {
00912         uint32_t connection_id, connection_count;
00913         read((char*) &connection_id,    4);
00914         read((char*) &connection_count, 4);
00915 
00916         ROS_DEBUG("  %d: %d messages", connection_id, connection_count);
00917 
00918         chunk_info.connection_counts[connection_id] = connection_count;
00919     }
00920 
00921     chunks_.push_back(chunk_info);
00922 }
00923 
00924 // Record I/O
00925 
00926 bool Bag::isOp(M_string& fields, uint8_t reqOp) const {
00927     uint8_t op = 0xFF; // nonexistent op
00928     readField(fields, OP_FIELD_NAME, true, &op);
00929     return op == reqOp;
00930 }
00931 
00932 void Bag::writeHeader(M_string const& fields) {
00933     boost::shared_array<uint8_t> header_buffer;
00934     uint32_t header_len;
00935     ros::Header::write(fields, header_buffer, header_len);
00936     write((char*) &header_len, 4);
00937     write((char*) header_buffer.get(), header_len);
00938 }
00939 
00940 void Bag::writeDataLength(uint32_t data_len) {
00941     write((char*) &data_len, 4);
00942 }
00943 
00944 void Bag::appendHeaderToBuffer(Buffer& buf, M_string const& fields) {
00945     boost::shared_array<uint8_t> header_buffer;
00946     uint32_t header_len;
00947     ros::Header::write(fields, header_buffer, header_len);
00948 
00949     uint32_t offset = buf.getSize();
00950 
00951     buf.setSize(buf.getSize() + 4 + header_len);
00952 
00953     memcpy(buf.getData() + offset, &header_len, 4);
00954     offset += 4;
00955     memcpy(buf.getData() + offset, header_buffer.get(), header_len);
00956 }
00957 
00958 void Bag::appendDataLengthToBuffer(Buffer& buf, uint32_t data_len) {
00959     uint32_t offset = buf.getSize();
00960 
00961     buf.setSize(buf.getSize() + 4);
00962 
00963     memcpy(buf.getData() + offset, &data_len, 4);
00964 }
00965 
00967 void Bag::readHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read) const {
00968     ROS_ASSERT(buffer.getSize() > 8);
00969 
00970     uint8_t* start = (uint8_t*) buffer.getData() + offset;
00971 
00972     uint8_t* ptr = start;
00973 
00974     // Read the header length
00975     uint32_t header_len;
00976     memcpy(&header_len, ptr, 4);
00977     ptr += 4;
00978 
00979     // Parse the header
00980     string error_msg;
00981     bool parsed = header.parse(ptr, header_len, error_msg);
00982     if (!parsed)
00983         throw BagFormatException("Error parsing header");
00984     ptr += header_len;
00985 
00986     // Read the data size
00987     memcpy(&data_size, ptr, 4);
00988     ptr += 4;
00989 
00990     bytes_read = ptr - start;
00991 }
00992 
00993 void Bag::readMessageDataHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& total_bytes_read) const {
00994     total_bytes_read = 0;
00995     uint8_t op = 0xFF;
00996     do {
00997         ROS_DEBUG("reading header from buffer: offset=%d", offset);
00998         uint32_t bytes_read;
00999         readHeaderFromBuffer(*current_buffer_, offset, header, data_size, bytes_read);
01000 
01001         offset += bytes_read;
01002         total_bytes_read += bytes_read;
01003         
01004         readField(*header.getValues(), OP_FIELD_NAME, true, &op);
01005     }
01006     while (op == OP_MSG_DEF || op == OP_CONNECTION);
01007 
01008     if (op != OP_MSG_DATA)
01009         throw BagFormatException("Expected MSG_DATA op not found");
01010 }
01011 
01012 bool Bag::readHeader(ros::Header& header) const {
01013     // Read the header length
01014     uint32_t header_len;
01015     read((char*) &header_len, 4);
01016 
01017     // Read the header
01018     header_buffer_.setSize(header_len);
01019     read((char*) header_buffer_.getData(), header_len);
01020 
01021     // Parse the header
01022     string error_msg;
01023     bool parsed = header.parse(header_buffer_.getData(), header_len, error_msg);
01024     if (!parsed)
01025         return false;
01026 
01027     return true;
01028 }
01029 
01030 bool Bag::readDataLength(uint32_t& data_size) const {
01031     read((char*) &data_size, 4);
01032     return true;
01033 }
01034 
01035 M_string::const_iterator Bag::checkField(M_string const& fields, string const& field, unsigned int min_len, unsigned int max_len, bool required) const {
01036     M_string::const_iterator fitr = fields.find(field);
01037     if (fitr == fields.end()) {
01038         if (required)
01039             throw BagFormatException("Required '" + field + "' field missing");
01040     }
01041     else if ((fitr->second.size() < min_len) || (fitr->second.size() > max_len))
01042         throw BagFormatException((format("Field '%1%' is wrong size (%2% bytes)") % field % (uint32_t) fitr->second.size()).str());
01043 
01044     return fitr;
01045 }
01046 
01047 bool Bag::readField(M_string const& fields, string const& field_name, bool required, string& data) const {
01048     return readField(fields, field_name, 1, UINT_MAX, required, data);
01049 }
01050 
01051 bool Bag::readField(M_string const& fields, string const& field_name, unsigned int min_len, unsigned int max_len, bool required, string& data) const {
01052     M_string::const_iterator fitr = checkField(fields, field_name, min_len, max_len, required);
01053     if (fitr == fields.end())
01054         return false;
01055 
01056     data = fitr->second;
01057     return true;
01058 }
01059 
01060 bool Bag::readField(M_string const& fields, string const& field_name, bool required, Time& data) const {
01061     uint64_t packed_time;
01062     if (!readField(fields, field_name, required, &packed_time))
01063         return false;
01064 
01065     uint64_t bitmask = (1LL << 33) - 1;
01066     data.sec  = (uint32_t) (packed_time & bitmask);
01067     data.nsec = (uint32_t) (packed_time >> 32);
01068 
01069     return true;
01070 }
01071 
01072 std::string Bag::toHeaderString(Time const* field) const {
01073     uint64_t packed_time = (((uint64_t) field->nsec) << 32) + field->sec;
01074     return toHeaderString(&packed_time);
01075 }
01076 
01077 ros::AdvertiseOptions createAdvertiseOptions(const ConnectionInfo* c, uint32_t queue_size) {
01078     ros::AdvertiseOptions opts(c->topic, queue_size, c->md5sum, c->datatype, c->msg_def);
01079     ros::M_string::const_iterator header_iter = c->header->find("latching");
01080     opts.latch = (header_iter != c->header->end() && header_iter->second == "1");
01081     return opts;
01082 }
01083 
01084 // Low-level I/O
01085 
01086 void Bag::write(string const& s)                  { write(s.c_str(), s.length()); }
01087 void Bag::write(char const* s, std::streamsize n) { file_.write((char*) s, n);    }
01088 
01089 void Bag::read(char* b, std::streamsize n) const  { file_.read(b, n);             }
01090 void Bag::seek(uint64_t pos, int origin) const    { file_.seek(pos, origin);      }
01091 
01092 } // namespace rosbag


rosbag
Author(s): Tim Field, Jeremy Leibs, James Bowman
autogenerated on Mon Oct 6 2014 11:47:09