better_launch.elements packageο
Submodulesο
- better_launch.elements.abstract_node module
AbstractNode
AbstractNode.executable
AbstractNode.fullname
AbstractNode.get_info_sheet()
AbstractNode.get_published_services()
AbstractNode.get_published_topics()
AbstractNode.get_subscribed_topics()
AbstractNode.is_lifecycle_node()
AbstractNode.is_ros2_connected()
AbstractNode.is_running
AbstractNode.join()
AbstractNode.lifecycle
AbstractNode.name
AbstractNode.namespace
AbstractNode.node_id
AbstractNode.package
AbstractNode.params
AbstractNode.remaps
AbstractNode.shutdown()
AbstractNode.start()
- better_launch.elements.composer module
- better_launch.elements.foreign_node module
- better_launch.elements.group module
- better_launch.elements.lifecycle_manager module
- better_launch.elements.live_params_mixin module
- better_launch.elements.node module
- better_launch.elements.ros2_launch_wrapper module
Ros2LaunchWrapper
Ros2LaunchWrapper.describe_launch_actions()
Ros2LaunchWrapper.is_lifecycle_node()
Ros2LaunchWrapper.is_ros2_connected()
Ros2LaunchWrapper.is_running
Ros2LaunchWrapper.join()
Ros2LaunchWrapper.launchservice_args
Ros2LaunchWrapper.pid
Ros2LaunchWrapper.queue_ros2_actions()
Ros2LaunchWrapper.send_signal()
Ros2LaunchWrapper.shutdown()
Ros2LaunchWrapper.start()
Module contentsο
- class better_launch.elements.Component(composer: Composer, package: str, plugin: str, name: str, namespace: str, *, output: Literal['screen', 'log', 'own_log', 'none'] | set[Literal['screen', 'log', 'own_log', 'none']] = 'screen', remaps: dict[str, str] | None = None, params: str | dict[str, Any] | None = None)ο
Bases:
AbstractNode
,LiveParamsMixin
- property component_id: intο
The ID this component got assigned when it was loaded into the composer. Will be None if the component is not loaded.
- property is_loaded: boolο
True if the component is loaded, False otherwise.
- property is_running: boolο
True if the node is currently running.
- join(timeout: float | None = None) None ο
Join this node and return once it is shut down. Return immediately if it is not running.
Parametersο
- timeoutfloat, optional
How long to wait in seconds. Wait forever if None.
Raisesο
- TimeoutError
If a timeout was set and the node is still running by the time it expires.
- property plugin: strο
The special string that is used for loading the component.
executable()
will return the same.
- shutdown(reason: str, signum: int = Signals.SIGTERM, timeout: float = 0.0) None ο
Unload this component if it was loaded.
Parametersο
- reasonstr
A human-readable string describing why the component is being unloaded.
- signumint, optional
Ignored for components.
- start(use_intra_process_comms: bool = True, **composer_extra_params) None ο
Load this component into its composer.
Additional keyword arguments will be passed as ROS parameters to the component.
Parametersο
- use_intra_process_commsbool, optional
If True, the component will use intra process communication for exchanging messages with other components within the same composer.
- class better_launch.elements.Composer(wrapped_node: AbstractNode, *, component_remaps: dict[str, str] | None = None, output: Literal['screen', 'log', 'own_log', 'none'] | set[Literal['screen', 'log', 'own_log', 'none']] = 'screen')ο
Bases:
AbstractNode
- get_live_components() dict[int, str] ο
Use a service call to retrieve the components currently loaded into this composer and their IDs.
Returnsο
- dict[int, str]
A dict mapping component IDs to full node names.
- classmethod is_composer(node: AbstractNode, timeout: float = 0.0) bool ο
Checks whether a node provides services for loading components.
For a node to be a composer, it must be running, be registered with ROS and offer the ROS composition services. This method only checks whether one of the key services is present.
If a timeout is specified, the check will be repeated until it succeeds or the specified amount of time has passed. This is to ensure that a freshly started node had enough time to create its topics, especially on slower devices.
Parametersο
- nodeAbstractNode
The node object to check.
- timeoutfloat
How long to wait at most for the composition services to appear. Wait forever if None.
Returnsο
- bool
True if the node supports loading components, False otherwise.
- property is_lifecycle: boolο
Composers are not lifecycle nodes.
- property is_running: boolο
True if the node is currently running.
- join(timeout: float | None = None) None ο
Join this node and return once it is shut down. Return immediately if it is not running.
Parametersο
- timeoutfloat, optional
How long to wait in seconds. Wait forever if None.
Raisesο
- TimeoutError
If a timeout was set and the node is still running by the time it expires.
- property language: strο
The implementation language of this composer, usually cpp or py. Corresponds to this composerβs package (e.g. rclcpp_composition), but will be None if itβs a custom implementation.
- load_component(component: Component, use_intra_process_comms: bool = True, **composer_extra_params: dict) int ο
Load this component into its composer.
Additional keyword arguments will be passed as ROS parameters to the component. If the component is not associated with this composer yet, a warning will be logged and its association will be updated.
Note that since components are loaded via a service call that there are some additional restrictions on the types of
Component.params()
and composer_extra_params. In particular, they must be compatible with the ROS2 Parameter message type.Also note that new nodes created by a component are using the composerβs remaps. See the related issue in rclcpp.
Parametersο
- component: Component
The component to load.
- use_intra_process_commsbool, optional
If True, the component will use intra process communication for exchanging messages with other components within the same composer.
Returnsο
- int
The ID the component got assigned by ROS.
Raisesο
- ValueError
If this composer is not running, if the component is already loaded, or if the parameters could not be serialized.
- RuntimeError
If loading the component failed.
- property managed_components: list[Component]ο
The components that were explicitly loaded through this composer instance. This will not contain components that have been loaded via external service calls.
- shutdown(reason: str, signum: int = Signals.SIGTERM, timeout: float | None = None) None ο
This will shutdown the composer node. Unloading any loaded components is left to the actual composer implementation.
Parametersο
- reasonstr
A human-readable string describing why the composer and its components are being shutdown.
- signumint, optional
The signal that should be send to the composer.
- timeoutfloat, optional
How long to wait for each component and the composer to shutdown before returning. Donβt wait if timeout is 0.0. Wait forever if timeout is None.
Raisesο
- TimeoutError
If a timeout > 0 was set and any of the components or the composer did not shutdown before then.
- start(service_timeout: float = 5.0) None ο
Start this node. Once this succeeds,
is_running()
will return True.Parametersο
- service_timeoutfloat, optional
How long to wait for each composition service to appear (3 total). Wait forever if set negative. Donβt check at all if None or 0.
- unload_component(component: Component | int, timeout: float | None = None) None ο
Unload the specified component, essentially stopping its node.
Note that an unload request will be issued even if the component reports it is not loaded.
Parametersο
- componentComponent | int
The component or a componentβs ID to stop.
- timeoutfloat, optional
How long to wait for the component to be unloaded before returning. Donβt wait if timeout is 0.0. Wait forever if timeout is None.
Returnsο
- bool
True if unloading the component succeeded, False otherwise.
Raisesο
- ValueError
If this composer is not running.
- KeyError
If the provided component has not been loaded into this node
- class better_launch.elements.ForeignNode(process: psutil.Process, package: str, name: str, namespace: str, *, remaps: dict[str, str] | None = None, params: str | dict[str, Any] | None = None, cmd_args: list[str] | None = None, output: Literal['screen', 'log', 'own_log', 'none'] | set[Literal['screen', 'log', 'own_log', 'none']] = 'screen')ο
Bases:
AbstractNode
,LiveParamsMixin
- property cmd_args: list[str]ο
Additional arguments passed to the node process.
- property is_running: boolο
True if the node is currently running.
- join(timeout: float | None = None) int ο
Wait for the underlying process to terminate and return its exit code. Returns immediately if the process is not running.
Parametersο
- timeoutfloat, optional
How long to wait for the process to finish. Wait forever if None.
Returnsο
- int
The exit code of the process, or None if it is already terminated.
Raisesο
- TimeoutError
If a timeout was specified and the process is still running by the time the timeout expires.
- property pid: intο
The process ID of the node process. Will be -1 if the process is not running.
- shutdown(reason: str, signum: int = Signals.SIGTERM, timeout: float = 0.0) None ο
Shutdown this node. Once this succeeds,
is_running()
will return False.Parametersο
- reasonstr
A human-readable string describing why this node is being shutdown.
- signumint, optional
The signal that should be send to the node (if supported).
- timeoutfloat, optional
How long to wait for the node to shutdown before returning. Donβt wait if timeout is 0.0. Wait forever if timeout is None.
Raisesο
- TimeoutError
If a timeout > 0 was set and the node did not shutdown before then.
- start() None ο
Start this node. Once this succeeds,
is_running()
will return True.
- takeover(kill_after: float = 0, **node_args) Node ο
Replaces a foreign node with a node belonging to this better_launch process. This allows to e.g. capture the nodeβs output and control a few additional runtime parameters. Any interactions with this foreign node instance after this function returns are undefined behavior.
NOTE: Currently the only way to takeover a node is to stop the original process, then recreate and restart the node with the same arguments as the original node.
Parametersο
- kill_after: float, optional
Kill the node process if it takes longer than this many seconds to shutdown. Disabled when <= 0.
- node_argsdict[str, Any], optional
Additional arguments to pass to the node. See
Node
for additional details.
Returnsο
- Node
The new node instance that should replace this foreign node.
- classmethod wrap_process(process: psutil.Process) ForeignNode ο
Collect information like namespace, package, executable and so on from a process and wrap it in a ForeignNode instance.
Note that there is no way to verify whether the process actually belongs to a ROS node. Consider using e.g.
find_ros2_node_processes()
to that effect.Parametersο
- processpsutil.Process
The process to wrap.
Returnsο
- ForeignNode
A node object carrying the extracted node information.
- class better_launch.elements.Group(parent: Group, namespace: str)ο
Bases:
object
- add_child(child: Group) None ο
Add a child group to this group.
Parametersο
- childGroup
The group to add.
- class better_launch.elements.LifecycleManager(node: AbstractNode)ο
Bases:
object
- property current_stage: LifecycleStageο
The nodeβs current (that is, last known) lifecycle stage.
- classmethod find_transition_path(start_ros_state: int, goal_ros_state: int) list[int] ο
Finds a sequence of transitions that will bring a node from an initial lifecycle state (the ROS state, not our LifecycleStage enum) to a target lifecycle state.
This is fairly low level and probably never needed.
Parametersο
- start_ros_stateint
The initial ROS lifecycle state.
- goal_ros_stateint
The final ROS lifecycle state.
Returnsο
- list[int]
A sequence of ROS lifecycle transitions that form a path from the start state to the goal state.
- classmethod is_lifecycle(node: AbstractNode, timeout: float | None = None) bool ο
Checks whether a node supports lifecycle management.
For a node to support lifecycle management, it must be running, be registered with ROS and offer the ROS lifecycle management services. This method only checks whether one of the key services is present.
If a timeout is specified, the check will be repeated until it succeeds or the specified amount of time has passed. This is to ensure that a freshly started node had enough time to create its topics, especially on slower devices. See
AbstractNode.is_lifecycle_node()
for additional information.Parametersο
- nodeAbstractNode
The node object to check for lifecycle support.
- timeoutfloat
How long to wait at most for the lifecycle services to appear. Wait forever if None.
Returnsο
- bool
True if the node supports lifecycle management, False otherwise.
- property ros_state: intο
The nodeβs current (that is, last known) ROS lifecycle state ID.
- transition(target_stage: LifecycleStage) bool ο
Transition the managed node into the target lifecycle stage. Does nothing if the node is already in the desired stage.
Note that you donβt have to do step-by-step transitions - simply specify the stage you want the node to end up in and it will go through all the intermediate steps (assuming a path exists).
Parametersο
- target_stageLifecycleStage
The lifecycle stage you want the node to end up in.
Returnsο
- bool
True if the transition sequence succeeded, False if one of the steps failed.
Raisesο
- ValueError
If no path to the target stage could be found.
- class better_launch.elements.LifecycleStage(value)ο
Bases:
IntEnum
Represents the main stages of nodes that support lifecycle management.
- ACTIVE = 2ο
- CONFIGURED = 1ο
- FINALIZED = 3ο
- PRISTINE = 0ο
- class better_launch.elements.Node(package: str, executable: str, name: str, namespace: str, *, remaps: dict[str, str] | None = None, params: str | dict[str, Any] | None = None, cmd_args: list[str] | None = None, env: dict[str, str] | None = None, isolate_env: bool = False, log_level: int = 20, output: Literal['screen', 'log', 'own_log', 'none'] | set[Literal['screen', 'log', 'own_log', 'none']] = 'screen', on_exit: Callable | None = None, max_respawns: int = 0, respawn_delay: float = 0.0, use_shell: bool = False, raw: bool = False)ο
Bases:
AbstractNode
,LiveParamsMixin
- property is_running: boolο
True if the node is currently running.
- join(timeout: float | None = None) int ο
Wait for the underlying process to terminate and return its exit code. Returns immediately if the process is not running.
Parametersο
- timeoutfloat, optional
How long to wait for the process to finish. Wait forever if None.
Returnsο
- int
The exit code of the process, or None if it is already terminated.
Raisesο
- TimeoutError
If a timeout was specified and the process is still running by the time the timeout expires.
- property pid: intο
The process ID of the node process. Will be -1 if the process is not running.
- shutdown(reason: str, signum: int = Signals.SIGTERM, timeout: float = 0.0) None ο
Shutdown this node. Once this succeeds,
is_running()
will return False.Parametersο
- reasonstr
A human-readable string describing why this node is being shutdown.
- signumint, optional
The signal that should be send to the node (if supported).
- timeoutfloat, optional
How long to wait for the node to shutdown before returning. Donβt wait if timeout is 0.0. Wait forever if timeout is None.
Raisesο
- TimeoutError
If a timeout > 0 was set and the node did not shutdown before then.
- start() None ο
Start this node. Once this succeeds,
is_running()
will return True.