better_launch.elements package
Submodules
- better_launch.elements.abstract_node module
  - AbstractNode
    - AbstractNode.executable
    - AbstractNode.fullname
    - AbstractNode.get_info_sheet()
    - AbstractNode.get_published_services()
    - AbstractNode.get_published_topics()
    - AbstractNode.get_subscribed_topics()
    - AbstractNode.is_lifecycle_node()
    - AbstractNode.is_ros2_connected()
    - AbstractNode.is_running
    - AbstractNode.join()
    - AbstractNode.lifecycle
    - AbstractNode.name
    - AbstractNode.namespace
    - AbstractNode.node_id
    - AbstractNode.package
    - AbstractNode.params
    - AbstractNode.remaps
    - AbstractNode.shutdown()
    - AbstractNode.start()
 
 
- better_launch.elements.composer module
- better_launch.elements.foreign_node module
- better_launch.elements.group module
- better_launch.elements.lifecycle_manager module
- better_launch.elements.live_params_mixin module
- better_launch.elements.node module
- better_launch.elements.ros2_launch_wrapper module
  - Ros2LaunchWrapper
    - Ros2LaunchWrapper.describe_launch_actions()
    - Ros2LaunchWrapper.is_lifecycle_node()
    - Ros2LaunchWrapper.is_ros2_connected()
    - Ros2LaunchWrapper.is_running
    - Ros2LaunchWrapper.join()
    - Ros2LaunchWrapper.launchservice_args
    - Ros2LaunchWrapper.pid
    - Ros2LaunchWrapper.queue_ros2_actions()
    - Ros2LaunchWrapper.send_signal()
    - Ros2LaunchWrapper.shutdown()
    - Ros2LaunchWrapper.start()
 
 
Module contents
- class better_launch.elements.Component(composer: Composer, package: str, plugin: str, name: str, namespace: str, *, output: Literal['screen', 'log', 'own_log', 'none'] | set[Literal['screen', 'log', 'own_log', 'none']] = 'screen', remaps: dict[str, str] = None, params: str | dict[str, Any] = None)
- Bases: AbstractNode, LiveParamsMixin
 - property component_id: int
- The ID this component got assigned when it was loaded into the composer. Will be None if the component is not loaded. 
 - property is_loaded: bool
- True if the component is loaded, False otherwise. 
 - property is_running: bool
- True if the node is currently running. 
 - join(timeout: float = None) -> None
- Join this node and return once it is shut down. Return immediately if it is not running.
 - Parameters
- timeout: float, optional
- How long to wait in seconds. Wait forever if None. 
 - Raises
- TimeoutError
- If a timeout was set and the node is still running by the time it expires. 
 
 - property plugin: str
- The special string that is used for loading the component. executable() will return the same.
 - shutdown(reason: str, signum: int = Signals.SIGTERM, timeout: float = 0.0) -> None
- Unload this component if it was loaded.
 - Parameters
- reason: str
- A human-readable string describing why the component is being unloaded. 
- signum: int, optional
- Ignored for components. 
 
 - start(use_intra_process_comms: bool = True, **composer_extra_params) -> None
- Load this component into its composer.
- Additional keyword arguments will be passed as ROS parameters to the component.
 - Parameters
- use_intra_process_comms: bool, optional
- If True, the component will use intra-process communication for exchanging messages with other components within the same composer. 
 
 
- class better_launch.elements.Composer(wrapped_node: AbstractNode, *, component_remaps: dict[str, str] = None, output: Literal['screen', 'log', 'own_log', 'none'] | set[Literal['screen', 'log', 'own_log', 'none']] = 'screen')
- Bases: AbstractNode
 - get_live_components() -> dict[int, str]
- Use a service call to retrieve the components currently loaded into this composer and their IDs.
 - Returns
- dict[int, str]
- A dict mapping component IDs to full node names. 
 
 - classmethod is_composer(node: AbstractNode, timeout: float = 0.0) -> bool
- Checks whether a node provides services for loading components.
- For a node to be a composer, it must be running, be registered with ROS and offer the ROS composition services. This method only checks whether one of the key services is present.
- If a timeout is specified, the check will be repeated until it succeeds or the specified amount of time has passed. This is to ensure that a freshly started node had enough time to create its topics, especially on slower devices.
 - Parameters
- node: AbstractNode
- The node object to check. 
- timeout: float
- How long to wait at most for the composition services to appear. Wait forever if None. 
 - Returns
- bool
- True if the node supports loading components, False otherwise. 
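
The repeat-until-timeout behaviour described for is_composer() can be pictured with a small polling helper. This is only a sketch of the general pattern, not better_launch's implementation: the name wait_until, the interval parameter and the use of time.monotonic() are assumptions made for illustration.

```python
import time
from typing import Callable, Optional


def wait_until(
    check: Callable[[], bool],
    timeout: Optional[float] = 0.0,
    interval: float = 0.1,
) -> bool:
    """Repeat `check` until it returns True or `timeout` seconds have passed.

    timeout=None waits forever; timeout=0.0 performs exactly one check.
    (Hypothetical helper, mirroring the semantics documented above.)
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        if check():
            return True
        if deadline is not None and time.monotonic() >= deadline:
            return False
        # Give a freshly started node some time before asking again.
        time.sleep(interval)
```

A caller would pass a function that looks for one of the expected services, for example `wait_until(lambda: service_is_visible(), timeout=5.0)`, where service_is_visible is a placeholder for whatever lookup is available.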
 
 - property is_lifecycle: bool
- Composers are not lifecycle nodes. 
 - property is_running: bool
- True if the node is currently running. 
 - join(timeout: float = None) -> None
- Join this node and return once it is shut down. Return immediately if it is not running.
 - Parameters
- timeout: float, optional
- How long to wait in seconds. Wait forever if None. 
 - Raises
- TimeoutError
- If a timeout was set and the node is still running by the time it expires. 
 
 - property language: str
- The implementation language of this composer, usually cpp or py. Corresponds to this composer's package (e.g. rclcpp_composition), but will be None if it's a custom implementation. 
 - load_component(component: Component, use_intra_process_comms: bool = True, **composer_extra_params: dict) -> int
- Load the given component into this composer.
- Additional keyword arguments will be passed as ROS parameters to the component. If the component is not associated with this composer yet, a warning will be logged and its association will be updated.
- Note that since components are loaded via a service call, there are some additional restrictions on the types of Component.params() and composer_extra_params. In particular, they must be compatible with the ROS2 Parameter message type.
- Also note that new nodes created by a component use the composer's remaps. See the related issue in rclcpp.
 - Parameters
- component: Component
- The component to load. 
- use_intra_process_comms: bool, optional
- If True, the component will use intra-process communication for exchanging messages with other components within the same composer. 
 - Returns
- int
- The ID the component got assigned by ROS. 
 - Raises
- ValueError
- If this composer is not running, if the component is already loaded, or if the parameters could not be serialized. 
- RuntimeError
- If loading the component failed. 
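
To illustrate the parameter restriction mentioned for load_component(), the following sketch pre-checks a parameter dict before it is handed to a composer. It assumes that ROS 2 parameter values are limited to bool, integer, double, string, byte array and homogeneous arrays of the first four; the helper names is_parameter_compatible and find_incompatible are hypothetical and the check is deliberately stricter than what better_launch itself may accept.

```python
from typing import Any

# Scalar Python types assumed to map onto ROS 2 parameter types
# (bool, integer, double, string). This mapping is an assumption.
_SCALARS = (bool, int, float, str)


def is_parameter_compatible(value: Any) -> bool:
    """Return True if `value` looks representable as a ROS 2 parameter value."""
    if isinstance(value, _SCALARS) or isinstance(value, bytes):
        return True
    if isinstance(value, (list, tuple)):
        # Arrays must be homogeneous: all bool, all int, all float or all str.
        kinds = {type(item) for item in value}
        return len(kinds) <= 1 and kinds <= set(_SCALARS)
    # Nested dicts, None, custom objects and so on are rejected.
    return False


def find_incompatible(params: dict[str, Any]) -> list[str]:
    """Names of all parameters that would likely fail to serialize."""
    return [key for key, value in params.items() if not is_parameter_compatible(value)]
```

Running such a check up front turns a late ValueError into an early, readable list of offending keys.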
 
 - property managed_components: list[Component]
- The components that were explicitly loaded through this composer instance. This will not contain components that have been loaded via external service calls. 
 - shutdown(reason: str, signum: int = Signals.SIGTERM, timeout: float = None) -> None
- Shut down the composer node. Unloading any loaded components is left to the actual composer implementation.
 - Parameters
- reason: str
- A human-readable string describing why the composer and its components are being shut down. 
- signum: int, optional
- The signal that should be sent to the composer. 
- timeout: float, optional
- How long to wait for each component and the composer to shut down before returning. Don't wait if timeout is 0.0. Wait forever if timeout is None. 
 - Raises
- TimeoutError
- If a timeout > 0 was set and any of the components or the composer did not shut down before then. 
 
 - start(service_timeout: float = 5.0) -> None
- Start this node. Once this succeeds, is_running() will return True.
 - Parameters
- service_timeout: float, optional
- How long to wait for each composition service to appear (3 total). Wait forever if set negative. Don't check at all if None or 0. 
 
 - unload_component(component: Component | int, timeout: float = None) -> None
- Unload the specified component, essentially stopping its node.
- Note that an unload request will be issued even if the component reports it is not loaded.
 - Parameters
- component: Component | int
- The component or a component's ID to stop. 
- timeout: float, optional
- How long to wait for the component to be unloaded before returning. Don't wait if timeout is 0.0. Wait forever if timeout is None. 
 - Returns
- bool
- True if unloading the component succeeded, False otherwise. 
 - Raises
- ValueError
- If this composer is not running. 
- KeyError
- If the provided component has not been loaded into this node. 
 
 
- class better_launch.elements.ForeignNode(process: psutil.Process, package: str, name: str, namespace: str, *, remaps: dict[str, str] = None, params: str | dict[str, Any] = None, cmd_args: list[str] = None, output: Literal['screen', 'log', 'own_log', 'none'] | set[Literal['screen', 'log', 'own_log', 'none']] = 'screen')
- Bases: AbstractNode, LiveParamsMixin
 - property cmd_args: list[str]
- Additional arguments passed to the node process. 
 - property is_running: bool
- True if the node is currently running. 
 - join(timeout: float = None) -> int
- Wait for the underlying process to terminate and return its exit code. Returns immediately if the process is not running.
 - Parameters
- timeout: float, optional
- How long to wait for the process to finish. Wait forever if None. 
 - Returns
- int
- The exit code of the process, or None if it is already terminated. 
 - Raises
- TimeoutError
- If a timeout was specified and the process is still running by the time the timeout expires. 
 
 - property pid: int
- The process ID of the node process. Will be -1 if the process is not running. 
 - shutdown(reason: str, signum: int = Signals.SIGTERM, timeout: float = 0.0) -> None
- Shut down this node. Once this succeeds, is_running() will return False.
 - Parameters
- reason: str
- A human-readable string describing why this node is being shut down. 
- signum: int, optional
- The signal that should be sent to the node (if supported). 
- timeout: float, optional
- How long to wait for the node to shut down before returning. Don't wait if timeout is 0.0. Wait forever if timeout is None. 
 - Raises
- TimeoutError
- If a timeout > 0 was set and the node did not shut down before then. 
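
Because shutdown() accepts both a signal and a timeout and raises TimeoutError when the node outlives the timeout, a caller can escalate from a polite request to a forced kill. The sketch below shows one way to do that. It relies only on the documented shutdown(reason, signum, timeout) signature; the helper name shutdown_with_escalation is hypothetical, and it assumes that shutdown() may be called a second time on the same node.

```python
import signal


def shutdown_with_escalation(node, reason: str, grace: float = 5.0) -> bool:
    """Ask `node` to terminate; force-kill it if it is still alive after `grace` seconds.

    Returns True if SIGTERM was sufficient, False if SIGKILL was needed.
    `node` is any object offering shutdown(reason, signum, timeout).
    """
    try:
        node.shutdown(reason, signal.SIGTERM, timeout=grace)
        return True
    except TimeoutError:
        # The node did not exit within the grace period, so escalate.
        # Assumption: calling shutdown() again with a different signal is allowed.
        node.shutdown(reason, signal.SIGKILL, timeout=None)
        return False
```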
 
 - start() -> None
- Start this node. Once this succeeds, is_running() will return True.
 - takeover(kill_after: float = 0, **node_args) -> Node
- Replaces a foreign node with a node belonging to this better_launch process. This makes it possible to, for example, capture the node's output and control a few additional runtime parameters. Any interactions with this foreign node instance after this function returns are undefined behavior.
- NOTE: Currently the only way to take over a node is to stop the original process, then recreate and restart the node with the same arguments as the original node.
 - Parameters
- kill_after: float, optional
- Kill the node process if it takes longer than this many seconds to shut down. Disabled when <= 0. 
- node_args: dict[str, Any], optional
- Additional arguments to pass to the node. See Node for additional details.
 - Returns
- Node
- The new node instance that should replace this foreign node. 
 
 - classmethod wrap_process(process: psutil.Process) -> ForeignNode
- Collect information like namespace, package, executable and so on from a process and wrap it in a ForeignNode instance.
- Note that there is no way to verify whether the process actually belongs to a ROS node. Consider using e.g. find_ros2_node_processes() to that effect.
 - Parameters
- process: psutil.Process
- The process to wrap. 
 - Returns
- ForeignNode
- A node object carrying the extracted node information. 
 
 
- class better_launch.elements.Group(parent: Group, namespace: str)
- Bases: object
 - add_child(child: Group) -> None
- Add a child group to this group.
 - Parameters
- child: Group
- The group to add. 
 
 
- class better_launch.elements.LifecycleManager(node: AbstractNode)
- Bases: object
 - property current_stage: LifecycleStage
- The node's current (that is, last known) lifecycle stage. 
 - classmethod find_transition_path(start_ros_state: int, goal_ros_state: int) -> list[int]
- Finds a sequence of transitions that will bring a node from an initial lifecycle state (the ROS state, not our LifecycleStage enum) to a target lifecycle state.
- This is fairly low level and probably never needed.
 - Parameters
- start_ros_state: int
- The initial ROS lifecycle state. 
- goal_ros_state: int
- The final ROS lifecycle state. 
 - Returns
- list[int]
- A sequence of ROS lifecycle transitions that form a path from the start state to the goal state. 
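
As an illustration of what such a path search involves, here is a breadth-first search over the primary ROS lifecycle states. This is a sketch, not the code used by better_launch: the function name find_path and the EDGES table are hypothetical, the numeric IDs are meant to mirror the constants in lifecycle_msgs (State and Transition) and should be verified against your ROS distribution, and every transition is assumed to succeed.

```python
from collections import deque

# Primary state IDs and transition IDs, intended to mirror lifecycle_msgs.
UNCONFIGURED, INACTIVE, ACTIVE, FINALIZED = 1, 2, 3, 4
CONFIGURE, CLEANUP, ACTIVATE, DEACTIVATE = 1, 2, 3, 4
UNCONFIGURED_SHUTDOWN, INACTIVE_SHUTDOWN, ACTIVE_SHUTDOWN = 5, 6, 7

# state -> list of (transition, resulting state), assuming each transition succeeds
EDGES = {
    UNCONFIGURED: [(CONFIGURE, INACTIVE), (UNCONFIGURED_SHUTDOWN, FINALIZED)],
    INACTIVE: [(ACTIVATE, ACTIVE), (CLEANUP, UNCONFIGURED), (INACTIVE_SHUTDOWN, FINALIZED)],
    ACTIVE: [(DEACTIVATE, INACTIVE), (ACTIVE_SHUTDOWN, FINALIZED)],
    FINALIZED: [],
}


def find_path(start: int, goal: int) -> list[int]:
    """Breadth-first search for the shortest transition sequence from start to goal."""
    queue = deque([(start, [])])
    seen = {start}
    while queue:
        state, path = queue.popleft()
        if state == goal:
            return path
        for transition, next_state in EDGES.get(state, []):
            if next_state not in seen:
                seen.add(next_state)
                queue.append((next_state, path + [transition]))
    raise ValueError(f"no transition path from state {start} to state {goal}")
```

With this table, going from unconfigured to active yields configure followed by activate, while nothing leads out of the finalized state, so that request raises ValueError.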
 
 - classmethod is_lifecycle(node: AbstractNode, timeout: float = None) -> bool
- Checks whether a node supports lifecycle management.
- For a node to support lifecycle management, it must be running, be registered with ROS and offer the ROS lifecycle management services. This method only checks whether one of the key services is present.
- If a timeout is specified, the check will be repeated until it succeeds or the specified amount of time has passed. This is to ensure that a freshly started node had enough time to create its topics, especially on slower devices. See AbstractNode.is_lifecycle_node() for additional information.
 - Parameters
- node: AbstractNode
- The node object to check for lifecycle support. 
- timeout: float
- How long to wait at most for the lifecycle services to appear. Wait forever if None. 
 - Returns
- bool
- True if the node supports lifecycle management, False otherwise. 
 
 - property ros_state: int
- The node's current (that is, last known) ROS lifecycle state ID. 
 - transition(target_stage: LifecycleStage) -> bool
- Transition the managed node into the target lifecycle stage. Does nothing if the node is already in the desired stage.
- Note that you don't have to do step-by-step transitions - simply specify the stage you want the node to end up in and it will go through all the intermediate steps (assuming a path exists).
 - Parameters
- target_stage: LifecycleStage
- The lifecycle stage you want the node to end up in. 
 - Returns
- bool
- True if the transition sequence succeeded, False if one of the steps failed. 
 - Raises
- ValueError
- If no path to the target stage could be found. 
 
 
- class better_launch.elements.LifecycleStage(*values)
- Bases: IntEnum
- Represents the main stages of nodes that support lifecycle management.
 - ACTIVE = 2
 - CONFIGURED = 1
 - FINALIZED = 3
 - PRISTINE = 0
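
Since LifecycleStage is an IntEnum, its members behave like integers, which makes it easy to confuse them with ROS state IDs. The sketch below redeclares the enum locally with the values listed above, so that it runs without better_launch, and shows that the two numbering schemes differ. The constant ROS_PRIMARY_STATE_ACTIVE (taken to be 3, as in lifecycle_msgs) and the helper parse_stage are assumptions for illustration.

```python
from enum import IntEnum


class LifecycleStage(IntEnum):
    # Local stand-in using the values from the listing above.
    PRISTINE = 0
    CONFIGURED = 1
    ACTIVE = 2
    FINALIZED = 3


# Assumed value of lifecycle_msgs' PRIMARY_STATE_ACTIVE; not a LifecycleStage.
ROS_PRIMARY_STATE_ACTIVE = 3


def parse_stage(value) -> LifecycleStage:
    """Accept a member, its integer value, or its name (case-insensitive)."""
    if isinstance(value, str):
        return LifecycleStage[value.upper()]
    return LifecycleStage(value)
```

Note that `LifecycleStage.ACTIVE` is 2 while the assumed ROS state ID for an active node is 3, so the two must not be passed interchangeably.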
 
- class better_launch.elements.Node(package: str, executable: str, name: str, namespace: str, *, remaps: dict[str, str] = None, params: str | dict[str, Any] = None, cmd_args: list[str] = None, env: dict[str, str] = None, isolate_env: bool = False, log_level: int = 20, output: Literal['screen', 'log', 'own_log', 'none'] | set[Literal['screen', 'log', 'own_log', 'none']] = 'screen', on_exit: Callable = None, max_respawns: int = 0, respawn_delay: float = 0.0, use_shell: bool = False, raw: bool = False)
- Bases: AbstractNode, LiveParamsMixin
 - property is_running: bool
- True if the node is currently running. 
 - join(timeout: float = None) -> int
- Wait for the underlying process to terminate and return its exit code. Returns immediately if the process is not running.
 - Parameters
- timeout: float, optional
- How long to wait for the process to finish. Wait forever if None. 
 - Returns
- int
- The exit code of the process, or None if it is already terminated. 
 - Raises
- TimeoutError
- If a timeout was specified and the process is still running by the time the timeout expires. 
 
 - property pid: int
- The process ID of the node process. Will be -1 if the process is not running. 
 - shutdown(reason: str, signum: int = Signals.SIGTERM, timeout: float = 0.0) -> None
- Shut down this node. Once this succeeds, is_running() will return False.
 - Parameters
- reason: str
- A human-readable string describing why this node is being shut down. 
- signum: int, optional
- The signal that should be sent to the node (if supported). 
- timeout: float, optional
- How long to wait for the node to shut down before returning. Don't wait if timeout is 0.0. Wait forever if timeout is None. 
 - Raises
- TimeoutError
- If a timeout > 0 was set and the node did not shut down before then. 
 
 - start() -> None
- Start this node. Once this succeeds, is_running() will return True.
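
The start(), join() and shutdown() methods documented for Node combine naturally into a "run with a deadline" pattern. The following sketch uses only those three documented calls and works with any object that offers them; the helper name run_with_deadline and its parameters are hypothetical, and constructing a real Node (which needs a ROS 2 environment) is left out.

```python
import signal


def run_with_deadline(node, deadline: float, grace: float = 2.0):
    """Start `node`, wait up to `deadline` seconds, then shut it down if needed.

    Returns whatever join() returned if the node exited by itself,
    otherwise None after requesting a shutdown.
    """
    node.start()
    try:
        return node.join(timeout=deadline)
    except TimeoutError:
        # Still running after the deadline: request termination and
        # wait up to `grace` seconds for it to take effect.
        node.shutdown("deadline exceeded", signal.SIGTERM, timeout=grace)
        return None
```

If the shutdown itself exceeds the grace period, the TimeoutError from shutdown() propagates to the caller, who can then decide whether to send a stronger signal.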