offline_node.cc
Go to the documentation of this file.
00001 /*
00002  * Copyright 2018 The Cartographer Authors
00003  *
00004  * Licensed under the Apache License, Version 2.0 (the "License");
00005  * you may not use this file except in compliance with the License.
00006  * You may obtain a copy of the License at
00007  *
00008  *      http://www.apache.org/licenses/LICENSE-2.0
00009  *
00010  * Unless required by applicable law or agreed to in writing, software
00011  * distributed under the License is distributed on an "AS IS" BASIS,
00012  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  * See the License for the specific language governing permissions and
00014  * limitations under the License.
00015  */
00016 
00017 #include "cartographer_ros/offline_node.h"
00018 
00019 #include <errno.h>
00020 #include <string.h>
00021 #ifndef WIN32
00022 #include <sys/resource.h>
00023 #endif
00024 #include <time.h>
00025 
00026 #include <chrono>
00027 
00028 #include "absl/strings/str_split.h"
00029 #include "cartographer_ros/node.h"
00030 #include "cartographer_ros/playable_bag.h"
00031 #include "cartographer_ros/urdf_reader.h"
00032 #include "gflags/gflags.h"
00033 #include "ros/callback_queue.h"
00034 #include "rosgraph_msgs/Clock.h"
00035 #include "tf2_ros/static_transform_broadcaster.h"
00036 #include "urdf/model.h"
00037 
00038 DEFINE_bool(collect_metrics, false,
00039             "Activates the collection of runtime metrics. If activated, the "
00040             "metrics can be accessed via a ROS service.");
00041 DEFINE_string(configuration_directory, "",
00042               "First directory in which configuration files are searched, "
00043               "second is always the Cartographer installation to allow "
00044               "including files from there.");
00045 DEFINE_string(
00046     configuration_basenames, "",
00047     "Comma-separated list of basenames, i.e. not containing any "
00048     "directory prefix, of the configuration files for each trajectory. "
00049     "The first configuration file will be used for node options. "
00050     "If less configuration files are specified than trajectories, the "
00051     "first file will be used the remaining trajectories.");
00052 DEFINE_string(
00053     bag_filenames, "",
00054     "Comma-separated list of bags to process. One bag per trajectory. "
00055     "Any combination of simultaneous and sequential bags is supported.");
00056 DEFINE_string(urdf_filenames, "",
00057               "Comma-separated list of one or more URDF files that contain "
00058               "static links for the sensor configuration(s).");
00059 DEFINE_bool(use_bag_transforms, true,
00060             "Whether to read, use and republish transforms from bags.");
00061 DEFINE_string(load_state_filename, "",
00062               "If non-empty, filename of a .pbstream file to load, containing "
00063               "a saved SLAM state.");
00064 DEFINE_bool(load_frozen_state, true,
00065             "Load the saved state as frozen (non-optimized) trajectories.");
00066 DEFINE_string(save_state_filename, "",
00067               "Explicit name of the file to which the serialized state will be "
00068               "written before shutdown. If left empty, the filename will be "
00069               "inferred from the first bagfile's name as: "
00070               "<bag_filenames[0]>.pbstream");
00071 DEFINE_bool(keep_running, false,
00072             "Keep running the offline node after all messages from the bag "
00073             "have been processed.");
00074 DEFINE_double(skip_seconds, 0,
00075               "Optional amount of seconds to skip from the beginning "
00076               "(i.e. when the earliest bag starts.). ");
00077 
00078 namespace cartographer_ros {
00079 
00080 constexpr char kClockTopic[] = "clock";
00081 constexpr char kTfStaticTopic[] = "/tf_static";
00082 constexpr char kTfTopic[] = "tf";
00083 constexpr double kClockPublishFrequencySec = 1. / 30.;
00084 constexpr int kSingleThreaded = 1;
00085 // We publish tf messages one second earlier than other messages. Under
00086 // the assumption of higher frequency tf this should ensure that tf can
00087 // always interpolate.
00088 const ::ros::Duration kDelay = ::ros::Duration(1.0);
00089 
00090 void RunOfflineNode(const MapBuilderFactory& map_builder_factory) {
00091   CHECK(!FLAGS_configuration_directory.empty())
00092       << "-configuration_directory is missing.";
00093   CHECK(!FLAGS_configuration_basenames.empty())
00094       << "-configuration_basenames is missing.";
00095   CHECK(!(FLAGS_bag_filenames.empty() && FLAGS_load_state_filename.empty()))
00096       << "-bag_filenames and -load_state_filename cannot both be unspecified.";
00097   const std::vector<std::string> bag_filenames =
00098       absl::StrSplit(FLAGS_bag_filenames, ',', absl::SkipEmpty());
00099   cartographer_ros::NodeOptions node_options;
00100   const std::vector<std::string> configuration_basenames =
00101       absl::StrSplit(FLAGS_configuration_basenames, ',', absl::SkipEmpty());
00102   std::vector<TrajectoryOptions> bag_trajectory_options(1);
00103   std::tie(node_options, bag_trajectory_options.at(0)) =
00104       LoadOptions(FLAGS_configuration_directory, configuration_basenames.at(0));
00105 
00106   for (size_t bag_index = 1; bag_index < bag_filenames.size(); ++bag_index) {
00107     TrajectoryOptions current_trajectory_options;
00108     if (bag_index < configuration_basenames.size()) {
00109       std::tie(std::ignore, current_trajectory_options) = LoadOptions(
00110           FLAGS_configuration_directory, configuration_basenames.at(bag_index));
00111     } else {
00112       current_trajectory_options = bag_trajectory_options.at(0);
00113     }
00114     bag_trajectory_options.push_back(current_trajectory_options);
00115   }
00116   if (bag_filenames.size() > 0) {
00117     CHECK_EQ(bag_trajectory_options.size(), bag_filenames.size());
00118   }
00119 
00120   // Since we preload the transform buffer, we should never have to wait for a
00121   // transform. When we finish processing the bag, we will simply drop any
00122   // remaining sensor data that cannot be transformed due to missing transforms.
00123   node_options.lookup_transform_timeout_sec = 0.;
00124 
00125   auto map_builder = map_builder_factory(node_options.map_builder_options);
00126 
00127   const std::chrono::time_point<std::chrono::steady_clock> start_time =
00128       std::chrono::steady_clock::now();
00129 
00130   tf2_ros::Buffer tf_buffer;
00131 
00132   std::vector<geometry_msgs::TransformStamped> urdf_transforms;
00133   const std::vector<std::string> urdf_filenames =
00134       absl::StrSplit(FLAGS_urdf_filenames, ',', absl::SkipEmpty());
00135   for (const auto& urdf_filename : urdf_filenames) {
00136     const auto current_urdf_transforms =
00137         ReadStaticTransformsFromUrdf(urdf_filename, &tf_buffer);
00138     urdf_transforms.insert(urdf_transforms.end(),
00139                            current_urdf_transforms.begin(),
00140                            current_urdf_transforms.end());
00141   }
00142 
00143   tf_buffer.setUsingDedicatedThread(true);
00144 
00145   Node node(node_options, std::move(map_builder), &tf_buffer,
00146             FLAGS_collect_metrics);
00147   if (!FLAGS_load_state_filename.empty()) {
00148     node.LoadState(FLAGS_load_state_filename, FLAGS_load_frozen_state);
00149   }
00150 
00151   ::ros::Publisher tf_publisher =
00152       node.node_handle()->advertise<tf2_msgs::TFMessage>(
00153           kTfTopic, kLatestOnlyPublisherQueueSize);
00154 
00155   ::tf2_ros::StaticTransformBroadcaster static_tf_broadcaster;
00156 
00157   ::ros::Publisher clock_publisher =
00158       node.node_handle()->advertise<rosgraph_msgs::Clock>(
00159           kClockTopic, kLatestOnlyPublisherQueueSize);
00160 
00161   if (urdf_transforms.size() > 0) {
00162     static_tf_broadcaster.sendTransform(urdf_transforms);
00163   }
00164 
00165   ros::AsyncSpinner async_spinner(kSingleThreaded);
00166   async_spinner.start();
00167   rosgraph_msgs::Clock clock;
00168   auto clock_republish_timer = node.node_handle()->createWallTimer(
00169       ::ros::WallDuration(kClockPublishFrequencySec),
00170       [&clock_publisher, &clock](const ::ros::WallTimerEvent&) {
00171         clock_publisher.publish(clock);
00172       },
00173       false /* oneshot */, false /* autostart */);
00174 
00175   std::vector<
00176       std::set<cartographer::mapping::TrajectoryBuilderInterface::SensorId>>
00177       bag_expected_sensor_ids;
00178   if (configuration_basenames.size() == 1) {
00179     const auto current_bag_expected_sensor_ids =
00180         node.ComputeDefaultSensorIdsForMultipleBags(
00181             {bag_trajectory_options.front()});
00182     bag_expected_sensor_ids = {bag_filenames.size(),
00183                                current_bag_expected_sensor_ids.front()};
00184   } else {
00185     bag_expected_sensor_ids =
00186         node.ComputeDefaultSensorIdsForMultipleBags(bag_trajectory_options);
00187   }
00188   CHECK_EQ(bag_expected_sensor_ids.size(), bag_filenames.size());
00189 
00190   std::map<std::pair<int /* bag_index */, std::string>,
00191            cartographer::mapping::TrajectoryBuilderInterface::SensorId>
00192       bag_topic_to_sensor_id;
00193   PlayableBagMultiplexer playable_bag_multiplexer;
00194   for (size_t current_bag_index = 0; current_bag_index < bag_filenames.size();
00195        ++current_bag_index) {
00196     const std::string& bag_filename = bag_filenames.at(current_bag_index);
00197     if (!::ros::ok()) {
00198       return;
00199     }
00200     for (const auto& expected_sensor_id :
00201          bag_expected_sensor_ids.at(current_bag_index)) {
00202       const auto bag_resolved_topic = std::make_pair(
00203           static_cast<int>(current_bag_index),
00204           node.node_handle()->resolveName(expected_sensor_id.id));
00205       if (bag_topic_to_sensor_id.count(bag_resolved_topic) != 0) {
00206         LOG(ERROR) << "Sensor " << expected_sensor_id.id << " of bag "
00207                    << current_bag_index << " resolves to topic "
00208                    << bag_resolved_topic.second << " which is already used by "
00209                    << " sensor "
00210                    << bag_topic_to_sensor_id.at(bag_resolved_topic).id;
00211       }
00212       bag_topic_to_sensor_id[bag_resolved_topic] = expected_sensor_id;
00213     }
00214 
00215     playable_bag_multiplexer.AddPlayableBag(PlayableBag(
00216         bag_filename, current_bag_index, ros::TIME_MIN, ros::TIME_MAX, kDelay,
00217         // PlayableBag::FilteringEarlyMessageHandler is used to get an early
00218         // peek at the tf messages in the bag and insert them into 'tf_buffer'.
00219         // When a message is retrieved by GetNextMessage() further below,
00220         // we will have already inserted further 'kDelay' seconds worth of
00221         // transforms into 'tf_buffer' via this lambda.
00222         [&tf_publisher, &tf_buffer](const rosbag::MessageInstance& msg) {
00223           if (msg.isType<tf2_msgs::TFMessage>()) {
00224             if (FLAGS_use_bag_transforms) {
00225               const auto tf_message = msg.instantiate<tf2_msgs::TFMessage>();
00226               tf_publisher.publish(tf_message);
00227 
00228               for (const auto& transform : tf_message->transforms) {
00229                 try {
00230                   // We need to keep 'tf_buffer' small because it becomes very
00231                   // inefficient otherwise. We make sure that tf_messages are
00232                   // published before any data messages, so that tf lookups
00233                   // always work.
00234                   tf_buffer.setTransform(transform, "unused_authority",
00235                                          msg.getTopic() == kTfStaticTopic);
00236                 } catch (const tf2::TransformException& ex) {
00237                   LOG(WARNING) << ex.what();
00238                 }
00239               }
00240             }
00241             // Tell 'PlayableBag' to filter the tf message since there is no
00242             // further use for it.
00243             return false;
00244           } else {
00245             return true;
00246           }
00247         }));
00248   }
00249 
00250   std::set<std::string> bag_topics;
00251   std::stringstream bag_topics_string;
00252   for (const auto& topic : playable_bag_multiplexer.topics()) {
00253     std::string resolved_topic = node.node_handle()->resolveName(topic, false);
00254     bag_topics.insert(resolved_topic);
00255     bag_topics_string << resolved_topic << ",";
00256   }
00257   bool print_topics = false;
00258   for (const auto& entry : bag_topic_to_sensor_id) {
00259     const std::string& resolved_topic = entry.first.second;
00260     if (bag_topics.count(resolved_topic) == 0) {
00261       LOG(WARNING) << "Expected resolved topic \"" << resolved_topic
00262                    << "\" not found in bag file(s).";
00263       print_topics = true;
00264     }
00265   }
00266   if (print_topics) {
00267     LOG(WARNING) << "Available topics in bag file(s) are "
00268                  << bag_topics_string.str();
00269   }
00270 
00271   std::unordered_map<int, int> bag_index_to_trajectory_id;
00272   const ros::Time begin_time =
00273       // If no bags were loaded, we cannot peek the time of first message.
00274       playable_bag_multiplexer.IsMessageAvailable()
00275           ? playable_bag_multiplexer.PeekMessageTime()
00276           : ros::Time();
00277   while (playable_bag_multiplexer.IsMessageAvailable()) {
00278     if (!::ros::ok()) {
00279       return;
00280     }
00281 
00282     const auto next_msg_tuple = playable_bag_multiplexer.GetNextMessage();
00283     const rosbag::MessageInstance& msg = std::get<0>(next_msg_tuple);
00284     const int bag_index = std::get<1>(next_msg_tuple);
00285     const bool is_last_message_in_bag = std::get<2>(next_msg_tuple);
00286 
00287     if (msg.getTime() < (begin_time + ros::Duration(FLAGS_skip_seconds))) {
00288       continue;
00289     }
00290 
00291     int trajectory_id;
00292     // Lazily add trajectories only when the first message arrives in order
00293     // to avoid blocking the sensor queue.
00294     if (bag_index_to_trajectory_id.count(bag_index) == 0) {
00295       trajectory_id =
00296           node.AddOfflineTrajectory(bag_expected_sensor_ids.at(bag_index),
00297                                     bag_trajectory_options.at(bag_index));
00298       CHECK(bag_index_to_trajectory_id
00299                 .emplace(std::piecewise_construct,
00300                          std::forward_as_tuple(bag_index),
00301                          std::forward_as_tuple(trajectory_id))
00302                 .second);
00303       LOG(INFO) << "Assigned trajectory " << trajectory_id << " to bag "
00304                 << bag_filenames.at(bag_index);
00305     } else {
00306       trajectory_id = bag_index_to_trajectory_id.at(bag_index);
00307     }
00308 
00309     const auto bag_topic = std::make_pair(
00310         bag_index,
00311         node.node_handle()->resolveName(msg.getTopic(), false /* resolve */));
00312     auto it = bag_topic_to_sensor_id.find(bag_topic);
00313     if (it != bag_topic_to_sensor_id.end()) {
00314       const std::string& sensor_id = it->second.id;
00315       if (msg.isType<sensor_msgs::LaserScan>()) {
00316         node.HandleLaserScanMessage(trajectory_id, sensor_id,
00317                                     msg.instantiate<sensor_msgs::LaserScan>());
00318       }
00319       if (msg.isType<sensor_msgs::MultiEchoLaserScan>()) {
00320         node.HandleMultiEchoLaserScanMessage(
00321             trajectory_id, sensor_id,
00322             msg.instantiate<sensor_msgs::MultiEchoLaserScan>());
00323       }
00324       if (msg.isType<sensor_msgs::PointCloud2>()) {
00325         node.HandlePointCloud2Message(
00326             trajectory_id, sensor_id,
00327             msg.instantiate<sensor_msgs::PointCloud2>());
00328       }
00329       if (msg.isType<sensor_msgs::Imu>()) {
00330         node.HandleImuMessage(trajectory_id, sensor_id,
00331                               msg.instantiate<sensor_msgs::Imu>());
00332       }
00333       if (msg.isType<nav_msgs::Odometry>()) {
00334         node.HandleOdometryMessage(trajectory_id, sensor_id,
00335                                    msg.instantiate<nav_msgs::Odometry>());
00336       }
00337       if (msg.isType<sensor_msgs::NavSatFix>()) {
00338         node.HandleNavSatFixMessage(trajectory_id, sensor_id,
00339                                     msg.instantiate<sensor_msgs::NavSatFix>());
00340       }
00341       if (msg.isType<cartographer_ros_msgs::LandmarkList>()) {
00342         node.HandleLandmarkMessage(
00343             trajectory_id, sensor_id,
00344             msg.instantiate<cartographer_ros_msgs::LandmarkList>());
00345       }
00346     }
00347     clock.clock = msg.getTime();
00348     clock_publisher.publish(clock);
00349 
00350     if (is_last_message_in_bag) {
00351       node.FinishTrajectory(trajectory_id);
00352     }
00353   }
00354 
00355   // Ensure the clock is republished after the bag has been finished, during the
00356   // final optimization, serialization, and optional indefinite spinning at the
00357   // end.
00358   clock_republish_timer.start();
00359   node.RunFinalOptimization();
00360 
00361   const std::chrono::time_point<std::chrono::steady_clock> end_time =
00362       std::chrono::steady_clock::now();
00363   const double wall_clock_seconds =
00364       std::chrono::duration_cast<std::chrono::duration<double>>(end_time -
00365                                                                 start_time)
00366           .count();
00367 
00368   LOG(INFO) << "Elapsed wall clock time: " << wall_clock_seconds << " s";
00369 #ifdef __linux__
00370   timespec cpu_timespec = {};
00371   clock_gettime(CLOCK_PROCESS_CPUTIME_ID, &cpu_timespec);
00372   LOG(INFO) << "Elapsed CPU time: "
00373             << (cpu_timespec.tv_sec + 1e-9 * cpu_timespec.tv_nsec) << " s";
00374   rusage usage;
00375   CHECK_EQ(getrusage(RUSAGE_SELF, &usage), 0) << strerror(errno);
00376   LOG(INFO) << "Peak memory usage: " << usage.ru_maxrss << " KiB";
00377 #endif
00378 
00379   // Serialize unless we have neither a bagfile nor an explicit state filename.
00380   if (::ros::ok() &&
00381       !(bag_filenames.empty() && FLAGS_save_state_filename.empty())) {
00382     const std::string state_output_filename =
00383         FLAGS_save_state_filename.empty()
00384             ? absl::StrCat(bag_filenames.front(), ".pbstream")
00385             : FLAGS_save_state_filename;
00386     LOG(INFO) << "Writing state to '" << state_output_filename << "'...";
00387     node.SerializeState(state_output_filename,
00388                         true /* include_unfinished_submaps */);
00389   }
00390   if (FLAGS_keep_running) {
00391     LOG(INFO) << "Finished processing and waiting for shutdown.";
00392     ::ros::waitForShutdown();
00393   }
00394 }
00395 
00396 }  // namespace cartographer_ros


cartographer_ros
Author(s): The Cartographer Authors
autogenerated on Wed Jul 10 2019 04:10:28