00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #ifndef ROSBAG_BAG_H
00036 #define ROSBAG_BAG_H
00037
00038 #include "rosbag/buffer.h"
00039 #include "rosbag/chunked_file.h"
00040 #include "rosbag/constants.h"
00041 #include "rosbag/exceptions.h"
00042 #include "rosbag/structures.h"
00043
00044 #include "ros/header.h"
00045 #include "ros/time.h"
00046 #include "ros/message_traits.h"
00047 #include "ros/subscription_callback_helper.h"
00048 #include "ros/ros.h"
00049
00050 #include <ios>
00051 #include <map>
00052 #include <queue>
00053 #include <set>
00054 #include <stdexcept>
00055
00056 #include <boost/format.hpp>
00057 #include <boost/iterator/iterator_facade.hpp>
00058
00059 namespace rosbag {
00060
00061 namespace bagmode
00062 {
00064 enum BagMode
00065 {
00066 Write = 1,
00067 Read = 2,
00068 Append = 4
00069 };
00070 }
00071 typedef bagmode::BagMode BagMode;
00072
00073 class MessageInstance;
00074 class View;
00075 class Query;
00076
00077 class Bag
00078 {
00079 friend class MessageInstance;
00080 friend class View;
00081
00082 public:
00083 Bag();
00084
00086
00092 Bag(std::string const& filename, uint32_t mode = bagmode::Read);
00093
00094 ~Bag();
00095
00097
00103 void open(std::string const& filename, uint32_t mode = bagmode::Read);
00104
00106 void close();
00107
00108 std::string getFileName() const;
00109 BagMode getMode() const;
00110 uint32_t getMajorVersion() const;
00111 uint32_t getMinorVersion() const;
00112 uint64_t getSize() const;
00113
00114 void setCompression(CompressionType compression);
00115 CompressionType getCompression() const;
00116 void setChunkThreshold(uint32_t chunk_threshold);
00117 uint32_t getChunkThreshold() const;
00118
00120
00126 template<class T>
00127 void write(std::string const& topic, ros::MessageEvent<T> const& event);
00128
00130
00138 template<class T>
00139 void write(std::string const& topic, ros::Time const& time, T const& msg,
00140 boost::shared_ptr<ros::M_string> connection_header = boost::shared_ptr<ros::M_string>());
00141
00143
00151 template<class T>
00152 void write(std::string const& topic, ros::Time const& time, boost::shared_ptr<T const> const& msg,
00153 boost::shared_ptr<ros::M_string> connection_header = boost::shared_ptr<ros::M_string>());
00154
00156
00164 template<class T>
00165 void write(std::string const& topic, ros::Time const& time, boost::shared_ptr<T> const& msg,
00166 boost::shared_ptr<ros::M_string> connection_header = boost::shared_ptr<ros::M_string>());
00167
00168 private:
00169
00170 template<class T>
00171 void doWrite(std::string const& topic, ros::Time const& time, T const& msg, boost::shared_ptr<ros::M_string> const& connection_header);
00172
00173 void openRead (std::string const& filename);
00174 void openWrite (std::string const& filename);
00175 void openAppend(std::string const& filename);
00176
00177 void closeWrite();
00178
00179 template<class T>
00180 boost::shared_ptr<T> instantiateBuffer(IndexEntry const& index_entry) const;
00181
00182 void startWriting();
00183 void stopWriting();
00184
00185 void startReadingVersion102();
00186 void startReadingVersion200();
00187
00188
00189
00190 void writeVersion();
00191 void writeFileHeaderRecord();
00192 void writeConnectionRecord(ConnectionInfo const* connection_info);
00193 void appendConnectionRecordToBuffer(Buffer& buf, ConnectionInfo const* connection_info);
00194 template<class T>
00195 void writeMessageDataRecord(uint32_t conn_id, ros::Time const& time, T const& msg);
00196 void writeIndexRecords();
00197 void writeConnectionRecords();
00198 void writeChunkInfoRecords();
00199 void startWritingChunk(ros::Time time);
00200 void writeChunkHeader(CompressionType compression, uint32_t compressed_size, uint32_t uncompressed_size);
00201 void stopWritingChunk();
00202
00203
00204
00205 void readVersion();
00206 void readFileHeaderRecord();
00207 void readConnectionRecord();
00208 void readChunkHeader(ChunkHeader& chunk_header) const;
00209 void readChunkInfoRecord();
00210 void readConnectionIndexRecord200();
00211
00212 void readTopicIndexRecord102();
00213 void readMessageDefinitionRecord102();
00214 void readMessageDataRecord102(uint64_t offset, ros::Header& header) const;
00215
00216 ros::Header readMessageDataHeader(IndexEntry const& index_entry);
00217 uint32_t readMessageDataSize(IndexEntry const& index_entry) const;
00218
00219 template<typename Stream>
00220 void readMessageDataIntoStream(IndexEntry const& index_entry, Stream& stream) const;
00221
00222 void decompressChunk(uint64_t chunk_pos) const;
00223 void decompressRawChunk(ChunkHeader const& chunk_header) const;
00224 void decompressBz2Chunk(ChunkHeader const& chunk_header) const;
00225 uint32_t getChunkOffset() const;
00226
00227
00228
00229 void writeHeader(ros::M_string const& fields);
00230 void writeDataLength(uint32_t data_len);
00231 void appendHeaderToBuffer(Buffer& buf, ros::M_string const& fields);
00232 void appendDataLengthToBuffer(Buffer& buf, uint32_t data_len);
00233
00234 void readHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read) const;
00235 void readMessageDataHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read) const;
00236 bool readHeader(ros::Header& header) const;
00237 bool readDataLength(uint32_t& data_size) const;
00238 bool isOp(ros::M_string& fields, uint8_t reqOp) const;
00239
00240
00241
00242 template<typename T>
00243 std::string toHeaderString(T const* field) const;
00244
00245 std::string toHeaderString(ros::Time const* field) const;
00246
00247 template<typename T>
00248 bool readField(ros::M_string const& fields, std::string const& field_name, bool required, T* data) const;
00249
00250 bool readField(ros::M_string const& fields, std::string const& field_name, unsigned int min_len, unsigned int max_len, bool required, std::string& data) const;
00251 bool readField(ros::M_string const& fields, std::string const& field_name, bool required, std::string& data) const;
00252
00253 bool readField(ros::M_string const& fields, std::string const& field_name, bool required, ros::Time& data) const;
00254
00255 ros::M_string::const_iterator checkField(ros::M_string const& fields, std::string const& field,
00256 unsigned int min_len, unsigned int max_len, bool required) const;
00257
00258
00259
00260 void write(char const* s, std::streamsize n);
00261 void write(std::string const& s);
00262 void read(char* b, std::streamsize n) const;
00263 void seek(uint64_t pos, int origin = std::ios_base::beg) const;
00264
00265 private:
00266 BagMode mode_;
00267 mutable ChunkedFile file_;
00268 int version_;
00269 CompressionType compression_;
00270 uint32_t chunk_threshold_;
00271 uint32_t bag_revision_;
00272
00273 uint64_t file_size_;
00274 uint64_t file_header_pos_;
00275 uint64_t index_data_pos_;
00276 uint32_t connection_count_;
00277 uint32_t chunk_count_;
00278
00279
00280 bool chunk_open_;
00281 ChunkInfo curr_chunk_info_;
00282 uint64_t curr_chunk_data_pos_;
00283
00284 std::map<std::string, uint32_t> topic_connection_ids_;
00285 std::map<ros::M_string, uint32_t> header_connection_ids_;
00286 std::map<uint32_t, ConnectionInfo*> connections_;
00287
00288 std::vector<ChunkInfo> chunks_;
00289
00290 std::map<uint32_t, std::multiset<IndexEntry> > connection_indexes_;
00291 std::map<uint32_t, std::multiset<IndexEntry> > curr_chunk_connection_indexes_;
00292
00293 mutable Buffer header_buffer_;
00294 mutable Buffer record_buffer_;
00295
00296 mutable Buffer chunk_buffer_;
00297 mutable Buffer decompress_buffer_;
00298
00299 mutable Buffer outgoing_chunk_buffer_;
00300
00301 mutable Buffer* current_buffer_;
00302
00303 mutable uint64_t decompressed_chunk_;
00304 };
00305
00306 }
00307
00308 #include "rosbag/message_instance.h"
00309
00310 namespace rosbag {
00311
00312
00313
00314 template<class T>
00315 void Bag::write(std::string const& topic, ros::MessageEvent<T> const& event) {
00316 doWrite(topic, event.getReceiptTime(), *event.getMessage(), event.getConnectionHeaderPtr());
00317 }
00318
00319 template<class T>
00320 void Bag::write(std::string const& topic, ros::Time const& time, T const& msg, boost::shared_ptr<ros::M_string> connection_header) {
00321 doWrite(topic, time, msg, connection_header);
00322 }
00323
00324 template<class T>
00325 void Bag::write(std::string const& topic, ros::Time const& time, boost::shared_ptr<T const> const& msg, boost::shared_ptr<ros::M_string> connection_header) {
00326 doWrite(topic, time, *msg, connection_header);
00327 }
00328
00329 template<class T>
00330 void Bag::write(std::string const& topic, ros::Time const& time, boost::shared_ptr<T> const& msg, boost::shared_ptr<ros::M_string> connection_header) {
00331 doWrite(topic, time, *msg, connection_header);
00332 }
00333
00334 template<typename T>
00335 std::string Bag::toHeaderString(T const* field) const {
00336 return std::string((char*) field, sizeof(T));
00337 }
00338
00339 template<typename T>
00340 bool Bag::readField(ros::M_string const& fields, std::string const& field_name, bool required, T* data) const {
00341 ros::M_string::const_iterator i = checkField(fields, field_name, sizeof(T), sizeof(T), required);
00342 if (i == fields.end())
00343 return false;
00344 memcpy(data, i->second.data(), sizeof(T));
00345 return true;
00346 }
00347
00348 template<typename Stream>
00349 void Bag::readMessageDataIntoStream(IndexEntry const& index_entry, Stream& stream) const {
00350 ros::Header header;
00351 uint32_t data_size;
00352 uint32_t bytes_read;
00353 switch (version_)
00354 {
00355 case 200:
00356 {
00357 decompressChunk(index_entry.chunk_pos);
00358 readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
00359 if (data_size > 0)
00360 memcpy(stream.advance(data_size), current_buffer_->getData() + index_entry.offset + bytes_read, data_size);
00361 break;
00362 }
00363 case 102:
00364 {
00365 readMessageDataRecord102(index_entry.chunk_pos, header);
00366 data_size = record_buffer_.getSize();
00367 if (data_size > 0)
00368 memcpy(stream.advance(data_size), record_buffer_.getData(), data_size);
00369 break;
00370 }
00371 default:
00372 throw BagFormatException((boost::format("Unhandled version: %1%") % version_).str());
00373 }
00374 }
00375
00376 template<class T>
00377 boost::shared_ptr<T> Bag::instantiateBuffer(IndexEntry const& index_entry) const {
00378 switch (version_)
00379 {
00380 case 200:
00381 {
00382 decompressChunk(index_entry.chunk_pos);
00383
00384
00385 ros::Header header;
00386 uint32_t data_size;
00387 uint32_t bytes_read;
00388 readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
00389
00390
00391 uint32_t connection_id;
00392 readField(*header.getValues(), CONNECTION_FIELD_NAME, true, &connection_id);
00393
00394 std::map<uint32_t, ConnectionInfo*>::const_iterator connection_iter = connections_.find(connection_id);
00395 if (connection_iter == connections_.end())
00396 throw BagFormatException((boost::format("Unknown connection ID: %1%") % connection_id).str());
00397 ConnectionInfo* connection_info = connection_iter->second;
00398
00399 boost::shared_ptr<T> p = boost::shared_ptr<T>(new T());
00400
00401
00402 ros::assignSubscriptionConnectionHeader<T>(p.get(), connection_info->header);
00403
00404 ros::serialization::PreDeserializeParams<T> predes_params;
00405 predes_params.message = p;
00406 predes_params.connection_header = connection_info->header;
00407 ros::serialization::PreDeserialize<T>::notify(predes_params);
00408
00409
00410 ros::serialization::IStream s(current_buffer_->getData() + index_entry.offset + bytes_read, data_size);
00411 ros::serialization::deserialize(s, *p);
00412
00413 return p;
00414 }
00415 case 102:
00416 {
00417
00418 ros::Header header;
00419 readMessageDataRecord102(index_entry.chunk_pos, header);
00420
00421 ros::M_string& fields = *header.getValues();
00422
00423
00424 std::string topic, latching("0"), callerid;
00425 readField(fields, TOPIC_FIELD_NAME, true, topic);
00426 readField(fields, LATCHING_FIELD_NAME, false, latching);
00427 readField(fields, CALLERID_FIELD_NAME, false, callerid);
00428
00429 std::map<std::string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
00430 if (topic_conn_id_iter == topic_connection_ids_.end())
00431 throw BagFormatException((boost::format("Unknown topic: %1%") % topic).str());
00432 uint32_t connection_id = topic_conn_id_iter->second;
00433
00434 std::map<uint32_t, ConnectionInfo*>::const_iterator connection_iter = connections_.find(connection_id);
00435 if (connection_iter == connections_.end())
00436 throw BagFormatException((boost::format("Unknown connection ID: %1%") % connection_id).str());
00437 ConnectionInfo* connection_info = connection_iter->second;
00438
00439 boost::shared_ptr<T> p = boost::shared_ptr<T>(new T());
00440
00441
00442 boost::shared_ptr<ros::M_string> message_header(new ros::M_string);
00443 for (ros::M_string::const_iterator i = connection_info->header->begin(); i != connection_info->header->end(); i++)
00444 (*message_header)[i->first] = i->second;
00445 (*message_header)["latching"] = latching;
00446 (*message_header)["callerid"] = callerid;
00447
00448
00449 ros::assignSubscriptionConnectionHeader<T>(p.get(), message_header);
00450
00451 ros::serialization::PreDeserializeParams<T> predes_params;
00452 predes_params.message = p;
00453 predes_params.connection_header = message_header;
00454 ros::serialization::PreDeserialize<T>::notify(predes_params);
00455
00456
00457 ros::serialization::IStream s(record_buffer_.getData(), record_buffer_.getSize());
00458 ros::serialization::deserialize(s, *p);
00459
00460 return p;
00461 }
00462 default:
00463 throw BagFormatException((boost::format("Unhandled version: %1%") % version_).str());
00464 }
00465 }
00466
00467 template<class T>
00468 void Bag::doWrite(std::string const& topic, ros::Time const& time, T const& msg, boost::shared_ptr<ros::M_string> const& connection_header) {
00469
00470 if (time < ros::TIME_MIN)
00471 {
00472 throw BagException("Tried to insert a message with time less than ros::MIN_TIME");
00473 }
00474
00475
00476 bag_revision_++;
00477
00478
00479 ConnectionInfo* connection_info = NULL;
00480 uint32_t conn_id = 0;
00481 if (!connection_header) {
00482
00483
00484 std::map<std::string, uint32_t>::iterator topic_connection_ids_iter = topic_connection_ids_.find(topic);
00485 if (topic_connection_ids_iter == topic_connection_ids_.end()) {
00486 conn_id = connections_.size();
00487 topic_connection_ids_[topic] = conn_id;
00488 }
00489 else {
00490 conn_id = topic_connection_ids_iter->second;
00491 connection_info = connections_[conn_id];
00492 }
00493 }
00494 else {
00495
00496
00497
00498
00499
00500
00501 ros::M_string connection_header_copy(*connection_header);
00502 connection_header_copy["topic"] = topic;
00503
00504 std::map<ros::M_string, uint32_t>::iterator header_connection_ids_iter = header_connection_ids_.find(connection_header_copy);
00505 if (header_connection_ids_iter == header_connection_ids_.end()) {
00506 conn_id = connections_.size();
00507 header_connection_ids_[connection_header_copy] = conn_id;
00508 }
00509 else {
00510 conn_id = header_connection_ids_iter->second;
00511 connection_info = connections_[conn_id];
00512 }
00513 }
00514
00515 {
00516
00517 seek(0, std::ios::end);
00518 file_size_ = file_.getOffset();
00519
00520
00521 if (!chunk_open_)
00522 startWritingChunk(time);
00523
00524
00525 if (connection_info == NULL) {
00526 connection_info = new ConnectionInfo();
00527 connection_info->id = conn_id;
00528 connection_info->topic = topic;
00529 connection_info->datatype = std::string(ros::message_traits::datatype(msg));
00530 connection_info->md5sum = std::string(ros::message_traits::md5sum(msg));
00531 connection_info->msg_def = std::string(ros::message_traits::definition(msg));
00532 if (connection_header != NULL) {
00533 connection_info->header = connection_header;
00534 }
00535 else {
00536 connection_info->header = boost::shared_ptr<ros::M_string>(new ros::M_string);
00537 (*connection_info->header)["type"] = connection_info->datatype;
00538 (*connection_info->header)["md5sum"] = connection_info->md5sum;
00539 (*connection_info->header)["message_definition"] = connection_info->msg_def;
00540 }
00541 connections_[conn_id] = connection_info;
00542
00543 writeConnectionRecord(connection_info);
00544 appendConnectionRecordToBuffer(outgoing_chunk_buffer_, connection_info);
00545 }
00546
00547
00548 IndexEntry index_entry;
00549 index_entry.time = time;
00550 index_entry.chunk_pos = curr_chunk_info_.pos;
00551 index_entry.offset = getChunkOffset();
00552
00553 std::multiset<IndexEntry>& chunk_connection_index = curr_chunk_connection_indexes_[connection_info->id];
00554 chunk_connection_index.insert(chunk_connection_index.end(), index_entry);
00555 std::multiset<IndexEntry>& connection_index = connection_indexes_[connection_info->id];
00556 connection_index.insert(connection_index.end(), index_entry);
00557
00558
00559 curr_chunk_info_.connection_counts[connection_info->id]++;
00560
00561
00562 writeMessageDataRecord(conn_id, time, msg);
00563
00564
00565 uint32_t chunk_size = getChunkOffset();
00566 ROS_DEBUG(" curr_chunk_size=%d (threshold=%d)", chunk_size, chunk_threshold_);
00567 if (chunk_size > chunk_threshold_) {
00568
00569 stopWritingChunk();
00570 outgoing_chunk_buffer_.setSize(0);
00571
00572
00573 curr_chunk_info_.pos = -1;
00574 }
00575 }
00576 }
00577
00578 template<class T>
00579 void Bag::writeMessageDataRecord(uint32_t conn_id, ros::Time const& time, T const& msg) {
00580 ros::M_string header;
00581 header[OP_FIELD_NAME] = toHeaderString(&OP_MSG_DATA);
00582 header[CONNECTION_FIELD_NAME] = toHeaderString(&conn_id);
00583 header[TIME_FIELD_NAME] = toHeaderString(&time);
00584
00585
00586 uint32_t msg_ser_len = ros::serialization::serializationLength(msg);
00587
00588 record_buffer_.setSize(msg_ser_len);
00589
00590 ros::serialization::OStream s(record_buffer_.getData(), msg_ser_len);
00591
00592
00593 ros::serialization::serialize(s, msg);
00594
00595
00596
00597
00598 seek(0, std::ios::end);
00599 file_size_ = file_.getOffset();
00600
00601 ROS_DEBUG("Writing MSG_DATA [%llu:%d]: conn=%d sec=%d nsec=%d data_len=%d",
00602 (unsigned long long) file_.getOffset(), getChunkOffset(), conn_id, time.sec, time.nsec, msg_ser_len);
00603
00604 writeHeader(header);
00605 writeDataLength(msg_ser_len);
00606 write((char*) record_buffer_.getData(), msg_ser_len);
00607
00608
00609 appendHeaderToBuffer(outgoing_chunk_buffer_, header);
00610 appendDataLengthToBuffer(outgoing_chunk_buffer_, msg_ser_len);
00611
00612 uint32_t offset = outgoing_chunk_buffer_.getSize();
00613 outgoing_chunk_buffer_.setSize(outgoing_chunk_buffer_.getSize() + msg_ser_len);
00614 memcpy(outgoing_chunk_buffer_.getData() + offset, record_buffer_.getData(), msg_ser_len);
00615
00616
00617 if (time > curr_chunk_info_.end_time)
00618 curr_chunk_info_.end_time = time;
00619 else if (time < curr_chunk_info_.start_time)
00620 curr_chunk_info_.start_time = time;
00621 }
00622
00623 }
00624
00625 #endif