00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #ifndef ROSBAG_BAG_H
00036 #define ROSBAG_BAG_H
00037
00038 #include "rosbag/macros.h"
00039
00040 #include "rosbag/buffer.h"
00041 #include "rosbag/chunked_file.h"
00042 #include "rosbag/constants.h"
00043 #include "rosbag/exceptions.h"
00044 #include "rosbag/structures.h"
00045
00046 #include "ros/header.h"
00047 #include "ros/time.h"
00048 #include "ros/message_traits.h"
00049 #include "ros/message_event.h"
00050 #include "ros/serialization.h"
00051
00052
00053
00054 #include <ios>
00055 #include <map>
00056 #include <queue>
00057 #include <set>
00058 #include <stdexcept>
00059
00060 #include <boost/format.hpp>
00061 #include <boost/iterator/iterator_facade.hpp>
00062
00063 #include "console_bridge/console.h"
00064
00065 namespace rosbag {
00066
00067 namespace bagmode
00068 {
00070 enum BagMode
00071 {
00072 Write = 1,
00073 Read = 2,
00074 Append = 4
00075 };
00076 }
00077 typedef bagmode::BagMode BagMode;
00078
00079 class MessageInstance;
00080 class View;
00081 class Query;
00082
00083 class ROSBAG_DECL Bag
00084 {
00085 friend class MessageInstance;
00086 friend class View;
00087
00088 public:
00089 Bag();
00090
00092
00098 Bag(std::string const& filename, uint32_t mode = bagmode::Read);
00099
00100 ~Bag();
00101
00103
00109 void open(std::string const& filename, uint32_t mode = bagmode::Read);
00110
00112 void close();
00113
00114 std::string getFileName() const;
00115 BagMode getMode() const;
00116 uint32_t getMajorVersion() const;
00117 uint32_t getMinorVersion() const;
00118 uint64_t getSize() const;
00119
00120 void setCompression(CompressionType compression);
00121 CompressionType getCompression() const;
00122 void setChunkThreshold(uint32_t chunk_threshold);
00123 uint32_t getChunkThreshold() const;
00124
00126
00132 template<class T>
00133 void write(std::string const& topic, ros::MessageEvent<T> const& event);
00134
00136
00144 template<class T>
00145 void write(std::string const& topic, ros::Time const& time, T const& msg,
00146 boost::shared_ptr<ros::M_string> connection_header = boost::shared_ptr<ros::M_string>());
00147
00149
00157 template<class T>
00158 void write(std::string const& topic, ros::Time const& time, boost::shared_ptr<T const> const& msg,
00159 boost::shared_ptr<ros::M_string> connection_header = boost::shared_ptr<ros::M_string>());
00160
00162
00170 template<class T>
00171 void write(std::string const& topic, ros::Time const& time, boost::shared_ptr<T> const& msg,
00172 boost::shared_ptr<ros::M_string> connection_header = boost::shared_ptr<ros::M_string>());
00173
00174 private:
00175
00176 template<class T>
00177 void doWrite(std::string const& topic, ros::Time const& time, T const& msg, boost::shared_ptr<ros::M_string> const& connection_header);
00178
00179 void openRead (std::string const& filename);
00180 void openWrite (std::string const& filename);
00181 void openAppend(std::string const& filename);
00182
00183 void closeWrite();
00184
00185 template<class T>
00186 boost::shared_ptr<T> instantiateBuffer(IndexEntry const& index_entry) const;
00187
00188 void startWriting();
00189 void stopWriting();
00190
00191 void startReadingVersion102();
00192 void startReadingVersion200();
00193
00194
00195
00196 void writeVersion();
00197 void writeFileHeaderRecord();
00198 void writeConnectionRecord(ConnectionInfo const* connection_info);
00199 void appendConnectionRecordToBuffer(Buffer& buf, ConnectionInfo const* connection_info);
00200 template<class T>
00201 void writeMessageDataRecord(uint32_t conn_id, ros::Time const& time, T const& msg);
00202 void writeIndexRecords();
00203 void writeConnectionRecords();
00204 void writeChunkInfoRecords();
00205 void startWritingChunk(ros::Time time);
00206 void writeChunkHeader(CompressionType compression, uint32_t compressed_size, uint32_t uncompressed_size);
00207 void stopWritingChunk();
00208
00209
00210
00211 void readVersion();
00212 void readFileHeaderRecord();
00213 void readConnectionRecord();
00214 void readChunkHeader(ChunkHeader& chunk_header) const;
00215 void readChunkInfoRecord();
00216 void readConnectionIndexRecord200();
00217
00218 void readTopicIndexRecord102();
00219 void readMessageDefinitionRecord102();
00220 void readMessageDataRecord102(uint64_t offset, ros::Header& header) const;
00221
00222 ros::Header readMessageDataHeader(IndexEntry const& index_entry);
00223 uint32_t readMessageDataSize(IndexEntry const& index_entry) const;
00224
00225 template<typename Stream>
00226 void readMessageDataIntoStream(IndexEntry const& index_entry, Stream& stream) const;
00227
00228 void decompressChunk(uint64_t chunk_pos) const;
00229 void decompressRawChunk(ChunkHeader const& chunk_header) const;
00230 void decompressBz2Chunk(ChunkHeader const& chunk_header) const;
00231 void decompressLz4Chunk(ChunkHeader const& chunk_header) const;
00232 uint32_t getChunkOffset() const;
00233
00234
00235
00236 void writeHeader(ros::M_string const& fields);
00237 void writeDataLength(uint32_t data_len);
00238 void appendHeaderToBuffer(Buffer& buf, ros::M_string const& fields);
00239 void appendDataLengthToBuffer(Buffer& buf, uint32_t data_len);
00240
00241 void readHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read) const;
00242 void readMessageDataHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read) const;
00243 bool readHeader(ros::Header& header) const;
00244 bool readDataLength(uint32_t& data_size) const;
00245 bool isOp(ros::M_string& fields, uint8_t reqOp) const;
00246
00247
00248
00249 template<typename T>
00250 std::string toHeaderString(T const* field) const;
00251
00252 std::string toHeaderString(ros::Time const* field) const;
00253
00254 template<typename T>
00255 bool readField(ros::M_string const& fields, std::string const& field_name, bool required, T* data) const;
00256
00257 bool readField(ros::M_string const& fields, std::string const& field_name, unsigned int min_len, unsigned int max_len, bool required, std::string& data) const;
00258 bool readField(ros::M_string const& fields, std::string const& field_name, bool required, std::string& data) const;
00259
00260 bool readField(ros::M_string const& fields, std::string const& field_name, bool required, ros::Time& data) const;
00261
00262 ros::M_string::const_iterator checkField(ros::M_string const& fields, std::string const& field,
00263 unsigned int min_len, unsigned int max_len, bool required) const;
00264
00265
00266
00267 void write(char const* s, std::streamsize n);
00268 void write(std::string const& s);
00269 void read(char* b, std::streamsize n) const;
00270 void seek(uint64_t pos, int origin = std::ios_base::beg) const;
00271
00272 private:
00273 BagMode mode_;
00274 mutable ChunkedFile file_;
00275 int version_;
00276 CompressionType compression_;
00277 uint32_t chunk_threshold_;
00278 uint32_t bag_revision_;
00279
00280 uint64_t file_size_;
00281 uint64_t file_header_pos_;
00282 uint64_t index_data_pos_;
00283 uint32_t connection_count_;
00284 uint32_t chunk_count_;
00285
00286
00287 bool chunk_open_;
00288 ChunkInfo curr_chunk_info_;
00289 uint64_t curr_chunk_data_pos_;
00290
00291 std::map<std::string, uint32_t> topic_connection_ids_;
00292 std::map<ros::M_string, uint32_t> header_connection_ids_;
00293 std::map<uint32_t, ConnectionInfo*> connections_;
00294
00295 std::vector<ChunkInfo> chunks_;
00296
00297 std::map<uint32_t, std::multiset<IndexEntry> > connection_indexes_;
00298 std::map<uint32_t, std::multiset<IndexEntry> > curr_chunk_connection_indexes_;
00299
00300 mutable Buffer header_buffer_;
00301 mutable Buffer record_buffer_;
00302
00303 mutable Buffer chunk_buffer_;
00304 mutable Buffer decompress_buffer_;
00305
00306 mutable Buffer outgoing_chunk_buffer_;
00307
00308 mutable Buffer* current_buffer_;
00309
00310 mutable uint64_t decompressed_chunk_;
00311 };
00312
00313 }
00314
00315 #include "rosbag/message_instance.h"
00316
00317 namespace rosbag {
00318
00319
00320
00321 template<class T>
00322 void Bag::write(std::string const& topic, ros::MessageEvent<T> const& event) {
00323 doWrite(topic, event.getReceiptTime(), *event.getMessage(), event.getConnectionHeaderPtr());
00324 }
00325
00326 template<class T>
00327 void Bag::write(std::string const& topic, ros::Time const& time, T const& msg, boost::shared_ptr<ros::M_string> connection_header) {
00328 doWrite(topic, time, msg, connection_header);
00329 }
00330
00331 template<class T>
00332 void Bag::write(std::string const& topic, ros::Time const& time, boost::shared_ptr<T const> const& msg, boost::shared_ptr<ros::M_string> connection_header) {
00333 doWrite(topic, time, *msg, connection_header);
00334 }
00335
00336 template<class T>
00337 void Bag::write(std::string const& topic, ros::Time const& time, boost::shared_ptr<T> const& msg, boost::shared_ptr<ros::M_string> connection_header) {
00338 doWrite(topic, time, *msg, connection_header);
00339 }
00340
00341 template<typename T>
00342 std::string Bag::toHeaderString(T const* field) const {
00343 return std::string((char*) field, sizeof(T));
00344 }
00345
00346 template<typename T>
00347 bool Bag::readField(ros::M_string const& fields, std::string const& field_name, bool required, T* data) const {
00348 ros::M_string::const_iterator i = checkField(fields, field_name, sizeof(T), sizeof(T), required);
00349 if (i == fields.end())
00350 return false;
00351 memcpy(data, i->second.data(), sizeof(T));
00352 return true;
00353 }
00354
00355 template<typename Stream>
00356 void Bag::readMessageDataIntoStream(IndexEntry const& index_entry, Stream& stream) const {
00357 ros::Header header;
00358 uint32_t data_size;
00359 uint32_t bytes_read;
00360 switch (version_)
00361 {
00362 case 200:
00363 {
00364 decompressChunk(index_entry.chunk_pos);
00365 readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
00366 if (data_size > 0)
00367 memcpy(stream.advance(data_size), current_buffer_->getData() + index_entry.offset + bytes_read, data_size);
00368 break;
00369 }
00370 case 102:
00371 {
00372 readMessageDataRecord102(index_entry.chunk_pos, header);
00373 data_size = record_buffer_.getSize();
00374 if (data_size > 0)
00375 memcpy(stream.advance(data_size), record_buffer_.getData(), data_size);
00376 break;
00377 }
00378 default:
00379 throw BagFormatException((boost::format("Unhandled version: %1%") % version_).str());
00380 }
00381 }
00382
00383 template<class T>
00384 boost::shared_ptr<T> Bag::instantiateBuffer(IndexEntry const& index_entry) const {
00385 switch (version_)
00386 {
00387 case 200:
00388 {
00389 decompressChunk(index_entry.chunk_pos);
00390
00391
00392 ros::Header header;
00393 uint32_t data_size;
00394 uint32_t bytes_read;
00395 readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
00396
00397
00398 uint32_t connection_id;
00399 readField(*header.getValues(), CONNECTION_FIELD_NAME, true, &connection_id);
00400
00401 std::map<uint32_t, ConnectionInfo*>::const_iterator connection_iter = connections_.find(connection_id);
00402 if (connection_iter == connections_.end())
00403 throw BagFormatException((boost::format("Unknown connection ID: %1%") % connection_id).str());
00404 ConnectionInfo* connection_info = connection_iter->second;
00405
00406 boost::shared_ptr<T> p = boost::make_shared<T>();
00407
00408 ros::serialization::PreDeserializeParams<T> predes_params;
00409 predes_params.message = p;
00410 predes_params.connection_header = connection_info->header;
00411 ros::serialization::PreDeserialize<T>::notify(predes_params);
00412
00413
00414 ros::serialization::IStream s(current_buffer_->getData() + index_entry.offset + bytes_read, data_size);
00415 ros::serialization::deserialize(s, *p);
00416
00417 return p;
00418 }
00419 case 102:
00420 {
00421
00422 ros::Header header;
00423 readMessageDataRecord102(index_entry.chunk_pos, header);
00424
00425 ros::M_string& fields = *header.getValues();
00426
00427
00428 std::string topic, latching("0"), callerid;
00429 readField(fields, TOPIC_FIELD_NAME, true, topic);
00430 readField(fields, LATCHING_FIELD_NAME, false, latching);
00431 readField(fields, CALLERID_FIELD_NAME, false, callerid);
00432
00433 std::map<std::string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
00434 if (topic_conn_id_iter == topic_connection_ids_.end())
00435 throw BagFormatException((boost::format("Unknown topic: %1%") % topic).str());
00436 uint32_t connection_id = topic_conn_id_iter->second;
00437
00438 std::map<uint32_t, ConnectionInfo*>::const_iterator connection_iter = connections_.find(connection_id);
00439 if (connection_iter == connections_.end())
00440 throw BagFormatException((boost::format("Unknown connection ID: %1%") % connection_id).str());
00441 ConnectionInfo* connection_info = connection_iter->second;
00442
00443 boost::shared_ptr<T> p = boost::make_shared<T>();
00444
00445
00446 boost::shared_ptr<ros::M_string> message_header(boost::make_shared<ros::M_string>());
00447 for (ros::M_string::const_iterator i = connection_info->header->begin(); i != connection_info->header->end(); i++)
00448 (*message_header)[i->first] = i->second;
00449 (*message_header)["latching"] = latching;
00450 (*message_header)["callerid"] = callerid;
00451
00452 ros::serialization::PreDeserializeParams<T> predes_params;
00453 predes_params.message = p;
00454 predes_params.connection_header = message_header;
00455 ros::serialization::PreDeserialize<T>::notify(predes_params);
00456
00457
00458 ros::serialization::IStream s(record_buffer_.getData(), record_buffer_.getSize());
00459 ros::serialization::deserialize(s, *p);
00460
00461 return p;
00462 }
00463 default:
00464 throw BagFormatException((boost::format("Unhandled version: %1%") % version_).str());
00465 }
00466 }
00467
00468 template<class T>
00469 void Bag::doWrite(std::string const& topic, ros::Time const& time, T const& msg, boost::shared_ptr<ros::M_string> const& connection_header) {
00470
00471 if (time < ros::TIME_MIN)
00472 {
00473 throw BagException("Tried to insert a message with time less than ros::TIME_MIN");
00474 }
00475
00476
00477 bag_revision_++;
00478
00479
00480 ConnectionInfo* connection_info = NULL;
00481 uint32_t conn_id = 0;
00482 if (!connection_header) {
00483
00484
00485 std::map<std::string, uint32_t>::iterator topic_connection_ids_iter = topic_connection_ids_.find(topic);
00486 if (topic_connection_ids_iter == topic_connection_ids_.end()) {
00487 conn_id = connections_.size();
00488 topic_connection_ids_[topic] = conn_id;
00489 }
00490 else {
00491 conn_id = topic_connection_ids_iter->second;
00492 connection_info = connections_[conn_id];
00493 }
00494 }
00495 else {
00496
00497
00498
00499
00500
00501
00502 ros::M_string connection_header_copy(*connection_header);
00503 connection_header_copy["topic"] = topic;
00504
00505 std::map<ros::M_string, uint32_t>::iterator header_connection_ids_iter = header_connection_ids_.find(connection_header_copy);
00506 if (header_connection_ids_iter == header_connection_ids_.end()) {
00507 conn_id = connections_.size();
00508 header_connection_ids_[connection_header_copy] = conn_id;
00509 }
00510 else {
00511 conn_id = header_connection_ids_iter->second;
00512 connection_info = connections_[conn_id];
00513 }
00514 }
00515
00516 {
00517
00518 seek(0, std::ios::end);
00519 file_size_ = file_.getOffset();
00520
00521
00522 if (!chunk_open_)
00523 startWritingChunk(time);
00524
00525
00526 if (connection_info == NULL) {
00527 connection_info = new ConnectionInfo();
00528 connection_info->id = conn_id;
00529 connection_info->topic = topic;
00530 connection_info->datatype = std::string(ros::message_traits::datatype(msg));
00531 connection_info->md5sum = std::string(ros::message_traits::md5sum(msg));
00532 connection_info->msg_def = std::string(ros::message_traits::definition(msg));
00533 if (connection_header != NULL) {
00534 connection_info->header = connection_header;
00535 }
00536 else {
00537 connection_info->header = boost::make_shared<ros::M_string>();
00538 (*connection_info->header)["type"] = connection_info->datatype;
00539 (*connection_info->header)["md5sum"] = connection_info->md5sum;
00540 (*connection_info->header)["message_definition"] = connection_info->msg_def;
00541 }
00542 connections_[conn_id] = connection_info;
00543
00544 writeConnectionRecord(connection_info);
00545 appendConnectionRecordToBuffer(outgoing_chunk_buffer_, connection_info);
00546 }
00547
00548
00549 IndexEntry index_entry;
00550 index_entry.time = time;
00551 index_entry.chunk_pos = curr_chunk_info_.pos;
00552 index_entry.offset = getChunkOffset();
00553
00554 std::multiset<IndexEntry>& chunk_connection_index = curr_chunk_connection_indexes_[connection_info->id];
00555 chunk_connection_index.insert(chunk_connection_index.end(), index_entry);
00556 std::multiset<IndexEntry>& connection_index = connection_indexes_[connection_info->id];
00557 connection_index.insert(connection_index.end(), index_entry);
00558
00559
00560 curr_chunk_info_.connection_counts[connection_info->id]++;
00561
00562
00563 writeMessageDataRecord(conn_id, time, msg);
00564
00565
00566 uint32_t chunk_size = getChunkOffset();
00567 logDebug(" curr_chunk_size=%d (threshold=%d)", chunk_size, chunk_threshold_);
00568 if (chunk_size > chunk_threshold_) {
00569
00570 stopWritingChunk();
00571 outgoing_chunk_buffer_.setSize(0);
00572
00573
00574 curr_chunk_info_.pos = -1;
00575 }
00576 }
00577 }
00578
00579 template<class T>
00580 void Bag::writeMessageDataRecord(uint32_t conn_id, ros::Time const& time, T const& msg) {
00581 ros::M_string header;
00582 header[OP_FIELD_NAME] = toHeaderString(&OP_MSG_DATA);
00583 header[CONNECTION_FIELD_NAME] = toHeaderString(&conn_id);
00584 header[TIME_FIELD_NAME] = toHeaderString(&time);
00585
00586
00587 uint32_t msg_ser_len = ros::serialization::serializationLength(msg);
00588
00589 record_buffer_.setSize(msg_ser_len);
00590
00591 ros::serialization::OStream s(record_buffer_.getData(), msg_ser_len);
00592
00593
00594 ros::serialization::serialize(s, msg);
00595
00596
00597
00598
00599 seek(0, std::ios::end);
00600 file_size_ = file_.getOffset();
00601
00602 logDebug("Writing MSG_DATA [%llu:%d]: conn=%d sec=%d nsec=%d data_len=%d",
00603 (unsigned long long) file_.getOffset(), getChunkOffset(), conn_id, time.sec, time.nsec, msg_ser_len);
00604
00605 writeHeader(header);
00606 writeDataLength(msg_ser_len);
00607 write((char*) record_buffer_.getData(), msg_ser_len);
00608
00609
00610 appendHeaderToBuffer(outgoing_chunk_buffer_, header);
00611 appendDataLengthToBuffer(outgoing_chunk_buffer_, msg_ser_len);
00612
00613 uint32_t offset = outgoing_chunk_buffer_.getSize();
00614 outgoing_chunk_buffer_.setSize(outgoing_chunk_buffer_.getSize() + msg_ser_len);
00615 memcpy(outgoing_chunk_buffer_.getData() + offset, record_buffer_.getData(), msg_ser_len);
00616
00617
00618 if (time > curr_chunk_info_.end_time)
00619 curr_chunk_info_.end_time = time;
00620 else if (time < curr_chunk_info_.start_time)
00621 curr_chunk_info_.start_time = time;
00622 }
00623
00624 }
00625
00626 #endif