00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #ifndef ROSBAG_BAG_H
00036 #define ROSBAG_BAG_H
00037
00038 #include "rosbag/macros.h"
00039
00040 #include "rosbag/buffer.h"
00041 #include "rosbag/chunked_file.h"
00042 #include "rosbag/constants.h"
00043 #include "rosbag/exceptions.h"
00044 #include "rosbag/structures.h"
00045
00046 #include "ros/header.h"
00047 #include "ros/time.h"
00048 #include "ros/message_traits.h"
00049 #include "ros/message_event.h"
00050 #include "ros/serialization.h"
00051
00052
00053
00054 #include <ios>
00055 #include <map>
00056 #include <queue>
00057 #include <set>
00058 #include <stdexcept>
00059
00060 #include <boost/format.hpp>
00061 #include <boost/iterator/iterator_facade.hpp>
00062
00063 #include "console_bridge/console.h"
00064
00065 namespace rosbag {
00066
00067 namespace bagmode
00068 {
00070 enum BagMode
00071 {
00072 Write = 1,
00073 Read = 2,
00074 Append = 4
00075 };
00076 }
00077 typedef bagmode::BagMode BagMode;
00078
00079 class MessageInstance;
00080 class View;
00081 class Query;
00082
00083 class ROSBAG_DECL Bag
00084 {
00085 friend class MessageInstance;
00086 friend class View;
00087
00088 public:
00089 Bag();
00090
00092
00098 Bag(std::string const& filename, uint32_t mode = bagmode::Read);
00099
00100 ~Bag();
00101
00103
00109 void open(std::string const& filename, uint32_t mode = bagmode::Read);
00110
00112 void close();
00113
00114 std::string getFileName() const;
00115 BagMode getMode() const;
00116 uint32_t getMajorVersion() const;
00117 uint32_t getMinorVersion() const;
00118 uint64_t getSize() const;
00119
00120 void setCompression(CompressionType compression);
00121 CompressionType getCompression() const;
00122 void setChunkThreshold(uint32_t chunk_threshold);
00123 uint32_t getChunkThreshold() const;
00124
00126
00132 template<class T>
00133 void write(std::string const& topic, ros::MessageEvent<T> const& event);
00134
00136
00144 template<class T>
00145 void write(std::string const& topic, ros::Time const& time, T const& msg,
00146 boost::shared_ptr<ros::M_string> connection_header = boost::shared_ptr<ros::M_string>());
00147
00149
00157 template<class T>
00158 void write(std::string const& topic, ros::Time const& time, boost::shared_ptr<T const> const& msg,
00159 boost::shared_ptr<ros::M_string> connection_header = boost::shared_ptr<ros::M_string>());
00160
00162
00170 template<class T>
00171 void write(std::string const& topic, ros::Time const& time, boost::shared_ptr<T> const& msg,
00172 boost::shared_ptr<ros::M_string> connection_header = boost::shared_ptr<ros::M_string>());
00173
00174 private:
00175
00176 template<class T>
00177 void doWrite(std::string const& topic, ros::Time const& time, T const& msg, boost::shared_ptr<ros::M_string> const& connection_header);
00178
00179 void openRead (std::string const& filename);
00180 void openWrite (std::string const& filename);
00181 void openAppend(std::string const& filename);
00182
00183 void closeWrite();
00184
00185 template<class T>
00186 boost::shared_ptr<T> instantiateBuffer(IndexEntry const& index_entry) const;
00187
00188 void startWriting();
00189 void stopWriting();
00190
00191 void startReadingVersion102();
00192 void startReadingVersion200();
00193
00194
00195
00196 void writeVersion();
00197 void writeFileHeaderRecord();
00198 void writeConnectionRecord(ConnectionInfo const* connection_info);
00199 void appendConnectionRecordToBuffer(Buffer& buf, ConnectionInfo const* connection_info);
00200 template<class T>
00201 void writeMessageDataRecord(uint32_t conn_id, ros::Time const& time, T const& msg);
00202 void writeIndexRecords();
00203 void writeConnectionRecords();
00204 void writeChunkInfoRecords();
00205 void startWritingChunk(ros::Time time);
00206 void writeChunkHeader(CompressionType compression, uint32_t compressed_size, uint32_t uncompressed_size);
00207 void stopWritingChunk();
00208
00209
00210
00211 void readVersion();
00212 void readFileHeaderRecord();
00213 void readConnectionRecord();
00214 void readChunkHeader(ChunkHeader& chunk_header) const;
00215 void readChunkInfoRecord();
00216 void readConnectionIndexRecord200();
00217
00218 void readTopicIndexRecord102();
00219 void readMessageDefinitionRecord102();
00220 void readMessageDataRecord102(uint64_t offset, ros::Header& header) const;
00221
00222 ros::Header readMessageDataHeader(IndexEntry const& index_entry);
00223 uint32_t readMessageDataSize(IndexEntry const& index_entry) const;
00224
00225 template<typename Stream>
00226 void readMessageDataIntoStream(IndexEntry const& index_entry, Stream& stream) const;
00227
00228 void decompressChunk(uint64_t chunk_pos) const;
00229 void decompressRawChunk(ChunkHeader const& chunk_header) const;
00230 void decompressBz2Chunk(ChunkHeader const& chunk_header) const;
00231 uint32_t getChunkOffset() const;
00232
00233
00234
00235 void writeHeader(ros::M_string const& fields);
00236 void writeDataLength(uint32_t data_len);
00237 void appendHeaderToBuffer(Buffer& buf, ros::M_string const& fields);
00238 void appendDataLengthToBuffer(Buffer& buf, uint32_t data_len);
00239
00240 void readHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read) const;
00241 void readMessageDataHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read) const;
00242 bool readHeader(ros::Header& header) const;
00243 bool readDataLength(uint32_t& data_size) const;
00244 bool isOp(ros::M_string& fields, uint8_t reqOp) const;
00245
00246
00247
00248 template<typename T>
00249 std::string toHeaderString(T const* field) const;
00250
00251 std::string toHeaderString(ros::Time const* field) const;
00252
00253 template<typename T>
00254 bool readField(ros::M_string const& fields, std::string const& field_name, bool required, T* data) const;
00255
00256 bool readField(ros::M_string const& fields, std::string const& field_name, unsigned int min_len, unsigned int max_len, bool required, std::string& data) const;
00257 bool readField(ros::M_string const& fields, std::string const& field_name, bool required, std::string& data) const;
00258
00259 bool readField(ros::M_string const& fields, std::string const& field_name, bool required, ros::Time& data) const;
00260
00261 ros::M_string::const_iterator checkField(ros::M_string const& fields, std::string const& field,
00262 unsigned int min_len, unsigned int max_len, bool required) const;
00263
00264
00265
00266 void write(char const* s, std::streamsize n);
00267 void write(std::string const& s);
00268 void read(char* b, std::streamsize n) const;
00269 void seek(uint64_t pos, int origin = std::ios_base::beg) const;
00270
00271 private:
00272 BagMode mode_;
00273 mutable ChunkedFile file_;
00274 int version_;
00275 CompressionType compression_;
00276 uint32_t chunk_threshold_;
00277 uint32_t bag_revision_;
00278
00279 uint64_t file_size_;
00280 uint64_t file_header_pos_;
00281 uint64_t index_data_pos_;
00282 uint32_t connection_count_;
00283 uint32_t chunk_count_;
00284
00285
00286 bool chunk_open_;
00287 ChunkInfo curr_chunk_info_;
00288 uint64_t curr_chunk_data_pos_;
00289
00290 std::map<std::string, uint32_t> topic_connection_ids_;
00291 std::map<ros::M_string, uint32_t> header_connection_ids_;
00292 std::map<uint32_t, ConnectionInfo*> connections_;
00293
00294 std::vector<ChunkInfo> chunks_;
00295
00296 std::map<uint32_t, std::multiset<IndexEntry> > connection_indexes_;
00297 std::map<uint32_t, std::multiset<IndexEntry> > curr_chunk_connection_indexes_;
00298
00299 mutable Buffer header_buffer_;
00300 mutable Buffer record_buffer_;
00301
00302 mutable Buffer chunk_buffer_;
00303 mutable Buffer decompress_buffer_;
00304
00305 mutable Buffer outgoing_chunk_buffer_;
00306
00307 mutable Buffer* current_buffer_;
00308
00309 mutable uint64_t decompressed_chunk_;
00310 };
00311
00312 }
00313
00314 #include "rosbag/message_instance.h"
00315
00316 namespace rosbag {
00317
00318
00319
00320 template<class T>
00321 void Bag::write(std::string const& topic, ros::MessageEvent<T> const& event) {
00322 doWrite(topic, event.getReceiptTime(), *event.getMessage(), event.getConnectionHeaderPtr());
00323 }
00324
00325 template<class T>
00326 void Bag::write(std::string const& topic, ros::Time const& time, T const& msg, boost::shared_ptr<ros::M_string> connection_header) {
00327 doWrite(topic, time, msg, connection_header);
00328 }
00329
00330 template<class T>
00331 void Bag::write(std::string const& topic, ros::Time const& time, boost::shared_ptr<T const> const& msg, boost::shared_ptr<ros::M_string> connection_header) {
00332 doWrite(topic, time, *msg, connection_header);
00333 }
00334
00335 template<class T>
00336 void Bag::write(std::string const& topic, ros::Time const& time, boost::shared_ptr<T> const& msg, boost::shared_ptr<ros::M_string> connection_header) {
00337 doWrite(topic, time, *msg, connection_header);
00338 }
00339
00340 template<typename T>
00341 std::string Bag::toHeaderString(T const* field) const {
00342 return std::string((char*) field, sizeof(T));
00343 }
00344
00345 template<typename T>
00346 bool Bag::readField(ros::M_string const& fields, std::string const& field_name, bool required, T* data) const {
00347 ros::M_string::const_iterator i = checkField(fields, field_name, sizeof(T), sizeof(T), required);
00348 if (i == fields.end())
00349 return false;
00350 memcpy(data, i->second.data(), sizeof(T));
00351 return true;
00352 }
00353
00354 template<typename Stream>
00355 void Bag::readMessageDataIntoStream(IndexEntry const& index_entry, Stream& stream) const {
00356 ros::Header header;
00357 uint32_t data_size;
00358 uint32_t bytes_read;
00359 switch (version_)
00360 {
00361 case 200:
00362 {
00363 decompressChunk(index_entry.chunk_pos);
00364 readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
00365 if (data_size > 0)
00366 memcpy(stream.advance(data_size), current_buffer_->getData() + index_entry.offset + bytes_read, data_size);
00367 break;
00368 }
00369 case 102:
00370 {
00371 readMessageDataRecord102(index_entry.chunk_pos, header);
00372 data_size = record_buffer_.getSize();
00373 if (data_size > 0)
00374 memcpy(stream.advance(data_size), record_buffer_.getData(), data_size);
00375 break;
00376 }
00377 default:
00378 throw BagFormatException((boost::format("Unhandled version: %1%") % version_).str());
00379 }
00380 }
00381
00382 template<class T>
00383 boost::shared_ptr<T> Bag::instantiateBuffer(IndexEntry const& index_entry) const {
00384 switch (version_)
00385 {
00386 case 200:
00387 {
00388 decompressChunk(index_entry.chunk_pos);
00389
00390
00391 ros::Header header;
00392 uint32_t data_size;
00393 uint32_t bytes_read;
00394 readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
00395
00396
00397 uint32_t connection_id;
00398 readField(*header.getValues(), CONNECTION_FIELD_NAME, true, &connection_id);
00399
00400 std::map<uint32_t, ConnectionInfo*>::const_iterator connection_iter = connections_.find(connection_id);
00401 if (connection_iter == connections_.end())
00402 throw BagFormatException((boost::format("Unknown connection ID: %1%") % connection_id).str());
00403 ConnectionInfo* connection_info = connection_iter->second;
00404
00405 boost::shared_ptr<T> p = boost::shared_ptr<T>(new T());
00406
00407
00408 ros::assignSubscriptionConnectionHeader<T>(p.get(), connection_info->header);
00409
00410 ros::serialization::PreDeserializeParams<T> predes_params;
00411 predes_params.message = p;
00412 predes_params.connection_header = connection_info->header;
00413 ros::serialization::PreDeserialize<T>::notify(predes_params);
00414
00415
00416 ros::serialization::IStream s(current_buffer_->getData() + index_entry.offset + bytes_read, data_size);
00417 ros::serialization::deserialize(s, *p);
00418
00419 return p;
00420 }
00421 case 102:
00422 {
00423
00424 ros::Header header;
00425 readMessageDataRecord102(index_entry.chunk_pos, header);
00426
00427 ros::M_string& fields = *header.getValues();
00428
00429
00430 std::string topic, latching("0"), callerid;
00431 readField(fields, TOPIC_FIELD_NAME, true, topic);
00432 readField(fields, LATCHING_FIELD_NAME, false, latching);
00433 readField(fields, CALLERID_FIELD_NAME, false, callerid);
00434
00435 std::map<std::string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
00436 if (topic_conn_id_iter == topic_connection_ids_.end())
00437 throw BagFormatException((boost::format("Unknown topic: %1%") % topic).str());
00438 uint32_t connection_id = topic_conn_id_iter->second;
00439
00440 std::map<uint32_t, ConnectionInfo*>::const_iterator connection_iter = connections_.find(connection_id);
00441 if (connection_iter == connections_.end())
00442 throw BagFormatException((boost::format("Unknown connection ID: %1%") % connection_id).str());
00443 ConnectionInfo* connection_info = connection_iter->second;
00444
00445 boost::shared_ptr<T> p = boost::shared_ptr<T>(new T());
00446
00447
00448 boost::shared_ptr<ros::M_string> message_header(new ros::M_string);
00449 for (ros::M_string::const_iterator i = connection_info->header->begin(); i != connection_info->header->end(); i++)
00450 (*message_header)[i->first] = i->second;
00451 (*message_header)["latching"] = latching;
00452 (*message_header)["callerid"] = callerid;
00453
00454
00455 ros::assignSubscriptionConnectionHeader<T>(p.get(), message_header);
00456
00457 ros::serialization::PreDeserializeParams<T> predes_params;
00458 predes_params.message = p;
00459 predes_params.connection_header = message_header;
00460 ros::serialization::PreDeserialize<T>::notify(predes_params);
00461
00462
00463 ros::serialization::IStream s(record_buffer_.getData(), record_buffer_.getSize());
00464 ros::serialization::deserialize(s, *p);
00465
00466 return p;
00467 }
00468 default:
00469 throw BagFormatException((boost::format("Unhandled version: %1%") % version_).str());
00470 }
00471 }
00472
00473 template<class T>
00474 void Bag::doWrite(std::string const& topic, ros::Time const& time, T const& msg, boost::shared_ptr<ros::M_string> const& connection_header) {
00475
00476 if (time < ros::TIME_MIN)
00477 {
00478 throw BagException("Tried to insert a message with time less than ros::MIN_TIME");
00479 }
00480
00481
00482 bag_revision_++;
00483
00484
00485 ConnectionInfo* connection_info = NULL;
00486 uint32_t conn_id = 0;
00487 if (!connection_header) {
00488
00489
00490 std::map<std::string, uint32_t>::iterator topic_connection_ids_iter = topic_connection_ids_.find(topic);
00491 if (topic_connection_ids_iter == topic_connection_ids_.end()) {
00492 conn_id = connections_.size();
00493 topic_connection_ids_[topic] = conn_id;
00494 }
00495 else {
00496 conn_id = topic_connection_ids_iter->second;
00497 connection_info = connections_[conn_id];
00498 }
00499 }
00500 else {
00501
00502
00503
00504
00505
00506
00507 ros::M_string connection_header_copy(*connection_header);
00508 connection_header_copy["topic"] = topic;
00509
00510 std::map<ros::M_string, uint32_t>::iterator header_connection_ids_iter = header_connection_ids_.find(connection_header_copy);
00511 if (header_connection_ids_iter == header_connection_ids_.end()) {
00512 conn_id = connections_.size();
00513 header_connection_ids_[connection_header_copy] = conn_id;
00514 }
00515 else {
00516 conn_id = header_connection_ids_iter->second;
00517 connection_info = connections_[conn_id];
00518 }
00519 }
00520
00521 {
00522
00523 seek(0, std::ios::end);
00524 file_size_ = file_.getOffset();
00525
00526
00527 if (!chunk_open_)
00528 startWritingChunk(time);
00529
00530
00531 if (connection_info == NULL) {
00532 connection_info = new ConnectionInfo();
00533 connection_info->id = conn_id;
00534 connection_info->topic = topic;
00535 connection_info->datatype = std::string(ros::message_traits::datatype(msg));
00536 connection_info->md5sum = std::string(ros::message_traits::md5sum(msg));
00537 connection_info->msg_def = std::string(ros::message_traits::definition(msg));
00538 if (connection_header != NULL) {
00539 connection_info->header = connection_header;
00540 }
00541 else {
00542 connection_info->header = boost::shared_ptr<ros::M_string>(new ros::M_string);
00543 (*connection_info->header)["type"] = connection_info->datatype;
00544 (*connection_info->header)["md5sum"] = connection_info->md5sum;
00545 (*connection_info->header)["message_definition"] = connection_info->msg_def;
00546 }
00547 connections_[conn_id] = connection_info;
00548
00549 writeConnectionRecord(connection_info);
00550 appendConnectionRecordToBuffer(outgoing_chunk_buffer_, connection_info);
00551 }
00552
00553
00554 IndexEntry index_entry;
00555 index_entry.time = time;
00556 index_entry.chunk_pos = curr_chunk_info_.pos;
00557 index_entry.offset = getChunkOffset();
00558
00559 std::multiset<IndexEntry>& chunk_connection_index = curr_chunk_connection_indexes_[connection_info->id];
00560 chunk_connection_index.insert(chunk_connection_index.end(), index_entry);
00561 std::multiset<IndexEntry>& connection_index = connection_indexes_[connection_info->id];
00562 connection_index.insert(connection_index.end(), index_entry);
00563
00564
00565 curr_chunk_info_.connection_counts[connection_info->id]++;
00566
00567
00568 writeMessageDataRecord(conn_id, time, msg);
00569
00570
00571 uint32_t chunk_size = getChunkOffset();
00572 logDebug(" curr_chunk_size=%d (threshold=%d)", chunk_size, chunk_threshold_);
00573 if (chunk_size > chunk_threshold_) {
00574
00575 stopWritingChunk();
00576 outgoing_chunk_buffer_.setSize(0);
00577
00578
00579 curr_chunk_info_.pos = -1;
00580 }
00581 }
00582 }
00583
00584 template<class T>
00585 void Bag::writeMessageDataRecord(uint32_t conn_id, ros::Time const& time, T const& msg) {
00586 ros::M_string header;
00587 header[OP_FIELD_NAME] = toHeaderString(&OP_MSG_DATA);
00588 header[CONNECTION_FIELD_NAME] = toHeaderString(&conn_id);
00589 header[TIME_FIELD_NAME] = toHeaderString(&time);
00590
00591
00592 uint32_t msg_ser_len = ros::serialization::serializationLength(msg);
00593
00594 record_buffer_.setSize(msg_ser_len);
00595
00596 ros::serialization::OStream s(record_buffer_.getData(), msg_ser_len);
00597
00598
00599 ros::serialization::serialize(s, msg);
00600
00601
00602
00603
00604 seek(0, std::ios::end);
00605 file_size_ = file_.getOffset();
00606
00607 logDebug("Writing MSG_DATA [%llu:%d]: conn=%d sec=%d nsec=%d data_len=%d",
00608 (unsigned long long) file_.getOffset(), getChunkOffset(), conn_id, time.sec, time.nsec, msg_ser_len);
00609
00610 writeHeader(header);
00611 writeDataLength(msg_ser_len);
00612 write((char*) record_buffer_.getData(), msg_ser_len);
00613
00614
00615 appendHeaderToBuffer(outgoing_chunk_buffer_, header);
00616 appendDataLengthToBuffer(outgoing_chunk_buffer_, msg_ser_len);
00617
00618 uint32_t offset = outgoing_chunk_buffer_.getSize();
00619 outgoing_chunk_buffer_.setSize(outgoing_chunk_buffer_.getSize() + msg_ser_len);
00620 memcpy(outgoing_chunk_buffer_.getData() + offset, record_buffer_.getData(), msg_ser_len);
00621
00622
00623 if (time > curr_chunk_info_.end_time)
00624 curr_chunk_info_.end_time = time;
00625 else if (time < curr_chunk_info_.start_time)
00626 curr_chunk_info_.start_time = time;
00627 }
00628
00629 }
00630
00631 #endif