00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #ifndef ROSBAG_BAG_H
00036 #define ROSBAG_BAG_H
00037
00038 #include "rosbag/macros.h"
00039 #include "rosbag/buffer.h"
00040 #include "rosbag/chunked_file.h"
00041 #include "rosbag/constants.h"
00042 #include "rosbag/exceptions.h"
00043 #include "rosbag/structures.h"
00044
00045 #include "ros/header.h"
00046 #include "ros/time.h"
00047 #include "ros/message_traits.h"
00048 #include "ros/subscription_callback_helper.h"
00049 #include "ros/ros.h"
00050
00051 #include <ios>
00052 #include <map>
00053 #include <queue>
00054 #include <set>
00055 #include <stdexcept>
00056
00057 #include <boost/format.hpp>
00058 #include <boost/iterator/iterator_facade.hpp>
00059
00060 namespace rosbag {
00061
00062 namespace bagmode
00063 {
00065 enum BagMode
00066 {
00067 Write = 1,
00068 Read = 2,
00069 Append = 4
00070 };
00071 }
00072 typedef bagmode::BagMode BagMode;
00073
00074 class MessageInstance;
00075 class View;
00076 class Query;
00077
00078 class ROSBAG_DECL Bag
00079 {
00080 friend class MessageInstance;
00081 friend class View;
00082
00083 public:
00084 Bag();
00085
00087
00093 Bag(std::string const& filename, uint32_t mode = bagmode::Read);
00094
00095 ~Bag();
00096
00098
00104 void open(std::string const& filename, uint32_t mode = bagmode::Read);
00105
00107 void close();
00108
00109 std::string getFileName() const;
00110 BagMode getMode() const;
00111 uint32_t getMajorVersion() const;
00112 uint32_t getMinorVersion() const;
00113 uint64_t getSize() const;
00114
00115 void setCompression(CompressionType compression);
00116 CompressionType getCompression() const;
00117 void setChunkThreshold(uint32_t chunk_threshold);
00118 uint32_t getChunkThreshold() const;
00119
00121
00127 template<class T>
00128 void write(std::string const& topic, ros::MessageEvent<T> const& event);
00129
00131
00139 template<class T>
00140 void write(std::string const& topic, ros::Time const& time, T const& msg,
00141 boost::shared_ptr<ros::M_string> connection_header = boost::shared_ptr<ros::M_string>());
00142
00144
00152 template<class T>
00153 void write(std::string const& topic, ros::Time const& time, boost::shared_ptr<T const> const& msg,
00154 boost::shared_ptr<ros::M_string> connection_header = boost::shared_ptr<ros::M_string>());
00155
00157
00165 template<class T>
00166 void write(std::string const& topic, ros::Time const& time, boost::shared_ptr<T> const& msg,
00167 boost::shared_ptr<ros::M_string> connection_header = boost::shared_ptr<ros::M_string>());
00168
00169 private:
00170
00171 template<class T>
00172 void doWrite(std::string const& topic, ros::Time const& time, T const& msg, boost::shared_ptr<ros::M_string> const& connection_header);
00173
00174 void openRead (std::string const& filename);
00175 void openWrite (std::string const& filename);
00176 void openAppend(std::string const& filename);
00177
00178 void closeWrite();
00179
00180 template<class T>
00181 boost::shared_ptr<T> instantiateBuffer(IndexEntry const& index_entry) const;
00182
00183 void startWriting();
00184 void stopWriting();
00185
00186 void startReadingVersion102();
00187 void startReadingVersion200();
00188
00189
00190
00191 void writeVersion();
00192 void writeFileHeaderRecord();
00193 void writeConnectionRecord(ConnectionInfo const* connection_info);
00194 void appendConnectionRecordToBuffer(Buffer& buf, ConnectionInfo const* connection_info);
00195 template<class T>
00196 void writeMessageDataRecord(uint32_t conn_id, ros::Time const& time, T const& msg);
00197 void writeIndexRecords();
00198 void writeConnectionRecords();
00199 void writeChunkInfoRecords();
00200 void startWritingChunk(ros::Time time);
00201 void writeChunkHeader(CompressionType compression, uint32_t compressed_size, uint32_t uncompressed_size);
00202 void stopWritingChunk();
00203
00204
00205
00206 void readVersion();
00207 void readFileHeaderRecord();
00208 void readConnectionRecord();
00209 void readChunkHeader(ChunkHeader& chunk_header) const;
00210 void readChunkInfoRecord();
00211 void readConnectionIndexRecord200();
00212
00213 void readTopicIndexRecord102();
00214 void readMessageDefinitionRecord102();
00215 void readMessageDataRecord102(uint64_t offset, ros::Header& header) const;
00216
00217 ros::Header readMessageDataHeader(IndexEntry const& index_entry);
00218 uint32_t readMessageDataSize(IndexEntry const& index_entry) const;
00219
00220 template<typename Stream>
00221 void readMessageDataIntoStream(IndexEntry const& index_entry, Stream& stream) const;
00222
00223 void decompressChunk(uint64_t chunk_pos) const;
00224 void decompressRawChunk(ChunkHeader const& chunk_header) const;
00225 void decompressBz2Chunk(ChunkHeader const& chunk_header) const;
00226 uint32_t getChunkOffset() const;
00227
00228
00229
00230 void writeHeader(ros::M_string const& fields);
00231 void writeDataLength(uint32_t data_len);
00232 void appendHeaderToBuffer(Buffer& buf, ros::M_string const& fields);
00233 void appendDataLengthToBuffer(Buffer& buf, uint32_t data_len);
00234
00235 void readHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read) const;
00236 void readMessageDataHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read) const;
00237 bool readHeader(ros::Header& header) const;
00238 bool readDataLength(uint32_t& data_size) const;
00239 bool isOp(ros::M_string& fields, uint8_t reqOp) const;
00240
00241
00242
00243 template<typename T>
00244 std::string toHeaderString(T const* field) const;
00245
00246 std::string toHeaderString(ros::Time const* field) const;
00247
00248 template<typename T>
00249 bool readField(ros::M_string const& fields, std::string const& field_name, bool required, T* data) const;
00250
00251 bool readField(ros::M_string const& fields, std::string const& field_name, unsigned int min_len, unsigned int max_len, bool required, std::string& data) const;
00252 bool readField(ros::M_string const& fields, std::string const& field_name, bool required, std::string& data) const;
00253
00254 bool readField(ros::M_string const& fields, std::string const& field_name, bool required, ros::Time& data) const;
00255
00256 ros::M_string::const_iterator checkField(ros::M_string const& fields, std::string const& field,
00257 unsigned int min_len, unsigned int max_len, bool required) const;
00258
00259
00260
00261 void write(char const* s, std::streamsize n);
00262 void write(std::string const& s);
00263 void read(char* b, std::streamsize n) const;
00264 void seek(uint64_t pos, int origin = std::ios_base::beg) const;
00265
00266 private:
00267 BagMode mode_;
00268 mutable ChunkedFile file_;
00269 int version_;
00270 CompressionType compression_;
00271 uint32_t chunk_threshold_;
00272 uint32_t bag_revision_;
00273
00274 uint64_t file_size_;
00275 uint64_t file_header_pos_;
00276 uint64_t index_data_pos_;
00277 uint32_t connection_count_;
00278 uint32_t chunk_count_;
00279
00280
00281 bool chunk_open_;
00282 ChunkInfo curr_chunk_info_;
00283 uint64_t curr_chunk_data_pos_;
00284
00285 std::map<std::string, uint32_t> topic_connection_ids_;
00286 std::map<ros::M_string, uint32_t> header_connection_ids_;
00287 std::map<uint32_t, ConnectionInfo*> connections_;
00288
00289 std::vector<ChunkInfo> chunks_;
00290
00291 std::map<uint32_t, std::multiset<IndexEntry> > connection_indexes_;
00292 std::map<uint32_t, std::multiset<IndexEntry> > curr_chunk_connection_indexes_;
00293
00294 mutable Buffer header_buffer_;
00295 mutable Buffer record_buffer_;
00296
00297 mutable Buffer chunk_buffer_;
00298 mutable Buffer decompress_buffer_;
00299
00300 mutable Buffer outgoing_chunk_buffer_;
00301
00302 mutable Buffer* current_buffer_;
00303
00304 mutable uint64_t decompressed_chunk_;
00305 };
00306
00307 }
00308
00309 #include "rosbag/message_instance.h"
00310
00311 namespace rosbag {
00312
00313
00314
00315 template<class T>
00316 void Bag::write(std::string const& topic, ros::MessageEvent<T> const& event) {
00317 doWrite(topic, event.getReceiptTime(), *event.getMessage(), event.getConnectionHeaderPtr());
00318 }
00319
00320 template<class T>
00321 void Bag::write(std::string const& topic, ros::Time const& time, T const& msg, boost::shared_ptr<ros::M_string> connection_header) {
00322 doWrite(topic, time, msg, connection_header);
00323 }
00324
00325 template<class T>
00326 void Bag::write(std::string const& topic, ros::Time const& time, boost::shared_ptr<T const> const& msg, boost::shared_ptr<ros::M_string> connection_header) {
00327 doWrite(topic, time, *msg, connection_header);
00328 }
00329
00330 template<class T>
00331 void Bag::write(std::string const& topic, ros::Time const& time, boost::shared_ptr<T> const& msg, boost::shared_ptr<ros::M_string> connection_header) {
00332 doWrite(topic, time, *msg, connection_header);
00333 }
00334
00335 template<typename T>
00336 std::string Bag::toHeaderString(T const* field) const {
00337 return std::string((char*) field, sizeof(T));
00338 }
00339
00340 template<typename T>
00341 bool Bag::readField(ros::M_string const& fields, std::string const& field_name, bool required, T* data) const {
00342 ros::M_string::const_iterator i = checkField(fields, field_name, sizeof(T), sizeof(T), required);
00343 if (i == fields.end())
00344 return false;
00345 memcpy(data, i->second.data(), sizeof(T));
00346 return true;
00347 }
00348
00349 template<typename Stream>
00350 void Bag::readMessageDataIntoStream(IndexEntry const& index_entry, Stream& stream) const {
00351 ros::Header header;
00352 uint32_t data_size;
00353 uint32_t bytes_read;
00354 switch (version_)
00355 {
00356 case 200:
00357 {
00358 decompressChunk(index_entry.chunk_pos);
00359 readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
00360 if (data_size > 0)
00361 memcpy(stream.advance(data_size), current_buffer_->getData() + index_entry.offset + bytes_read, data_size);
00362 break;
00363 }
00364 case 102:
00365 {
00366 readMessageDataRecord102(index_entry.chunk_pos, header);
00367 data_size = record_buffer_.getSize();
00368 if (data_size > 0)
00369 memcpy(stream.advance(data_size), record_buffer_.getData(), data_size);
00370 break;
00371 }
00372 default:
00373 throw BagFormatException((boost::format("Unhandled version: %1%") % version_).str());
00374 }
00375 }
00376
00377 template<class T>
00378 boost::shared_ptr<T> Bag::instantiateBuffer(IndexEntry const& index_entry) const {
00379 switch (version_)
00380 {
00381 case 200:
00382 {
00383 decompressChunk(index_entry.chunk_pos);
00384
00385
00386 ros::Header header;
00387 uint32_t data_size;
00388 uint32_t bytes_read;
00389 readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
00390
00391
00392 uint32_t connection_id;
00393 readField(*header.getValues(), CONNECTION_FIELD_NAME, true, &connection_id);
00394
00395 std::map<uint32_t, ConnectionInfo*>::const_iterator connection_iter = connections_.find(connection_id);
00396 if (connection_iter == connections_.end())
00397 throw BagFormatException((boost::format("Unknown connection ID: %1%") % connection_id).str());
00398 ConnectionInfo* connection_info = connection_iter->second;
00399
00400 boost::shared_ptr<T> p = boost::shared_ptr<T>(new T());
00401
00402
00403 ros::assignSubscriptionConnectionHeader<T>(p.get(), connection_info->header);
00404
00405 ros::serialization::PreDeserializeParams<T> predes_params;
00406 predes_params.message = p;
00407 predes_params.connection_header = connection_info->header;
00408 ros::serialization::PreDeserialize<T>::notify(predes_params);
00409
00410
00411 ros::serialization::IStream s(current_buffer_->getData() + index_entry.offset + bytes_read, data_size);
00412 ros::serialization::deserialize(s, *p);
00413
00414 return p;
00415 }
00416 case 102:
00417 {
00418
00419 ros::Header header;
00420 readMessageDataRecord102(index_entry.chunk_pos, header);
00421
00422 ros::M_string& fields = *header.getValues();
00423
00424
00425 std::string topic, latching("0"), callerid;
00426 readField(fields, TOPIC_FIELD_NAME, true, topic);
00427 readField(fields, LATCHING_FIELD_NAME, false, latching);
00428 readField(fields, CALLERID_FIELD_NAME, false, callerid);
00429
00430 std::map<std::string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
00431 if (topic_conn_id_iter == topic_connection_ids_.end())
00432 throw BagFormatException((boost::format("Unknown topic: %1%") % topic).str());
00433 uint32_t connection_id = topic_conn_id_iter->second;
00434
00435 std::map<uint32_t, ConnectionInfo*>::const_iterator connection_iter = connections_.find(connection_id);
00436 if (connection_iter == connections_.end())
00437 throw BagFormatException((boost::format("Unknown connection ID: %1%") % connection_id).str());
00438 ConnectionInfo* connection_info = connection_iter->second;
00439
00440 boost::shared_ptr<T> p = boost::shared_ptr<T>(new T());
00441
00442
00443 boost::shared_ptr<ros::M_string> message_header(new ros::M_string);
00444 for (ros::M_string::const_iterator i = connection_info->header->begin(); i != connection_info->header->end(); i++)
00445 (*message_header)[i->first] = i->second;
00446 (*message_header)["latching"] = latching;
00447 (*message_header)["callerid"] = callerid;
00448
00449
00450 ros::assignSubscriptionConnectionHeader<T>(p.get(), message_header);
00451
00452 ros::serialization::PreDeserializeParams<T> predes_params;
00453 predes_params.message = p;
00454 predes_params.connection_header = message_header;
00455 ros::serialization::PreDeserialize<T>::notify(predes_params);
00456
00457
00458 ros::serialization::IStream s(record_buffer_.getData(), record_buffer_.getSize());
00459 ros::serialization::deserialize(s, *p);
00460
00461 return p;
00462 }
00463 default:
00464 throw BagFormatException((boost::format("Unhandled version: %1%") % version_).str());
00465 }
00466 }
00467
00468 template<class T>
00469 void Bag::doWrite(std::string const& topic, ros::Time const& time, T const& msg, boost::shared_ptr<ros::M_string> const& connection_header) {
00470
00471 if (time < ros::TIME_MIN)
00472 {
00473 throw BagException("Tried to insert a message with time less than ros::MIN_TIME");
00474 }
00475
00476
00477 bag_revision_++;
00478
00479
00480 ConnectionInfo* connection_info = NULL;
00481 uint32_t conn_id = 0;
00482 if (!connection_header) {
00483
00484
00485 std::map<std::string, uint32_t>::iterator topic_connection_ids_iter = topic_connection_ids_.find(topic);
00486 if (topic_connection_ids_iter == topic_connection_ids_.end()) {
00487 conn_id = connections_.size();
00488 topic_connection_ids_[topic] = conn_id;
00489 }
00490 else {
00491 conn_id = topic_connection_ids_iter->second;
00492 connection_info = connections_[conn_id];
00493 }
00494 }
00495 else {
00496
00497
00498
00499
00500
00501
00502 ros::M_string connection_header_copy(*connection_header);
00503 connection_header_copy["topic"] = topic;
00504
00505 std::map<ros::M_string, uint32_t>::iterator header_connection_ids_iter = header_connection_ids_.find(connection_header_copy);
00506 if (header_connection_ids_iter == header_connection_ids_.end()) {
00507 conn_id = connections_.size();
00508 header_connection_ids_[connection_header_copy] = conn_id;
00509 }
00510 else {
00511 conn_id = header_connection_ids_iter->second;
00512 connection_info = connections_[conn_id];
00513 }
00514 }
00515
00516 {
00517
00518 seek(0, std::ios::end);
00519 file_size_ = file_.getOffset();
00520
00521
00522 if (!chunk_open_)
00523 startWritingChunk(time);
00524
00525
00526 if (connection_info == NULL) {
00527 connection_info = new ConnectionInfo();
00528 connection_info->id = conn_id;
00529 connection_info->topic = topic;
00530 connection_info->datatype = std::string(ros::message_traits::datatype(msg));
00531 connection_info->md5sum = std::string(ros::message_traits::md5sum(msg));
00532 connection_info->msg_def = std::string(ros::message_traits::definition(msg));
00533 if (connection_header != NULL) {
00534 connection_info->header = connection_header;
00535 }
00536 else {
00537 connection_info->header = boost::shared_ptr<ros::M_string>(new ros::M_string);
00538 (*connection_info->header)["type"] = connection_info->datatype;
00539 (*connection_info->header)["md5sum"] = connection_info->md5sum;
00540 (*connection_info->header)["message_definition"] = connection_info->msg_def;
00541 }
00542 connections_[conn_id] = connection_info;
00543
00544 writeConnectionRecord(connection_info);
00545 appendConnectionRecordToBuffer(outgoing_chunk_buffer_, connection_info);
00546 }
00547
00548
00549 IndexEntry index_entry;
00550 index_entry.time = time;
00551 index_entry.chunk_pos = curr_chunk_info_.pos;
00552 index_entry.offset = getChunkOffset();
00553
00554 std::multiset<IndexEntry>& chunk_connection_index = curr_chunk_connection_indexes_[connection_info->id];
00555 chunk_connection_index.insert(chunk_connection_index.end(), index_entry);
00556 std::multiset<IndexEntry>& connection_index = connection_indexes_[connection_info->id];
00557 connection_index.insert(connection_index.end(), index_entry);
00558
00559
00560 curr_chunk_info_.connection_counts[connection_info->id]++;
00561
00562
00563 writeMessageDataRecord(conn_id, time, msg);
00564
00565
00566 uint32_t chunk_size = getChunkOffset();
00567 ROS_DEBUG(" curr_chunk_size=%d (threshold=%d)", chunk_size, chunk_threshold_);
00568 if (chunk_size > chunk_threshold_) {
00569
00570 stopWritingChunk();
00571 outgoing_chunk_buffer_.setSize(0);
00572
00573
00574 curr_chunk_info_.pos = -1;
00575 }
00576 }
00577 }
00578
00579 template<class T>
00580 void Bag::writeMessageDataRecord(uint32_t conn_id, ros::Time const& time, T const& msg) {
00581 ros::M_string header;
00582 header[OP_FIELD_NAME] = toHeaderString(&OP_MSG_DATA);
00583 header[CONNECTION_FIELD_NAME] = toHeaderString(&conn_id);
00584 header[TIME_FIELD_NAME] = toHeaderString(&time);
00585
00586
00587 uint32_t msg_ser_len = ros::serialization::serializationLength(msg);
00588
00589 record_buffer_.setSize(msg_ser_len);
00590
00591 ros::serialization::OStream s(record_buffer_.getData(), msg_ser_len);
00592
00593
00594 ros::serialization::serialize(s, msg);
00595
00596
00597
00598
00599 seek(0, std::ios::end);
00600 file_size_ = file_.getOffset();
00601
00602 ROS_DEBUG("Writing MSG_DATA [%llu:%d]: conn=%d sec=%d nsec=%d data_len=%d",
00603 (unsigned long long) file_.getOffset(), getChunkOffset(), conn_id, time.sec, time.nsec, msg_ser_len);
00604
00605 writeHeader(header);
00606 writeDataLength(msg_ser_len);
00607 write((char*) record_buffer_.getData(), msg_ser_len);
00608
00609
00610 appendHeaderToBuffer(outgoing_chunk_buffer_, header);
00611 appendDataLengthToBuffer(outgoing_chunk_buffer_, msg_ser_len);
00612
00613 uint32_t offset = outgoing_chunk_buffer_.getSize();
00614 outgoing_chunk_buffer_.setSize(outgoing_chunk_buffer_.getSize() + msg_ser_len);
00615 memcpy(outgoing_chunk_buffer_.getData() + offset, record_buffer_.getData(), msg_ser_len);
00616
00617
00618 if (time > curr_chunk_info_.end_time)
00619 curr_chunk_info_.end_time = time;
00620 else if (time < curr_chunk_info_.start_time)
00621 curr_chunk_info_.start_time = time;
00622 }
00623
00624 }
00625
00626 #endif