rosbag_launch.py
Go to the documentation of this file.
1 # Copyright 2023 Ekumen, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 
15 from pathlib import Path
16 from typing import List
17 
18 from ament_index_python.packages import get_package_share_directory
19 
20 from launch import LaunchDescription
21 from launch.actions import DeclareLaunchArgument
22 from launch.actions import GroupAction
23 from launch.actions import ExecuteProcess
24 from launch.actions import Shutdown
25 from launch.utilities.type_utils import get_typed_value
26 
27 from launch_ros.actions import SetParameter
28 
29 from beluga_example.launch_utils import with_launch_arguments
30 
31 
def get_launch_arguments() -> List[DeclareLaunchArgument]:
    """
    Declare the launch arguments accepted by this launch file.

    The default bag is ``bags/perfect_odometry`` inside the share directory
    of the ``beluga_example`` package.

    :return: One ``DeclareLaunchArgument`` per supported option, covering
        playback (``start_paused``, ``playback_rate``, ``rosbag_path``) and
        recording (``record_bag``, ``topics_to_record``, ``bagfile_output``).
    """
    example_dir_path = Path(get_package_share_directory('beluga_example'))
    default_bag_path = example_dir_path / 'bags' / 'perfect_odometry'
    return [
        DeclareLaunchArgument(
            name='start_paused',
            default_value='False',
            description='Start the rosbag player in a paused state.',
        ),
        DeclareLaunchArgument(
            name='playback_rate',
            default_value='1.',
            description='Rate used to playback the bag.',
        ),
        DeclareLaunchArgument(
            name='rosbag_path',
            default_value=str(default_bag_path),
            description='Path of the rosbag to playback.',
        ),
        DeclareLaunchArgument(
            name='record_bag',
            default_value='False',
            description='Whether to record a bagfile or not.',
        ),
        DeclareLaunchArgument(
            name='topics_to_record',
            default_value='[/tf, /amcl_pose, /pose, /odometry/ground_truth]',
            description='Topics to record in a new bagfile.',
        ),
        DeclareLaunchArgument(
            name='bagfile_output',
            # An empty value means no '-o' option is passed to the recorder.
            default_value='',
            description='Destination bagfile to create, defaults to timestamped folder',
        ),
    ]
66 
67 
# NOTE(review): the decorator and ``def`` lines were missing from the extracted
# listing; they are restored from the signature shown in the listing's
# cross-reference index and the ``with_launch_arguments`` import. Confirm
# against the upstream file.
@with_launch_arguments(get_launch_arguments())
def generate_launch_description(
    start_paused,
    rosbag_path,
    playback_rate,
    record_bag,
    topics_to_record,
    bagfile_output,
):
    """
    Build the launch description that plays back (and optionally records) a bag.

    ``start_paused``, ``record_bag`` and ``topics_to_record`` are converted
    with ``get_typed_value``; the remaining arguments are forwarded to the
    command lines unchanged.

    :param start_paused: Boolean-like value; when true, ``--start-paused`` is
        added to the player command.
    :param rosbag_path: Path of the bag handed to ``ros2 bag play``.
    :param playback_rate: Value passed to the player's ``--rate`` option.
    :param record_bag: Boolean-like value; when true, a ``ros2 bag record``
        process is started next to the player.
    :param topics_to_record: List-like value with the topics to record.
    :param bagfile_output: Recorder output passed with ``-o``; the option is
        omitted when this is the empty string.
    :return: A ``LaunchDescription`` holding a single ``GroupAction``.
    """
    # Keep the typed values under new names instead of rebinding the
    # parameters to a different type.
    paused = get_typed_value(start_paused, bool)
    recording = get_typed_value(record_bag, bool)
    topics = get_typed_value(topics_to_record, List[str])

    bag_play_cmd = [
        'ros2',
        'bag',
        'play',
        rosbag_path,
        '--rate',
        playback_rate,
        '--clock',
    ]
    if paused:
        bag_play_cmd.append('--start-paused')

    group_actions = [
        SetParameter('use_sim_time', True),
        ExecuteProcess(
            cmd=bag_play_cmd,
            output='own_log',
            # Run the Shutdown action once the player process exits.
            on_exit=[Shutdown()],
        ),
    ]

    if recording:
        # Build a fresh list so the parsed topic list is never mutated when
        # the output option is appended.
        bag_record_cmd = ['ros2', 'bag', 'record', *topics]
        if bagfile_output != '':
            bag_record_cmd.extend(('-o', bagfile_output))
        group_actions.append(
            ExecuteProcess(
                cmd=bag_record_cmd,
                output='own_log',
            )
        )

    load_nodes = GroupAction(actions=group_actions)

    return LaunchDescription([load_nodes])
rosbag_launch.generate_launch_description
def generate_launch_description(start_paused, rosbag_path, playback_rate, record_bag, topics_to_record, bagfile_output)
Definition: rosbag_launch.py:69
beluga_example.launch_utils
Definition: launch_utils.py:1
beluga_example.launch_utils.with_launch_arguments
def with_launch_arguments(List[DeclareLaunchArgument] arguments_to_declare)
Definition: launch_utils.py:32
rosbag_launch.get_launch_arguments
def get_launch_arguments()
Definition: rosbag_launch.py:32


beluga_example
Author(s):
autogenerated on Tue Jul 16 2024 03:00:09