offline_node.cc
Go to the documentation of this file.
1 /*
2  * Copyright 2018 The Cartographer Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <errno.h>
20 #include <string.h>
21 #include <sys/resource.h>
22 #include <time.h>
23 #include <chrono>
24 
25 #include "cartographer_ros/node.h"
29 #include "gflags/gflags.h"
30 #include "ros/callback_queue.h"
31 #include "rosgraph_msgs/Clock.h"
33 #include "urdf/model.h"
34 
35 DEFINE_string(configuration_directory, "",
36  "First directory in which configuration files are searched, "
37  "second is always the Cartographer installation to allow "
38  "including files from there.");
40  configuration_basenames, "",
41  "Comma-separated list of basenames, i.e. not containing any "
42  "directory prefix, of the configuration files for each trajectory. "
43  "The first configuration file will be used for node options. "
44  "If less configuration files are specified than trajectories, the "
45  "first file will be used the remaining trajectories.");
47  bag_filenames, "",
48  "Comma-separated list of bags to process. One bag per trajectory. "
49  "Any combination of simultaneous and sequential bags is supported.");
50 DEFINE_string(urdf_filenames, "",
51  "Comma-separated list of one or more URDF files that contain "
52  "static links for the sensor configuration(s).");
53 DEFINE_bool(use_bag_transforms, true,
54  "Whether to read, use and republish transforms from bags.");
55 DEFINE_string(load_state_filename, "",
56  "If non-empty, filename of a .pbstream file to load, containing "
57  "a saved SLAM state.");
58 DEFINE_bool(load_frozen_state, true,
59  "Load the saved state as frozen (non-optimized) trajectories.");
60 DEFINE_bool(keep_running, false,
61  "Keep running the offline node after all messages from the bag "
62  "have been processed.");
63 DEFINE_double(skip_seconds, 0,
64  "Optional amount of seconds to skip from the beginning "
65  "(i.e. when the earliest bag starts.). ");
66 
67 namespace cartographer_ros {
68 
69 constexpr char kClockTopic[] = "clock";
70 constexpr char kTfStaticTopic[] = "/tf_static";
71 constexpr char kTfTopic[] = "tf";
72 constexpr double kClockPublishFrequencySec = 1. / 30.;
73 constexpr int kSingleThreaded = 1;
74 // We publish tf messages one second earlier than other messages. Under
75 // the assumption of higher frequency tf this should ensure that tf can
76 // always interpolate.
77 const ::ros::Duration kDelay = ::ros::Duration(1.0);
78 
79 void RunOfflineNode(const MapBuilderFactory& map_builder_factory) {
80  CHECK(!FLAGS_configuration_directory.empty())
81  << "-configuration_directory is missing.";
82  CHECK(!FLAGS_configuration_basenames.empty())
83  << "-configuration_basenames is missing.";
84  CHECK(!(FLAGS_bag_filenames.empty() && FLAGS_load_state_filename.empty()))
85  << "-bag_filenames and -load_state_filename cannot both be unspecified.";
86  const auto bag_filenames =
87  cartographer_ros::SplitString(FLAGS_bag_filenames, ',');
88  cartographer_ros::NodeOptions node_options;
89  const auto configuration_basenames =
90  cartographer_ros::SplitString(FLAGS_configuration_basenames, ',');
91  std::vector<TrajectoryOptions> bag_trajectory_options(1);
92  std::tie(node_options, bag_trajectory_options.at(0)) =
93  LoadOptions(FLAGS_configuration_directory, configuration_basenames.at(0));
94 
95  for (size_t bag_index = 1; bag_index < bag_filenames.size(); ++bag_index) {
96  TrajectoryOptions current_trajectory_options;
97  if (bag_index < configuration_basenames.size()) {
98  std::tie(std::ignore, current_trajectory_options) = LoadOptions(
99  FLAGS_configuration_directory, configuration_basenames.at(bag_index));
100  } else {
101  current_trajectory_options = bag_trajectory_options.at(0);
102  }
103  bag_trajectory_options.push_back(current_trajectory_options);
104  }
105  if (bag_filenames.size() > 0) {
106  CHECK_EQ(bag_trajectory_options.size(), bag_filenames.size());
107  }
108 
109  // Since we preload the transform buffer, we should never have to wait for a
110  // transform. When we finish processing the bag, we will simply drop any
111  // remaining sensor data that cannot be transformed due to missing transforms.
112  node_options.lookup_transform_timeout_sec = 0.;
113 
114  auto map_builder = map_builder_factory(node_options.map_builder_options);
115 
116  const std::chrono::time_point<std::chrono::steady_clock> start_time =
117  std::chrono::steady_clock::now();
118 
120 
121  std::vector<geometry_msgs::TransformStamped> urdf_transforms;
122  for (const std::string& urdf_filename :
123  cartographer_ros::SplitString(FLAGS_urdf_filenames, ',')) {
124  const auto current_urdf_transforms =
125  ReadStaticTransformsFromUrdf(urdf_filename, &tf_buffer);
126  urdf_transforms.insert(urdf_transforms.end(),
127  current_urdf_transforms.begin(),
128  current_urdf_transforms.end());
129  }
130 
131  tf_buffer.setUsingDedicatedThread(true);
132 
133  Node node(node_options, std::move(map_builder), &tf_buffer);
134  if (!FLAGS_load_state_filename.empty()) {
135  node.LoadState(FLAGS_load_state_filename, FLAGS_load_frozen_state);
136  }
137 
138  ::ros::Publisher tf_publisher =
139  node.node_handle()->advertise<tf2_msgs::TFMessage>(
141 
142  ::tf2_ros::StaticTransformBroadcaster static_tf_broadcaster;
143 
144  ::ros::Publisher clock_publisher =
145  node.node_handle()->advertise<rosgraph_msgs::Clock>(
147 
148  if (urdf_transforms.size() > 0) {
149  static_tf_broadcaster.sendTransform(urdf_transforms);
150  }
151 
152  ros::AsyncSpinner async_spinner(kSingleThreaded);
153  async_spinner.start();
154  rosgraph_msgs::Clock clock;
155  auto clock_republish_timer = node.node_handle()->createWallTimer(
156  ::ros::WallDuration(kClockPublishFrequencySec),
157  [&clock_publisher, &clock](const ::ros::WallTimerEvent&) {
158  clock_publisher.publish(clock);
159  },
160  false /* oneshot */, false /* autostart */);
161 
162  std::vector<
163  std::set<cartographer::mapping::TrajectoryBuilderInterface::SensorId>>
164  bag_expected_sensor_ids;
165  if (configuration_basenames.size() == 1) {
166  const auto current_bag_expected_sensor_ids =
168  {bag_trajectory_options.front()});
169  bag_expected_sensor_ids = {bag_filenames.size(),
170  current_bag_expected_sensor_ids.front()};
171  } else {
172  bag_expected_sensor_ids =
173  node.ComputeDefaultSensorIdsForMultipleBags(bag_trajectory_options);
174  }
175  CHECK_EQ(bag_expected_sensor_ids.size(), bag_filenames.size());
176 
177  std::map<std::pair<int /* bag_index */, std::string>,
179  bag_topic_to_sensor_id;
180  PlayableBagMultiplexer playable_bag_multiplexer;
181  for (size_t current_bag_index = 0; current_bag_index < bag_filenames.size();
182  ++current_bag_index) {
183  const std::string& bag_filename = bag_filenames.at(current_bag_index);
184  if (!::ros::ok()) {
185  return;
186  }
187  for (const auto& expected_sensor_id :
188  bag_expected_sensor_ids.at(current_bag_index)) {
189  const auto bag_resolved_topic = std::make_pair(
190  static_cast<int>(current_bag_index),
191  node.node_handle()->resolveName(expected_sensor_id.id));
192  if (bag_topic_to_sensor_id.count(bag_resolved_topic) != 0) {
193  LOG(ERROR) << "Sensor " << expected_sensor_id.id << " of bag "
194  << current_bag_index << " resolves to topic "
195  << bag_resolved_topic.second << " which is already used by "
196  << " sensor "
197  << bag_topic_to_sensor_id.at(bag_resolved_topic).id;
198  }
199  bag_topic_to_sensor_id[bag_resolved_topic] = expected_sensor_id;
200  }
201 
202  playable_bag_multiplexer.AddPlayableBag(PlayableBag(
203  bag_filename, current_bag_index, ros::TIME_MIN, ros::TIME_MAX, kDelay,
204  // PlayableBag::FilteringEarlyMessageHandler is used to get an early
205  // peek at the tf messages in the bag and insert them into 'tf_buffer'.
206  // When a message is retrieved by GetNextMessage() further below,
207  // we will have already inserted further 'kDelay' seconds worth of
208  // transforms into 'tf_buffer' via this lambda.
209  [&tf_publisher, &tf_buffer](const rosbag::MessageInstance& msg) {
210  if (msg.isType<tf2_msgs::TFMessage>()) {
211  if (FLAGS_use_bag_transforms) {
212  const auto tf_message = msg.instantiate<tf2_msgs::TFMessage>();
213  tf_publisher.publish(tf_message);
214 
215  for (const auto& transform : tf_message->transforms) {
216  try {
217  // We need to keep 'tf_buffer' small because it becomes very
218  // inefficient otherwise. We make sure that tf_messages are
219  // published before any data messages, so that tf lookups
220  // always work.
221  tf_buffer.setTransform(transform, "unused_authority",
222  msg.getTopic() == kTfStaticTopic);
223  } catch (const tf2::TransformException& ex) {
224  LOG(WARNING) << ex.what();
225  }
226  }
227  }
228  // Tell 'PlayableBag' to filter the tf message since there is no
229  // further use for it.
230  return false;
231  } else {
232  return true;
233  }
234  }));
235  }
236 
237  // TODO(gaschler): Warn if resolved topics are not in bags.
238  std::unordered_map<int, int> bag_index_to_trajectory_id;
239  const ros::Time begin_time =
240  // If no bags were loaded, we cannot peek the time of first message.
241  playable_bag_multiplexer.IsMessageAvailable()
242  ? playable_bag_multiplexer.PeekMessageTime()
243  : ros::Time();
244  while (playable_bag_multiplexer.IsMessageAvailable()) {
245  if (!::ros::ok()) {
246  return;
247  }
248 
249  const auto next_msg_tuple = playable_bag_multiplexer.GetNextMessage();
250  const rosbag::MessageInstance& msg = std::get<0>(next_msg_tuple);
251  const int bag_index = std::get<1>(next_msg_tuple);
252  const bool is_last_message_in_bag = std::get<2>(next_msg_tuple);
253 
254  if (msg.getTime() < (begin_time + ros::Duration(FLAGS_skip_seconds))) {
255  continue;
256  }
257 
258  int trajectory_id;
259  // Lazily add trajectories only when the first message arrives in order
260  // to avoid blocking the sensor queue.
261  if (bag_index_to_trajectory_id.count(bag_index) == 0) {
262  trajectory_id =
263  node.AddOfflineTrajectory(bag_expected_sensor_ids.at(bag_index),
264  bag_trajectory_options.at(bag_index));
265  CHECK(bag_index_to_trajectory_id
266  .emplace(std::piecewise_construct,
267  std::forward_as_tuple(bag_index),
268  std::forward_as_tuple(trajectory_id))
269  .second);
270  LOG(INFO) << "Assigned trajectory " << trajectory_id << " to bag "
271  << bag_filenames.at(bag_index);
272  } else {
273  trajectory_id = bag_index_to_trajectory_id.at(bag_index);
274  }
275 
276  const auto bag_topic = std::make_pair(
277  bag_index,
278  node.node_handle()->resolveName(msg.getTopic(), false /* resolve */));
279  auto it = bag_topic_to_sensor_id.find(bag_topic);
280  if (it != bag_topic_to_sensor_id.end()) {
281  const std::string& sensor_id = it->second.id;
282  if (msg.isType<sensor_msgs::LaserScan>()) {
283  node.HandleLaserScanMessage(trajectory_id, sensor_id,
284  msg.instantiate<sensor_msgs::LaserScan>());
285  }
286  if (msg.isType<sensor_msgs::MultiEchoLaserScan>()) {
287  node.HandleMultiEchoLaserScanMessage(
288  trajectory_id, sensor_id,
289  msg.instantiate<sensor_msgs::MultiEchoLaserScan>());
290  }
291  if (msg.isType<sensor_msgs::PointCloud2>()) {
292  node.HandlePointCloud2Message(
293  trajectory_id, sensor_id,
294  msg.instantiate<sensor_msgs::PointCloud2>());
295  }
296  if (msg.isType<sensor_msgs::Imu>()) {
297  node.HandleImuMessage(trajectory_id, sensor_id,
298  msg.instantiate<sensor_msgs::Imu>());
299  }
300  if (msg.isType<nav_msgs::Odometry>()) {
301  node.HandleOdometryMessage(trajectory_id, sensor_id,
302  msg.instantiate<nav_msgs::Odometry>());
303  }
304  if (msg.isType<sensor_msgs::NavSatFix>()) {
305  node.HandleNavSatFixMessage(trajectory_id, sensor_id,
306  msg.instantiate<sensor_msgs::NavSatFix>());
307  }
308  if (msg.isType<cartographer_ros_msgs::LandmarkList>()) {
309  node.HandleLandmarkMessage(
310  trajectory_id, sensor_id,
311  msg.instantiate<cartographer_ros_msgs::LandmarkList>());
312  }
313  }
314  clock.clock = msg.getTime();
315  clock_publisher.publish(clock);
316 
317  if (is_last_message_in_bag) {
318  node.FinishTrajectory(trajectory_id);
319  }
320  }
321 
322  // Ensure the clock is republished after the bag has been finished, during the
323  // final optimization, serialization, and optional indefinite spinning at the
324  // end.
325  clock_republish_timer.start();
326  node.RunFinalOptimization();
327 
328  const std::chrono::time_point<std::chrono::steady_clock> end_time =
329  std::chrono::steady_clock::now();
330  const double wall_clock_seconds =
331  std::chrono::duration_cast<std::chrono::duration<double>>(end_time -
332  start_time)
333  .count();
334 
335  LOG(INFO) << "Elapsed wall clock time: " << wall_clock_seconds << " s";
336 #ifdef __linux__
337  timespec cpu_timespec = {};
338  clock_gettime(CLOCK_PROCESS_CPUTIME_ID, &cpu_timespec);
339  LOG(INFO) << "Elapsed CPU time: "
340  << (cpu_timespec.tv_sec + 1e-9 * cpu_timespec.tv_nsec) << " s";
341  rusage usage;
342  CHECK_EQ(getrusage(RUSAGE_SELF, &usage), 0) << strerror(errno);
343  LOG(INFO) << "Peak memory usage: " << usage.ru_maxrss << " KiB";
344 #endif
345 
346  if (::ros::ok() && bag_filenames.size() > 0) {
347  const std::string output_filename = bag_filenames.front();
348  const std::string suffix = ".pbstream";
349  const std::string state_output_filename = output_filename + suffix;
350  LOG(INFO) << "Writing state to '" << state_output_filename << "'...";
351  node.SerializeState(state_output_filename);
352  }
353  if (FLAGS_keep_running) {
354  ::ros::waitForShutdown();
355  }
356 }
357 
358 } // namespace cartographer_ros
DEFINE_double(skip_seconds, 0, "Optional amount of seconds to skip from the beginning " "(i.e. when the earliest bag starts.). ")
constexpr char kClockTopic[]
Definition: offline_node.cc:69
void RunOfflineNode(const MapBuilderFactory &map_builder_factory)
Definition: offline_node.cc:79
std::function< std::unique_ptr<::cartographer::mapping::MapBuilderInterface >(const ::cartographer::mapping::proto::MapBuilderOptions &)> MapBuilderFactory
Definition: offline_node.h:32
const ::ros::Duration kDelay
Definition: offline_node.cc:77
::cartographer::mapping::proto::MapBuilderOptions map_builder_options
Definition: node_options.h:32
void setUsingDedicatedThread(bool value)
constexpr char kTfTopic[]
Definition: offline_node.cc:71
constexpr double kClockPublishFrequencySec
Definition: offline_node.cc:72
std::vector< std::set<::cartographer::mapping::TrajectoryBuilderInterface::SensorId > > ComputeDefaultSensorIdsForMultipleBags(const std::vector< TrajectoryOptions > &bags_options) const
Definition: node.cc:525
ros::Time const & getTime() const
const Time TIME_MIN(0, 1)
constexpr int kLatestOnlyPublisherQueueSize
constexpr int kSingleThreaded
Definition: offline_node.cc:73
DEFINE_string(configuration_directory, "", "First directory in which configuration files are searched, " "second is always the Cartographer installation to allow " "including files from there.")
constexpr char kTfStaticTopic[]
Definition: offline_node.cc:70
WallTimer createWallTimer(WallDuration period, void(T::*callback)(const WallTimerEvent &), T *obj, bool oneshot=false, bool autostart=true) const
std::tuple< NodeOptions, TrajectoryOptions > LoadOptions(const std::string &configuration_directory, const std::string &configuration_basename)
Definition: node_options.cc:46
void publish(const boost::shared_ptr< M > &message) const
void AddPlayableBag(PlayableBag playable_bag)
std::vector< std::string > SplitString(const std::string &input, const char delimiter)
Definition: split_string.cc:23
int count
ROSCPP_DECL bool ok()
std::string resolveName(const std::string &name, bool remap=true) const
ROSTIME_DECL const Time TIME_MAX
Publisher advertise(const std::string &topic, uint32_t queue_size, bool latch=false)
void LoadState(const std::string &state_filename, bool load_frozen_state)
Definition: node.cc:699
boost::shared_ptr< T > instantiate() const
::ros::NodeHandle * node_handle()
Definition: node.cc:136
def usage(progname)
DEFINE_bool(use_bag_transforms, true, "Whether to read, use and republish transforms from bags.")
std::string const & getTopic() const
tf2_ros::Buffer * tf_buffer
std::vector< geometry_msgs::TransformStamped > ReadStaticTransformsFromUrdf(const std::string &urdf_filename, tf2_ros::Buffer *const tf_buffer)
Definition: urdf_reader.cc:27


cartographer_ros
Author(s): The Cartographer Authors
autogenerated on Mon Feb 28 2022 22:06:05