Source code for ros_gz_sim.actions.gzserver

# Copyright 2024 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Module for the GzServer action."""

import os
from typing import Dict, List, Optional, Union

from ament_index_python.packages import get_package_share_directory
from catkin_pkg.package import InvalidPackage, PACKAGE_MANIFEST_FILENAME, parse_package
from launch.action import Action
from launch.actions import SetEnvironmentVariable
from launch.frontend import Entity, expose_action, Parser
from launch.launch_context import LaunchContext
from launch.some_substitutions_type import SomeSubstitutionsType
from launch.substitutions import TextSubstitution
from launch.utilities.type_utils import normalize_typed_substitution, perform_typed_substitution
from launch_ros.actions import ComposableNodeContainer, LoadComposableNodes, Node
from launch_ros.descriptions import ComposableNode
from ros2pkg.api import get_package_names


"""
Search for model, plugin and media paths exported by packages.

e.g.  <export>
          <gazebo_ros gazebo_model_path="${prefix}/../"/>
          <gazebo_ros gazebo_media_path="${prefix}/../"/>
      </export>
${prefix} is replaced by package's share directory in install.

Thus the required directory needs to be installed from CMakeLists.txt
e.g.  install(DIRECTORY models
          DESTINATION share/${PROJECT_NAME})
"""


[docs] class GazeboRosPaths:
[docs] @staticmethod def get_paths(): gazebo_model_path = [] gazebo_plugin_path = [] gazebo_media_path = [] for package_name in get_package_names(): package_share_path = get_package_share_directory(package_name) package_file_path = os.path.join(package_share_path, PACKAGE_MANIFEST_FILENAME) if os.path.isfile(package_file_path): try: package = parse_package(package_file_path) except InvalidPackage: continue for export in package.exports: if export.tagname == 'gazebo_ros': if 'gazebo_model_path' in export.attributes: xml_path = export.attributes['gazebo_model_path'] xml_path = xml_path.replace('${prefix}', package_share_path) gazebo_model_path.append(xml_path) if 'plugin_path' in export.attributes: xml_path = export.attributes['plugin_path'] xml_path = xml_path.replace('${prefix}', package_share_path) gazebo_plugin_path.append(xml_path) if 'gazebo_media_path' in export.attributes: xml_path = export.attributes['gazebo_media_path'] xml_path = xml_path.replace('${prefix}', package_share_path) gazebo_media_path.append(xml_path) gazebo_model_path = os.pathsep.join(gazebo_model_path + gazebo_media_path) gazebo_plugin_path = os.pathsep.join(gazebo_plugin_path) return gazebo_model_path, gazebo_plugin_path
[docs] @expose_action('gz_server') class GzServer(Action): """Action that executes a gz_server ROS [composable] node.""" def __init__( self, *, world_sdf_file: SomeSubstitutionsType = '', world_sdf_string: SomeSubstitutionsType = '', container_name: SomeSubstitutionsType = 'ros_gz_container', create_own_container: Union[bool, SomeSubstitutionsType] = False, use_composition: Union[bool, SomeSubstitutionsType] = False, initial_sim_time: Union[float, SomeSubstitutionsType] = 0.0, verbosity_level: Union[int, SomeSubstitutionsType] = 4, **kwargs ) -> None: """ Construct a gz_server action. All arguments are forwarded to `ros_gz_sim.launch.gz_server.launch.py`, so see the documentation of that class for further details. :param: world_sdf_file Path to the SDF world file. :param: world_sdf_string SDF world string. :param: container_name Name of container that nodes will load in if use composition. :param: create_own_container Whether to start a ROS container when using composition. :param: use_composition Use composed bringup if True. :param: initial_sim_time: The initial simulation time. :param: verbosity_level: The verbosity level of the Gazebo server (0=FATAL, 4=DEBUG). """ super().__init__(**kwargs) self.__world_sdf_file = world_sdf_file self.__world_sdf_string = world_sdf_string self.__container_name = container_name # This is here to allow using strings or booleans as values for boolean variables when # the Action is used from Python See the RosGzBridge.__init__ function for more details. if isinstance(create_own_container, str): self.__create_own_container = normalize_typed_substitution( TextSubstitution(text=create_own_container), bool ) else: self.__create_own_container = normalize_typed_substitution( create_own_container, bool ) if isinstance(use_composition, str): self.__use_composition = normalize_typed_substitution( TextSubstitution(text=use_composition), bool ) else: self.__use_composition = normalize_typed_substitution(use_composition, bool) if isinstance(initial_sim_time, str): self.__initial_sim_time = normalize_typed_substitution( TextSubstitution(text=initial_sim_time), float ) else: self.__initial_sim_time = normalize_typed_substitution(initial_sim_time, float) if isinstance(verbosity_level, str): self.__verbosity_level = normalize_typed_substitution( TextSubstitution(text=verbosity_level), int ) else: self.__verbosity_level = normalize_typed_substitution(verbosity_level, int)
[docs] @classmethod def parse(cls, entity: Entity, parser: Parser): """Parse gz_server.""" kwargs: Dict = super().parse(entity, parser)[1] world_sdf_file = entity.get_attr( 'world_sdf_file', data_type=str, optional=True) world_sdf_string = entity.get_attr( 'world_sdf_string', data_type=str, optional=True) container_name = entity.get_attr( 'container_name', data_type=str, optional=True) create_own_container = entity.get_attr( 'create_own_container', data_type=str, optional=True) use_composition = entity.get_attr( 'use_composition', data_type=str, optional=True) initial_sim_time = entity.get_attr( 'initial_sim_time', data_type=str, optional=True) verbosity_level = entity.get_attr( 'verbosity_level', data_type=str, optional=True) if isinstance(world_sdf_file, str): world_sdf_file = parser.parse_substitution(world_sdf_file) kwargs['world_sdf_file'] = world_sdf_file if isinstance(world_sdf_string, str): world_sdf_string = parser.parse_substitution(world_sdf_string) kwargs['world_sdf_string'] = world_sdf_string if isinstance(container_name, str): container_name = parser.parse_substitution(container_name) kwargs['container_name'] = container_name if isinstance(create_own_container, str): create_own_container = \ parser.parse_substitution(create_own_container) kwargs['create_own_container'] = create_own_container if isinstance(use_composition, str): use_composition = parser.parse_substitution(use_composition) kwargs['use_composition'] = use_composition if isinstance(initial_sim_time, str): initial_sim_time = parser.parse_substitution(initial_sim_time) kwargs['initial_sim_time'] = initial_sim_time if isinstance(verbosity_level, str): verbosity_level = parser.parse_substitution(verbosity_level) kwargs['verbosity_level'] = verbosity_level return cls, kwargs
[docs] def execute(self, context: LaunchContext) -> Optional[List[Action]]: """Execute the action.""" launch_descriptions: List[Action] = [] model_paths, plugin_paths = GazeboRosPaths.get_paths() launch_descriptions.append(SetEnvironmentVariable( 'GZ_SIM_SYSTEM_PLUGIN_PATH', os.pathsep.join([ os.environ.get('GZ_SIM_SYSTEM_PLUGIN_PATH', default=''), os.environ.get('LD_LIBRARY_PATH', default=''), plugin_paths, ]))) launch_descriptions.append(SetEnvironmentVariable( 'GZ_SIM_RESOURCE_PATH', os.pathsep.join([ os.environ.get('GZ_SIM_RESOURCE_PATH', default=''), model_paths, ]))) use_composition_eval = perform_typed_substitution( context, self.__use_composition, bool ) create_own_container_eval = perform_typed_substitution( context, self.__create_own_container, bool ) if not use_composition_eval: # Standard node configuration launch_descriptions.append(Node( package='ros_gz_sim', executable='gzserver', output='screen', parameters=[{'world_sdf_file': self.__world_sdf_file, 'world_sdf_string': self.__world_sdf_string, 'initial_sim_time': self.__initial_sim_time, 'verbosity_level': self.__verbosity_level}], )) # Composable node with container configuration if use_composition_eval and create_own_container_eval: launch_descriptions.append(ComposableNodeContainer( name=self.__container_name, namespace='', package='rclcpp_components', executable='component_container', composable_node_descriptions=[ ComposableNode( package='ros_gz_sim', plugin='ros_gz_sim::GzServer', name='gz_server', parameters=[{'world_sdf_file': self.__world_sdf_file, 'world_sdf_string': self.__world_sdf_string, 'initial_sim_time': self.__initial_sim_time, 'verbosity_level': self.__verbosity_level}], extra_arguments=[{'use_intra_process_comms': True}], ), ], output='screen', )) # Composable node without container configuration if use_composition_eval and not create_own_container_eval: launch_descriptions.append(LoadComposableNodes( target_container=self.__container_name, composable_node_descriptions=[ ComposableNode( package='ros_gz_sim', plugin='ros_gz_sim::GzServer', name='gz_server', parameters=[{'world_sdf_file': self.__world_sdf_file, 'world_sdf_string': self.__world_sdf_string, 'initial_sim_time': self.__initial_sim_time, 'verbosity_level': self.__verbosity_level}], extra_arguments=[{'use_intra_process_comms': True}], ), ], )) return launch_descriptions