Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #include "cartographer_ros/playable_bag.h"
00018
00019 #include "absl/memory/memory.h"
00020 #include "cartographer_ros/node_constants.h"
00021 #include "glog/logging.h"
00022 #include "tf2_msgs/TFMessage.h"
00023
00024 namespace cartographer_ros {
00025
00026 PlayableBag::PlayableBag(
00027 const std::string& bag_filename, const int bag_id,
00028 const ros::Time start_time, const ros::Time end_time,
00029 const ros::Duration buffer_delay,
00030 FilteringEarlyMessageHandler filtering_early_message_handler)
00031 : bag_(absl::make_unique<rosbag::Bag>(bag_filename, rosbag::bagmode::Read)),
00032 view_(absl::make_unique<rosbag::View>(*bag_, start_time, end_time)),
00033 view_iterator_(view_->begin()),
00034 finished_(false),
00035 bag_id_(bag_id),
00036 bag_filename_(bag_filename),
00037 duration_in_seconds_(
00038 (view_->getEndTime() - view_->getBeginTime()).toSec()),
00039 message_counter_(0),
00040 buffer_delay_(buffer_delay),
00041 filtering_early_message_handler_(
00042 std::move(filtering_early_message_handler)) {
00043 AdvanceUntilMessageAvailable();
00044 for (const auto* connection_info : view_->getConnections()) {
00045 topics_.insert(connection_info->topic);
00046 }
00047 }
00048
00049 ros::Time PlayableBag::PeekMessageTime() const {
00050 CHECK(IsMessageAvailable());
00051 return buffered_messages_.front().getTime();
00052 }
00053
00054 std::tuple<ros::Time, ros::Time> PlayableBag::GetBeginEndTime() const {
00055 return std::make_tuple(view_->getBeginTime(), view_->getEndTime());
00056 }
00057
00058 rosbag::MessageInstance PlayableBag::GetNextMessage(
00059 cartographer_ros_msgs::BagfileProgress* progress) {
00060 CHECK(IsMessageAvailable());
00061 const rosbag::MessageInstance msg = buffered_messages_.front();
00062 buffered_messages_.pop_front();
00063 AdvanceUntilMessageAvailable();
00064 double processed_seconds = (msg.getTime() - view_->getBeginTime()).toSec();
00065 if ((message_counter_ % 10000) == 0) {
00066 LOG(INFO) << "Processed " << processed_seconds << " of "
00067 << duration_in_seconds_ << " seconds of bag " << bag_filename_;
00068 }
00069
00070 if (progress) {
00071 progress->current_bagfile_name = bag_filename_;
00072 progress->current_bagfile_id = bag_id_;
00073 progress->total_messages = view_->size();
00074 progress->processed_messages = message_counter_;
00075 progress->total_seconds = duration_in_seconds_;
00076 progress->processed_seconds = processed_seconds;
00077 }
00078
00079 return msg;
00080 }
00081
00082 bool PlayableBag::IsMessageAvailable() const {
00083 return !buffered_messages_.empty() &&
00084 (buffered_messages_.front().getTime() <
00085 buffered_messages_.back().getTime() - buffer_delay_);
00086 }
00087
00088 int PlayableBag::bag_id() const { return bag_id_; }
00089
00090 void PlayableBag::AdvanceOneMessage() {
00091 CHECK(!finished_);
00092 if (view_iterator_ == view_->end()) {
00093 finished_ = true;
00094 return;
00095 }
00096 rosbag::MessageInstance& msg = *view_iterator_;
00097 if (!filtering_early_message_handler_ ||
00098 filtering_early_message_handler_(msg)) {
00099 buffered_messages_.push_back(msg);
00100 }
00101 ++view_iterator_;
00102 ++message_counter_;
00103 }
00104
00105 void PlayableBag::AdvanceUntilMessageAvailable() {
00106 if (finished_) {
00107 return;
00108 }
00109 do {
00110 AdvanceOneMessage();
00111 } while (!finished_ && !IsMessageAvailable());
00112 }
00113
00114 PlayableBagMultiplexer::PlayableBagMultiplexer() : pnh_("~") {
00115 bag_progress_pub_ = pnh_.advertise<cartographer_ros_msgs::BagfileProgress>(
00116 "bagfile_progress", 10);
00117 progress_pub_interval_ = pnh_.param("bagfile_progress_pub_interval", 10.0);
00118 }
00119
00120 void PlayableBagMultiplexer::AddPlayableBag(PlayableBag playable_bag) {
00121 for (const auto& topic : playable_bag.topics()) {
00122 topics_.insert(topic);
00123 }
00124 playable_bags_.push_back(std::move(playable_bag));
00125 CHECK(playable_bags_.back().IsMessageAvailable());
00126 next_message_queue_.emplace(
00127 BagMessageItem{playable_bags_.back().PeekMessageTime(),
00128 static_cast<int>(playable_bags_.size() - 1)});
00129 bag_progress_time_map_[playable_bag.bag_id()] = ros::Time::now();
00130 }
00131
00132 bool PlayableBagMultiplexer::IsMessageAvailable() const {
00133 return !next_message_queue_.empty();
00134 }
00135
00136 std::tuple<rosbag::MessageInstance, int, bool>
00137 PlayableBagMultiplexer::GetNextMessage() {
00138 CHECK(IsMessageAvailable());
00139 const int current_bag_index = next_message_queue_.top().bag_index;
00140 PlayableBag& current_bag = playable_bags_.at(current_bag_index);
00141 cartographer_ros_msgs::BagfileProgress progress;
00142 rosbag::MessageInstance msg = current_bag.GetNextMessage(&progress);
00143 const bool publish_progress =
00144 current_bag.finished() ||
00145 ros::Time::now() - bag_progress_time_map_[current_bag.bag_id()] >=
00146 ros::Duration(progress_pub_interval_);
00147 if (bag_progress_pub_.getNumSubscribers() > 0 && publish_progress) {
00148 progress.total_bagfiles = playable_bags_.size();
00149 if (current_bag.finished()) {
00150 progress.processed_seconds = current_bag.duration_in_seconds();
00151 }
00152 bag_progress_pub_.publish(progress);
00153 bag_progress_time_map_[current_bag.bag_id()] = ros::Time::now();
00154 }
00155 CHECK_EQ(msg.getTime(), next_message_queue_.top().message_timestamp);
00156 next_message_queue_.pop();
00157 if (current_bag.IsMessageAvailable()) {
00158 next_message_queue_.emplace(
00159 BagMessageItem{current_bag.PeekMessageTime(), current_bag_index});
00160 }
00161 return std::make_tuple(std::move(msg), current_bag.bag_id(),
00162 !current_bag.IsMessageAvailable());
00163 }
00164
00165 ros::Time PlayableBagMultiplexer::PeekMessageTime() const {
00166 CHECK(IsMessageAvailable());
00167 return next_message_queue_.top().message_timestamp;
00168 }
00169
00170 }