playable_bag.cc
Go to the documentation of this file.
00001 /*
00002  * Copyright 2018 The Cartographer Authors
00003  *
00004  * Licensed under the Apache License, Version 2.0 (the "License");
00005  * you may not use this file except in compliance with the License.
00006  * You may obtain a copy of the License at
00007  *
00008  *      http://www.apache.org/licenses/LICENSE-2.0
00009  *
00010  * Unless required by applicable law or agreed to in writing, software
00011  * distributed under the License is distributed on an "AS IS" BASIS,
00012  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  * See the License for the specific language governing permissions and
00014  * limitations under the License.
00015  */
00016 
00017 #include "cartographer_ros/playable_bag.h"
00018 
00019 #include "absl/memory/memory.h"
00020 #include "cartographer_ros/node_constants.h"
00021 #include "glog/logging.h"
00022 #include "tf2_msgs/TFMessage.h"
00023 
00024 namespace cartographer_ros {
00025 
00026 PlayableBag::PlayableBag(
00027     const std::string& bag_filename, const int bag_id,
00028     const ros::Time start_time, const ros::Time end_time,
00029     const ros::Duration buffer_delay,
00030     FilteringEarlyMessageHandler filtering_early_message_handler)
00031     : bag_(absl::make_unique<rosbag::Bag>(bag_filename, rosbag::bagmode::Read)),
00032       view_(absl::make_unique<rosbag::View>(*bag_, start_time, end_time)),
00033       view_iterator_(view_->begin()),
00034       finished_(false),
00035       bag_id_(bag_id),
00036       bag_filename_(bag_filename),
00037       duration_in_seconds_(
00038           (view_->getEndTime() - view_->getBeginTime()).toSec()),
00039       message_counter_(0),
00040       buffer_delay_(buffer_delay),
00041       filtering_early_message_handler_(
00042           std::move(filtering_early_message_handler)) {
00043   AdvanceUntilMessageAvailable();
00044   for (const auto* connection_info : view_->getConnections()) {
00045     topics_.insert(connection_info->topic);
00046   }
00047 }
00048 
00049 ros::Time PlayableBag::PeekMessageTime() const {
00050   CHECK(IsMessageAvailable());
00051   return buffered_messages_.front().getTime();
00052 }
00053 
00054 std::tuple<ros::Time, ros::Time> PlayableBag::GetBeginEndTime() const {
00055   return std::make_tuple(view_->getBeginTime(), view_->getEndTime());
00056 }
00057 
00058 rosbag::MessageInstance PlayableBag::GetNextMessage(
00059     cartographer_ros_msgs::BagfileProgress* progress) {
00060   CHECK(IsMessageAvailable());
00061   const rosbag::MessageInstance msg = buffered_messages_.front();
00062   buffered_messages_.pop_front();
00063   AdvanceUntilMessageAvailable();
00064   double processed_seconds = (msg.getTime() - view_->getBeginTime()).toSec();
00065   if ((message_counter_ % 10000) == 0) {
00066     LOG(INFO) << "Processed " << processed_seconds << " of "
00067               << duration_in_seconds_ << " seconds of bag " << bag_filename_;
00068   }
00069 
00070   if (progress) {
00071     progress->current_bagfile_name = bag_filename_;
00072     progress->current_bagfile_id = bag_id_;
00073     progress->total_messages = view_->size();
00074     progress->processed_messages = message_counter_;
00075     progress->total_seconds = duration_in_seconds_;
00076     progress->processed_seconds = processed_seconds;
00077   }
00078 
00079   return msg;
00080 }
00081 
00082 bool PlayableBag::IsMessageAvailable() const {
00083   return !buffered_messages_.empty() &&
00084          (buffered_messages_.front().getTime() <
00085           buffered_messages_.back().getTime() - buffer_delay_);
00086 }
00087 
00088 int PlayableBag::bag_id() const { return bag_id_; }
00089 
00090 void PlayableBag::AdvanceOneMessage() {
00091   CHECK(!finished_);
00092   if (view_iterator_ == view_->end()) {
00093     finished_ = true;
00094     return;
00095   }
00096   rosbag::MessageInstance& msg = *view_iterator_;
00097   if (!filtering_early_message_handler_ ||
00098       filtering_early_message_handler_(msg)) {
00099     buffered_messages_.push_back(msg);
00100   }
00101   ++view_iterator_;
00102   ++message_counter_;
00103 }
00104 
00105 void PlayableBag::AdvanceUntilMessageAvailable() {
00106   if (finished_) {
00107     return;
00108   }
00109   do {
00110     AdvanceOneMessage();
00111   } while (!finished_ && !IsMessageAvailable());
00112 }
00113 
00114 PlayableBagMultiplexer::PlayableBagMultiplexer() : pnh_("~") {
00115   bag_progress_pub_ = pnh_.advertise<cartographer_ros_msgs::BagfileProgress>(
00116       "bagfile_progress", 10);
00117   progress_pub_interval_ = pnh_.param("bagfile_progress_pub_interval", 10.0);
00118 }
00119 
00120 void PlayableBagMultiplexer::AddPlayableBag(PlayableBag playable_bag) {
00121   for (const auto& topic : playable_bag.topics()) {
00122     topics_.insert(topic);
00123   }
00124   playable_bags_.push_back(std::move(playable_bag));
00125   CHECK(playable_bags_.back().IsMessageAvailable());
00126   next_message_queue_.emplace(
00127       BagMessageItem{playable_bags_.back().PeekMessageTime(),
00128                      static_cast<int>(playable_bags_.size() - 1)});
00129   bag_progress_time_map_[playable_bag.bag_id()] = ros::Time::now();
00130 }
00131 
00132 bool PlayableBagMultiplexer::IsMessageAvailable() const {
00133   return !next_message_queue_.empty();
00134 }
00135 
00136 std::tuple<rosbag::MessageInstance, int, bool>
00137 PlayableBagMultiplexer::GetNextMessage() {
00138   CHECK(IsMessageAvailable());
00139   const int current_bag_index = next_message_queue_.top().bag_index;
00140   PlayableBag& current_bag = playable_bags_.at(current_bag_index);
00141   cartographer_ros_msgs::BagfileProgress progress;
00142   rosbag::MessageInstance msg = current_bag.GetNextMessage(&progress);
00143   const bool publish_progress =
00144       current_bag.finished() ||
00145       ros::Time::now() - bag_progress_time_map_[current_bag.bag_id()] >=
00146           ros::Duration(progress_pub_interval_);
00147   if (bag_progress_pub_.getNumSubscribers() > 0 && publish_progress) {
00148     progress.total_bagfiles = playable_bags_.size();
00149     if (current_bag.finished()) {
00150       progress.processed_seconds = current_bag.duration_in_seconds();
00151     }
00152     bag_progress_pub_.publish(progress);
00153     bag_progress_time_map_[current_bag.bag_id()] = ros::Time::now();
00154   }
00155   CHECK_EQ(msg.getTime(), next_message_queue_.top().message_timestamp);
00156   next_message_queue_.pop();
00157   if (current_bag.IsMessageAvailable()) {
00158     next_message_queue_.emplace(
00159         BagMessageItem{current_bag.PeekMessageTime(), current_bag_index});
00160   }
00161   return std::make_tuple(std::move(msg), current_bag.bag_id(),
00162                          !current_bag.IsMessageAvailable());
00163 }
00164 
00165 ros::Time PlayableBagMultiplexer::PeekMessageTime() const {
00166   CHECK(IsMessageAvailable());
00167   return next_message_queue_.top().message_timestamp;
00168 }
00169 
00170 }  // namespace cartographer_ros


cartographer_ros
Author(s): The Cartographer Authors
autogenerated on Wed Jul 10 2019 04:10:28