00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #ifndef ROSBAG_BAG_H
00036 #define ROSBAG_BAG_H
00037
00038 #include "rosbag/buffer.h"
00039 #include "rosbag/chunked_file.h"
00040 #include "rosbag/constants.h"
00041 #include "rosbag/exceptions.h"
00042 #include "rosbag/structures.h"
00043
00044 #include "ros/header.h"
00045 #include "ros/time.h"
00046 #include "ros/message.h"
00047 #include "ros/message_traits.h"
00048 #include "ros/subscription_callback_helper.h"
00049 #include "ros/ros.h"
00050
00051 #include <ios>
00052 #include <map>
00053 #include <queue>
00054 #include <set>
00055 #include <stdexcept>
00056
00057 #include <boost/format.hpp>
00058 #include <boost/iterator/iterator_facade.hpp>
00059
00060 namespace rosbag {
00061
00062 namespace bagmode
00063 {
00065 enum BagMode
00066 {
00067 Write = 1,
00068 Read = 2,
00069 Append = 4
00070 };
00071 }
00072 typedef bagmode::BagMode BagMode;
00073
00074 class MessageInstance;
00075 class View;
00076 class Query;
00077
00078 class Bag
00079 {
00080 friend class MessageInstance;
00081 friend class View;
00082
00083 public:
00084 Bag();
00085
00087
00093 Bag(std::string const& filename, uint32_t mode = bagmode::Read);
00094
00095 ~Bag();
00096
00098
00104 void open(std::string const& filename, uint32_t mode = bagmode::Read);
00105
00107 void close();
00108
00109 std::string getFileName() const;
00110 BagMode getMode() const;
00111 uint32_t getMajorVersion() const;
00112 uint32_t getMinorVersion() const;
00113 uint64_t getSize() const;
00114
00115 void setCompression(CompressionType compression);
00116 CompressionType getCompression() const;
00117 void setChunkThreshold(uint32_t chunk_threshold);
00118 uint32_t getChunkThreshold() const;
00119
00121
00127 template<class T>
00128 void write(std::string const& topic, ros::MessageEvent<T> const& event);
00129
00131
00139 template<class T>
00140 void write(std::string const& topic, ros::Time const& time, T const& msg,
00141 boost::shared_ptr<ros::M_string> connection_header = boost::shared_ptr<ros::M_string>());
00142
00144
00152 template<class T>
00153 void write(std::string const& topic, ros::Time const& time, boost::shared_ptr<T const> const& msg,
00154 boost::shared_ptr<ros::M_string> connection_header = boost::shared_ptr<ros::M_string>());
00155
00157
00165 template<class T>
00166 void write(std::string const& topic, ros::Time const& time, boost::shared_ptr<T> const& msg,
00167 boost::shared_ptr<ros::M_string> connection_header = boost::shared_ptr<ros::M_string>());
00168
00169 private:
00170
00171 template<class T>
00172 void doWrite(std::string const& topic, ros::Time const& time, T const& msg, boost::shared_ptr<ros::M_string> const& connection_header);
00173
00174 void openRead (std::string const& filename);
00175 void openWrite (std::string const& filename);
00176 void openAppend(std::string const& filename);
00177
00178 void closeWrite();
00179
00180 template<class T>
00181 boost::shared_ptr<T> instantiateBuffer(IndexEntry const& index_entry) const;
00182
00183 void startWriting();
00184 void stopWriting();
00185
00186 void startReadingVersion102();
00187 void startReadingVersion200();
00188
00189
00190
00191 void writeVersion();
00192 void writeFileHeaderRecord();
00193 void writeConnectionRecord(ConnectionInfo const* connection_info);
00194 void appendConnectionRecordToBuffer(Buffer& buf, ConnectionInfo const* connection_info);
00195 template<class T>
00196 void writeMessageDataRecord(uint32_t conn_id, ros::Time const& time, T const& msg);
00197 void writeIndexRecords();
00198 void writeConnectionRecords();
00199 void writeChunkInfoRecords();
00200 void startWritingChunk(ros::Time time);
00201 void writeChunkHeader(CompressionType compression, uint32_t compressed_size, uint32_t uncompressed_size);
00202 void stopWritingChunk();
00203
00204
00205
00206 void readVersion();
00207 void readFileHeaderRecord();
00208 void readConnectionRecord();
00209 void readChunkHeader(ChunkHeader& chunk_header) const;
00210 void readChunkInfoRecord();
00211 void readConnectionIndexRecord200();
00212
00213 void readTopicIndexRecord102();
00214 void readMessageDefinitionRecord102();
00215 void readMessageDataRecord102(uint64_t offset, ros::Header& header) const;
00216
00217 ros::Header readMessageDataHeader(IndexEntry const& index_entry);
00218 uint32_t readMessageDataSize(IndexEntry const& index_entry) const;
00219
00220 template<typename Stream>
00221 void readMessageDataIntoStream(IndexEntry const& index_entry, Stream& stream) const;
00222
00223 void decompressChunk(uint64_t chunk_pos) const;
00224 void decompressRawChunk(ChunkHeader const& chunk_header) const;
00225 void decompressBz2Chunk(ChunkHeader const& chunk_header) const;
00226 uint32_t getChunkOffset() const;
00227
00228
00229
00230 void writeHeader(ros::M_string const& fields);
00231 void writeDataLength(uint32_t data_len);
00232 void appendHeaderToBuffer(Buffer& buf, ros::M_string const& fields);
00233 void appendDataLengthToBuffer(Buffer& buf, uint32_t data_len);
00234
00235 void readHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read) const;
00236 void readMessageDataHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read) const;
00237 bool readHeader(ros::Header& header) const;
00238 bool readDataLength(uint32_t& data_size) const;
00239 bool isOp(ros::M_string& fields, uint8_t reqOp) const;
00240
00241
00242
00243 template<typename T>
00244 std::string toHeaderString(T const* field) const;
00245
00246 std::string toHeaderString(ros::Time const* field) const;
00247
00248 template<typename T>
00249 bool readField(ros::M_string const& fields, std::string const& field_name, bool required, T* data) const;
00250
00251 bool readField(ros::M_string const& fields, std::string const& field_name, unsigned int min_len, unsigned int max_len, bool required, std::string& data) const;
00252 bool readField(ros::M_string const& fields, std::string const& field_name, bool required, std::string& data) const;
00253
00254 bool readField(ros::M_string const& fields, std::string const& field_name, bool required, ros::Time& data) const;
00255
00256 ros::M_string::const_iterator checkField(ros::M_string const& fields, std::string const& field,
00257 unsigned int min_len, unsigned int max_len, bool required) const;
00258
00259
00260
00261 void write(char const* s, std::streamsize n);
00262 void write(std::string const& s);
00263 void read(char* b, std::streamsize n) const;
00264 void seek(uint64_t pos, int origin = std::ios_base::beg) const;
00265
00266 private:
00267 BagMode mode_;
00268 mutable ChunkedFile file_;
00269 int version_;
00270 CompressionType compression_;
00271 uint32_t chunk_threshold_;
00272 uint32_t bag_revision_;
00273
00274 uint64_t file_size_;
00275 uint64_t file_header_pos_;
00276 uint64_t index_data_pos_;
00277 uint32_t connection_count_;
00278 uint32_t chunk_count_;
00279
00280
00281 bool chunk_open_;
00282 ChunkInfo curr_chunk_info_;
00283 uint64_t curr_chunk_data_pos_;
00284
00285 std::map<std::string, uint32_t> topic_connection_ids_;
00286 std::map<ros::M_string*, uint32_t> header_connection_ids_;
00287 std::map<uint32_t, ConnectionInfo*> connections_;
00288
00289 std::vector<ChunkInfo> chunks_;
00290
00291 std::map<uint32_t, std::multiset<IndexEntry> > connection_indexes_;
00292 std::map<uint32_t, std::multiset<IndexEntry> > curr_chunk_connection_indexes_;
00293
00294 mutable Buffer header_buffer_;
00295 mutable Buffer record_buffer_;
00296
00297 mutable Buffer chunk_buffer_;
00298 mutable Buffer decompress_buffer_;
00299
00300 mutable Buffer outgoing_chunk_buffer_;
00301
00302 mutable Buffer* current_buffer_;
00303
00304 mutable uint64_t decompressed_chunk_;
00305 };
00306
00307 }
00308
00309 #include "rosbag/message_instance.h"
00310
00311 namespace rosbag {
00312
00313
00314
00315 template<class T>
00316 void Bag::write(std::string const& topic, ros::MessageEvent<T> const& event) {
00317 doWrite(topic, event.getReceiptTime(), *event.getMessage(), event.getConnectionHeaderPtr());
00318 }
00319
00320 template<class T>
00321 void Bag::write(std::string const& topic, ros::Time const& time, T const& msg, boost::shared_ptr<ros::M_string> connection_header) {
00322 doWrite(topic, time, msg, connection_header);
00323 }
00324
00325 template<class T>
00326 void Bag::write(std::string const& topic, ros::Time const& time, boost::shared_ptr<T const> const& msg, boost::shared_ptr<ros::M_string> connection_header) {
00327 doWrite(topic, time, *msg, connection_header);
00328 }
00329
00330 template<class T>
00331 void Bag::write(std::string const& topic, ros::Time const& time, boost::shared_ptr<T> const& msg, boost::shared_ptr<ros::M_string> connection_header) {
00332 doWrite(topic, time, *msg, connection_header);
00333 }
00334
00335 template<typename T>
00336 std::string Bag::toHeaderString(T const* field) const {
00337 return std::string((char*) field, sizeof(T));
00338 }
00339
00340 template<typename T>
00341 bool Bag::readField(ros::M_string const& fields, std::string const& field_name, bool required, T* data) const {
00342 ros::M_string::const_iterator i = checkField(fields, field_name, sizeof(T), sizeof(T), required);
00343 if (i == fields.end())
00344 return false;
00345 memcpy(data, i->second.data(), sizeof(T));
00346 return true;
00347 }
00348
00349 template<typename Stream>
00350 void Bag::readMessageDataIntoStream(IndexEntry const& index_entry, Stream& stream) const {
00351 ros::Header header;
00352 uint32_t data_size;
00353 uint32_t bytes_read;
00354 switch (version_)
00355 {
00356 case 200:
00357 {
00358 decompressChunk(index_entry.chunk_pos);
00359 readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
00360 if (data_size > 0)
00361 memcpy(stream.advance(data_size), current_buffer_->getData() + index_entry.offset + bytes_read, data_size);
00362 break;
00363 }
00364 case 102:
00365 {
00366 readMessageDataRecord102(index_entry.chunk_pos, header);
00367 data_size = record_buffer_.getSize();
00368 if (data_size > 0)
00369 memcpy(stream.advance(data_size), record_buffer_.getData(), data_size);
00370 break;
00371 }
00372 default:
00373 throw BagFormatException((boost::format("Unhandled version: %1%") % version_).str());
00374 }
00375 }
00376
00377 template<class T>
00378 boost::shared_ptr<T> Bag::instantiateBuffer(IndexEntry const& index_entry) const {
00379 switch (version_)
00380 {
00381 case 200:
00382 {
00383 decompressChunk(index_entry.chunk_pos);
00384
00385
00386 ros::Header header;
00387 uint32_t data_size;
00388 uint32_t bytes_read;
00389 readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
00390
00391
00392 uint32_t connection_id;
00393 readField(*header.getValues(), CONNECTION_FIELD_NAME, true, &connection_id);
00394
00395 std::map<uint32_t, ConnectionInfo*>::const_iterator connection_iter = connections_.find(connection_id);
00396 if (connection_iter == connections_.end())
00397 throw BagFormatException((boost::format("Unknown connection ID: %1%") % connection_id).str());
00398 ConnectionInfo* connection_info = connection_iter->second;
00399
00400 boost::shared_ptr<T> p = boost::shared_ptr<T>(new T());
00401
00402
00403 ros::assignSubscriptionConnectionHeader<T>(p.get(), connection_info->header);
00404
00405 ros::serialization::PreDeserializeParams<T> predes_params;
00406 predes_params.message = p;
00407 predes_params.connection_header = connection_info->header;
00408 ros::serialization::PreDeserialize<T>::notify(predes_params);
00409
00410
00411 ros::serialization::IStream s(current_buffer_->getData() + index_entry.offset + bytes_read, data_size);
00412 ros::serialization::deserialize(s, *p);
00413
00414 return p;
00415 }
00416 case 102:
00417 {
00418
00419 ros::Header header;
00420 readMessageDataRecord102(index_entry.chunk_pos, header);
00421
00422 ros::M_string& fields = *header.getValues();
00423
00424
00425 std::string topic, latching("0"), callerid;
00426 readField(fields, TOPIC_FIELD_NAME, true, topic);
00427 readField(fields, LATCHING_FIELD_NAME, false, latching);
00428 readField(fields, CALLERID_FIELD_NAME, false, callerid);
00429
00430 std::map<std::string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
00431 if (topic_conn_id_iter == topic_connection_ids_.end())
00432 throw BagFormatException((boost::format("Unknown topic: %1%") % topic).str());
00433 uint32_t connection_id = topic_conn_id_iter->second;
00434
00435 std::map<uint32_t, ConnectionInfo*>::const_iterator connection_iter = connections_.find(connection_id);
00436 if (connection_iter == connections_.end())
00437 throw BagFormatException((boost::format("Unknown connection ID: %1%") % connection_id).str());
00438 ConnectionInfo* connection_info = connection_iter->second;
00439
00440 boost::shared_ptr<T> p = boost::shared_ptr<T>(new T());
00441
00442
00443 boost::shared_ptr<ros::M_string> message_header(new ros::M_string);
00444 for (ros::M_string::const_iterator i = connection_info->header->begin(); i != connection_info->header->end(); i++)
00445 (*message_header)[i->first] = i->second;
00446 (*message_header)["latching"] = latching;
00447 (*message_header)["callerid"] = callerid;
00448
00449
00450 ros::assignSubscriptionConnectionHeader<T>(p.get(), message_header);
00451
00452 ros::serialization::PreDeserializeParams<T> predes_params;
00453 predes_params.message = p;
00454 predes_params.connection_header = message_header;
00455 ros::serialization::PreDeserialize<T>::notify(predes_params);
00456
00457
00458 ros::serialization::IStream s(record_buffer_.getData(), record_buffer_.getSize());
00459 ros::serialization::deserialize(s, *p);
00460
00461 return p;
00462 }
00463 default:
00464 throw BagFormatException((boost::format("Unhandled version: %1%") % version_).str());
00465 }
00466 }
00467
00468 template<class T>
00469 void Bag::doWrite(std::string const& topic, ros::Time const& time, T const& msg, boost::shared_ptr<ros::M_string> const& connection_header) {
00470
00471 if (time < ros::TIME_MIN)
00472 {
00473 throw BagException("Tried to insert a message with time less than ros::MIN_TIME");
00474 }
00475
00476
00477 bag_revision_++;
00478
00479
00480 ConnectionInfo* connection_info = NULL;
00481 uint32_t conn_id = 0;
00482 ros::M_string* header_address = connection_header.get();
00483 if (header_address == NULL) {
00484
00485
00486 std::map<std::string, uint32_t>::iterator topic_connection_ids_iter = topic_connection_ids_.find(topic);
00487 if (topic_connection_ids_iter == topic_connection_ids_.end()) {
00488 conn_id = connections_.size();
00489 topic_connection_ids_[topic] = conn_id;
00490 }
00491 else {
00492 conn_id = topic_connection_ids_iter->second;
00493 connection_info = connections_[conn_id];
00494 }
00495 }
00496 else {
00497
00498
00499 std::map<ros::M_string*, uint32_t>::iterator header_connection_ids_iter = header_connection_ids_.find(header_address);
00500 if (header_connection_ids_iter == header_connection_ids_.end()) {
00501 conn_id = connections_.size();
00502 header_connection_ids_[header_address] = conn_id;
00503 }
00504 else {
00505 conn_id = header_connection_ids_iter->second;
00506 connection_info = connections_[conn_id];
00507 }
00508 }
00509
00510 {
00511
00512 seek(0, std::ios::end);
00513 file_size_ = file_.getOffset();
00514
00515
00516 if (!chunk_open_)
00517 startWritingChunk(time);
00518
00519
00520 if (connection_info == NULL) {
00521 connection_info = new ConnectionInfo();
00522 connection_info->id = conn_id;
00523 connection_info->topic = topic;
00524 connection_info->datatype = std::string(ros::message_traits::datatype(msg));
00525 connection_info->md5sum = std::string(ros::message_traits::md5sum(msg));
00526 connection_info->msg_def = std::string(ros::message_traits::definition(msg));
00527 if (connection_header != NULL) {
00528 connection_info->header = connection_header;
00529 }
00530 else {
00531 connection_info->header = boost::shared_ptr<ros::M_string>(new ros::M_string);
00532 (*connection_info->header)["type"] = connection_info->datatype;
00533 (*connection_info->header)["md5sum"] = connection_info->md5sum;
00534 (*connection_info->header)["message_definition"] = connection_info->msg_def;
00535 }
00536 connections_[conn_id] = connection_info;
00537
00538 writeConnectionRecord(connection_info);
00539 appendConnectionRecordToBuffer(outgoing_chunk_buffer_, connection_info);
00540 }
00541
00542
00543 IndexEntry index_entry;
00544 index_entry.time = time;
00545 index_entry.chunk_pos = curr_chunk_info_.pos;
00546 index_entry.offset = getChunkOffset();
00547
00548 std::multiset<IndexEntry>& chunk_connection_index = curr_chunk_connection_indexes_[connection_info->id];
00549 chunk_connection_index.insert(chunk_connection_index.end(), index_entry);
00550 std::multiset<IndexEntry>& connection_index = connection_indexes_[connection_info->id];
00551 connection_index.insert(connection_index.end(), index_entry);
00552
00553
00554 curr_chunk_info_.connection_counts[connection_info->id]++;
00555
00556
00557 writeMessageDataRecord(conn_id, time, msg);
00558
00559
00560 uint32_t chunk_size = getChunkOffset();
00561 ROS_DEBUG(" curr_chunk_size=%d (threshold=%d)", chunk_size, chunk_threshold_);
00562 if (chunk_size > chunk_threshold_) {
00563
00564 stopWritingChunk();
00565 outgoing_chunk_buffer_.setSize(0);
00566
00567
00568 curr_chunk_info_.pos = -1;
00569 }
00570 }
00571 }
00572
00573 template<class T>
00574 void Bag::writeMessageDataRecord(uint32_t conn_id, ros::Time const& time, T const& msg) {
00575 ros::M_string header;
00576 header[OP_FIELD_NAME] = toHeaderString(&OP_MSG_DATA);
00577 header[CONNECTION_FIELD_NAME] = toHeaderString(&conn_id);
00578 header[TIME_FIELD_NAME] = toHeaderString(&time);
00579
00580
00581 uint32_t msg_ser_len = ros::serialization::serializationLength(msg);
00582
00583 record_buffer_.setSize(msg_ser_len);
00584
00585 ros::serialization::OStream s(record_buffer_.getData(), msg_ser_len);
00586
00587
00588 ros::serialization::serialize(s, msg);
00589
00590
00591
00592
00593 seek(0, std::ios::end);
00594 file_size_ = file_.getOffset();
00595
00596 ROS_DEBUG("Writing MSG_DATA [%llu:%d]: conn=%d sec=%d nsec=%d data_len=%d",
00597 (unsigned long long) file_.getOffset(), getChunkOffset(), conn_id, time.sec, time.nsec, msg_ser_len);
00598
00599 writeHeader(header);
00600 writeDataLength(msg_ser_len);
00601 write((char*) record_buffer_.getData(), msg_ser_len);
00602
00603
00604 appendHeaderToBuffer(outgoing_chunk_buffer_, header);
00605 appendDataLengthToBuffer(outgoing_chunk_buffer_, msg_ser_len);
00606
00607 uint32_t offset = outgoing_chunk_buffer_.getSize();
00608 outgoing_chunk_buffer_.setSize(outgoing_chunk_buffer_.getSize() + msg_ser_len);
00609 memcpy(outgoing_chunk_buffer_.getData() + offset, record_buffer_.getData(), msg_ser_len);
00610
00611
00612 if (time > curr_chunk_info_.end_time)
00613 curr_chunk_info_.end_time = time;
00614 else if (time < curr_chunk_info_.start_time)
00615 curr_chunk_info_.start_time = time;
00616 }
00617
00618 }
00619
00620 #endif