better_launch.elements package

Submodules

Module contents

class better_launch.elements.Component(composer: Composer, package: str, plugin: str, name: str, namespace: str, *, output: Literal['screen', 'log', 'own_log', 'none'] | set[Literal['screen', 'log', 'own_log', 'none']] = 'screen', remaps: dict[str, str] | None = None, params: str | dict[str, Any] | None = None)

Bases: AbstractNode, LiveParamsMixin

property component_id: int

The ID this component got assigned when it was loaded into the composer. Will be None if the component is not loaded.

property composer: Composer

The composer this component is associated with.

property is_loaded: bool

True if the component is loaded, False otherwise.

property is_running: bool

True if the node is currently running.

join(timeout: float | None = None) -> None

Join this node and return once it is shut down. Return immediately if it is not running.

Parameters

timeout: float, optional

How long to wait in seconds. Wait forever if None.

Raises

TimeoutError

If a timeout was set and the node is still running by the time it expires.

property plugin: str

The special string that is used for loading the component. executable() will return the same.

shutdown(reason: str, signum: int = Signals.SIGTERM, timeout: float = 0.0) -> None

Unload this component if it was loaded.

Parameters

reason: str

A human-readable string describing why the component is being unloaded.

signum: int, optional

Ignored for components.

start(use_intra_process_comms: bool = True, **composer_extra_params) -> None

Load this component into its composer.

Additional keyword arguments will be passed as ROS parameters to the component.

Parameters

use_intra_process_comms: bool, optional

If True, the component will use intra-process communication for exchanging messages with other components within the same composer.

class better_launch.elements.Composer(wrapped_node: AbstractNode, *, component_remaps: dict[str, str] | None = None, output: Literal['screen', 'log', 'own_log', 'none'] | set[Literal['screen', 'log', 'own_log', 'none']] = 'screen')

Bases: AbstractNode

get_live_components() -> dict[int, str]

Use a service call to retrieve the components currently loaded into this composer and their IDs.

Returns

dict[int, str]

A dict mapping component IDs to full node names.

classmethod is_composer(node: AbstractNode, timeout: float = 0.0) -> bool

Checks whether a node provides services for loading components.

For a node to be a composer, it must be running, be registered with ROS and offer the ROS composition services. This method only checks whether one of the key services is present.

If a timeout is specified, the check will be repeated until it succeeds or the specified amount of time has passed. This ensures that a freshly started node has had enough time to create its services, especially on slower devices.

Parameters

node: AbstractNode

The node object to check.

timeout: float

How long to wait at most for the composition services to appear. Wait forever if None.

Returns

bool

True if the node supports loading components, False otherwise.
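The repeat-until-timeout behavior described above can be sketched as a small polling loop. This is only an illustration of the pattern: `poll_until`, its `interval` parameter and the way the deadline is computed are assumptions, not part of better_launch.

```python
import time
from typing import Callable, Optional


def poll_until(
    check: Callable[[], bool],
    timeout: Optional[float] = 0.0,
    interval: float = 0.1,
) -> bool:
    """Repeat `check` until it succeeds or `timeout` seconds have passed.

    Hypothetical helper. timeout=0.0 performs a single check,
    timeout=None keeps checking forever.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        if check():
            return True
        if deadline is not None and time.monotonic() >= deadline:
            return False
        # Give the freshly started node some time before asking again.
        time.sleep(interval)
```

A caller would pass a function that looks for one of the composition services and returns True once it is found.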

property is_lifecycle: bool

Composers are not lifecycle nodes.

property is_running: bool

True if the node is currently running.

join(timeout: float | None = None) -> None

Join this node and return once it is shut down. Return immediately if it is not running.

Parameters

timeout: float, optional

How long to wait in seconds. Wait forever if None.

Raises

TimeoutError

If a timeout was set and the node is still running by the time it expires.

property language: str

The implementation language of this composer, usually cpp or py. Corresponds to this composer’s package (e.g. rclcpp_composition), but will be None if it’s a custom implementation.

load_component(component: Component, use_intra_process_comms: bool = True, **composer_extra_params: dict) -> int

Load a component into this composer.

Additional keyword arguments will be passed as ROS parameters to the component. If the component is not associated with this composer yet, a warning will be logged and its association will be updated.

Note that since components are loaded via a service call, there are some additional restrictions on the types of Component.params() and composer_extra_params. In particular, they must be compatible with the ROS2 Parameter message type.

Also note that new nodes created by a component use the composer’s remaps. See the related issue in rclcpp.

Parameters

component: Component

The component to load.

use_intra_process_comms: bool, optional

If True, the component will use intra-process communication for exchanging messages with other components within the same composer.

Returns

int

The ID the component got assigned by ROS.

Raises

ValueError

If this composer is not running, if the component is already loaded, or if the parameters could not be serialized.

RuntimeError

If loading the component failed.
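To check parameters before calling load_component, a pre-flight test along the following lines may help. It is a sketch under the assumption that "compatible with the ROS2 Parameter message type" means the usual ROS 2 parameter types (bool, integer, double, string, byte array, and homogeneous arrays of the first four). `is_ros_param_compatible` and `find_incompatible` are hypothetical helpers, and the real serialization in better_launch may accept or reject other values.

```python
from typing import Any

# Python scalar types that map onto ROS 2 parameter types
# (bool, integer, double, string).
_SCALARS = (bool, int, float, str)


def is_ros_param_compatible(value: Any) -> bool:
    """Return True if `value` fits one of the assumed ROS 2 parameter types."""
    if isinstance(value, (*_SCALARS, bytes)):
        return True
    if isinstance(value, (list, tuple)):
        if not value:
            # The element type of an empty list cannot be inferred;
            # this sketch simply lets it pass.
            return True
        first = type(value[0])
        # Arrays must be homogeneous: all bool, all int, all float or all str.
        return first in _SCALARS and all(type(v) is first for v in value)
    return False


def find_incompatible(params: dict[str, Any]) -> list[str]:
    """Return the keys whose values would likely fail to serialize."""
    return [key for key, value in params.items() if not is_ros_param_compatible(value)]
```

Nested dictionaries and mixed-type lists are the typical offenders; flattening or converting them beforehand avoids the ValueError described above.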

property managed_components: list[Component]

The components that were explicitly loaded through this composer instance. This will not contain components that have been loaded via external service calls.
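The difference between managed and externally loaded components can be made visible by comparing the two views. In the sketch below, `live` stands in for the result of get_live_components() and `managed_ids` for the component_id values of the entries in managed_components. `split_live_components` is a hypothetical helper and the node names in the usage note are made up.

```python
def split_live_components(
    live: dict[int, str], managed_ids: set[int]
) -> tuple[dict[int, str], dict[int, str]]:
    """Split live components into (managed, external) by component ID."""
    managed = {cid: name for cid, name in live.items() if cid in managed_ids}
    external = {cid: name for cid, name in live.items() if cid not in managed_ids}
    return managed, external
```

For example, with `live = {1: "/demo/talker", 2: "/demo/external"}` and `managed_ids = {1}`, the second dictionary contains only the component that was loaded through an external service call.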

shutdown(reason: str, signum: int = Signals.SIGTERM, timeout: float | None = None) -> None

Shut down the composer node. Unloading any loaded components is left to the actual composer implementation.

Parameters

reason: str

A human-readable string describing why the composer and its components are being shut down.

signum: int, optional

The signal that should be sent to the composer.

timeout: float, optional

How long to wait for each component and the composer to shut down before returning. Don’t wait if timeout is 0.0. Wait forever if timeout is None.

Raises

TimeoutError

If a timeout > 0 was set and any of the components or the composer did not shut down before then.

start(service_timeout: float = 5.0) -> None

Start this node. Once this succeeds, is_running() will return True.

Parameters

service_timeout: float, optional

How long to wait for each composition service to appear (3 total). Wait forever if set negative. Don’t check at all if None or 0.

unload_component(component: Component | int, timeout: float | None = None) -> None

Unload the specified component, essentially stopping its node.

Note that an unload request will be issued even if the component reports it is not loaded.

Parameters

component: Component | int

The component or a component’s ID to stop.

timeout: float, optional

How long to wait for the component to be unloaded before returning. Don’t wait if timeout is 0.0. Wait forever if timeout is None.

Returns

bool

True if unloading the component succeeded, False otherwise.

Raises

ValueError

If this composer is not running.

KeyError

If the provided component has not been loaded into this node.

class better_launch.elements.ForeignNode(process: psutil.Process, package: str, name: str, namespace: str, *, remaps: dict[str, str] | None = None, params: str | dict[str, Any] | None = None, cmd_args: list[str] | None = None, output: Literal['screen', 'log', 'own_log', 'none'] | set[Literal['screen', 'log', 'own_log', 'none']] = 'screen')

Bases: AbstractNode, LiveParamsMixin

property cmd_args: list[str]

Additional arguments passed to the node process.

property is_running: bool

True if the node is currently running.

join(timeout: float | None = None) -> int

Wait for the underlying process to terminate and return its exit code. Returns immediately if the process is not running.

Parameters

timeout: float, optional

How long to wait for the process to finish. Wait forever if None.

Returns

int

The exit code of the process, or None if it is already terminated.

Raises

TimeoutError

If a timeout was specified and the process is still running by the time the timeout expires.

property pid: int

The process ID of the node process. Will be -1 if the process is not running.

shutdown(reason: str, signum: int = Signals.SIGTERM, timeout: float = 0.0) -> None

Shut down this node. Once this succeeds, is_running() will return False.

Parameters

reason: str

A human-readable string describing why this node is being shut down.

signum: int, optional

The signal that should be sent to the node (if supported).

timeout: float, optional

How long to wait for the node to shut down before returning. Don’t wait if timeout is 0.0. Wait forever if timeout is None.

Raises

TimeoutError

If a timeout > 0 was set and the node did not shut down before then.

start() -> None

Start this node. Once this succeeds, is_running() will return True.

takeover(kill_after: float = 0, **node_args) -> Node

Replaces a foreign node with a node belonging to this better_launch process. This makes it possible to, for example, capture the node’s output and control a few additional runtime parameters. Any interaction with this foreign node instance after this function returns is undefined behavior.

NOTE: Currently the only way to take over a node is to stop the original process, then recreate and restart the node with the same arguments as the original node.

Parameters

kill_after: float, optional

Kill the node process if it takes longer than this many seconds to shut down. Disabled when <= 0.

node_args: dict[str, Any], optional

Additional arguments to pass to the node. See Node for additional details.

Returns

Node

The new node instance that should replace this foreign node.

classmethod wrap_process(process: psutil.Process) -> ForeignNode

Collect information like namespace, package, executable and so on from a process and wrap it in a ForeignNode instance.

Note that there is no way to verify whether the process actually belongs to a ROS node. Consider using e.g. find_ros2_node_processes() for that purpose.

Parameters

process: psutil.Process

The process to wrap.

Returns

ForeignNode

A node object carrying the extracted node information.
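How wrap_process extracts its information is not described here. As a rough illustration of what can be recovered from a process, the sketch below parses the standard ROS 2 command line conventions (`--ros-args`, `-r`/`--remap`, `__node`, `__ns`) out of an argument list such as the one psutil reports for a process. `parse_ros_cmdline` is a hypothetical helper and the command line in the usage note is made up.

```python
from typing import Any


def parse_ros_cmdline(cmdline: list[str]) -> dict[str, Any]:
    """Extract node name, namespace and remaps from a ROS 2 command line.

    Values stay None or empty when they were not given on the command line.
    """
    info: dict[str, Any] = {"name": None, "namespace": None, "remaps": {}}
    if "--ros-args" not in cmdline:
        return info

    args = cmdline[cmdline.index("--ros-args") + 1:]
    i = 0
    while i < len(args):
        arg = args[i]
        if arg == "--":
            # A bare "--" ends the ROS-specific arguments.
            break
        if arg in ("-r", "--remap") and i + 1 < len(args):
            source, sep, target = args[i + 1].partition(":=")
            if sep:
                if source == "__node":
                    info["name"] = target
                elif source == "__ns":
                    info["namespace"] = target
                else:
                    info["remaps"][source] = target
            i += 2
            continue
        i += 1
    return info
```

Calling it with `["talker", "--ros-args", "-r", "__node:=my_talker", "-r", "__ns:=/demo"]` yields the name `my_talker` and the namespace `/demo`. A process without `--ros-args` yields no information at all, which matches the caveat above that there is no way to verify that a process belongs to a ROS node.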

class better_launch.elements.Group(parent: Group, namespace: str)

Bases: object

add_child(child: Group) -> None

Add a child group to this group.

Parameters

child: Group

The group to add.

add_node(node: Node) -> None

Add a node to this group.

This group will not do any magic to enforce its namespace or remaps onto the node. Calling this with a node that has been added before will do nothing.

Parameters

node: Node

The node to add.

assemble_namespace() -> str

Return the full namespace string this group represents.

Returns

str

This group’s namespace path from the root group.
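The idea of assembling a namespace path from the root group can be sketched with a minimal stand-in class. `SketchGroup` is hypothetical. In particular, how the real Group treats leading slashes, empty namespaces and the root group is an assumption here.

```python
from typing import Optional


class SketchGroup:
    """Minimal stand-in for a group with a parent and a namespace segment."""

    def __init__(self, parent: Optional["SketchGroup"], namespace: str):
        self.parent = parent
        self.namespace = namespace

    def assemble_namespace(self) -> str:
        """Walk up to the root and join all non-empty namespace segments."""
        parts = []
        group: Optional["SketchGroup"] = self
        while group is not None:
            segment = group.namespace.strip("/")
            if segment:
                parts.append(segment)
            group = group.parent
        # Segments were collected leaf-first, so reverse them for the path.
        return "/" + "/".join(reversed(parts))
```

With a root group `"/"`, a child `"robot"` and a grandchild `"arm"`, the grandchild assembles to `/robot/arm` in this sketch.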

class better_launch.elements.LifecycleManager(node: AbstractNode)

Bases: object

property current_stage: LifecycleStage

The node’s current (that is, last known) lifecycle stage.

classmethod find_transition_path(start_ros_state: int, goal_ros_state: int) -> list[int]

Finds a sequence of transitions that will bring a node from an initial lifecycle state (the ROS state, not our LifecycleStage enum) to a target lifecycle state.

This is fairly low-level and rarely needed directly.

Parameters

start_ros_state: int

The initial ROS lifecycle state.

goal_ros_state: int

The final ROS lifecycle state.

Returns

list[int]

A sequence of ROS lifecycle transitions that form a path from the start state to the goal state.
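One way to find such a path is a breadth-first search over the lifecycle state graph. The sketch below is not the library's implementation. The state and transition IDs are assumed to match lifecycle_msgs/msg/State and lifecycle_msgs/msg/Transition and should be checked against your ROS distribution, and the transitional and error-processing states are left out for brevity. `find_path` is a hypothetical name.

```python
from collections import deque

# Primary state IDs (assumed to follow lifecycle_msgs/msg/State).
UNCONFIGURED, INACTIVE, ACTIVE, FINALIZED = 1, 2, 3, 4

# Transition IDs (assumed to follow lifecycle_msgs/msg/Transition).
CONFIGURE, CLEANUP, ACTIVATE, DEACTIVATE = 1, 2, 3, 4
UNCONFIGURED_SHUTDOWN, INACTIVE_SHUTDOWN, ACTIVE_SHUTDOWN = 5, 6, 7

# state -> list of (transition, resulting state)
EDGES = {
    UNCONFIGURED: [(CONFIGURE, INACTIVE), (UNCONFIGURED_SHUTDOWN, FINALIZED)],
    INACTIVE: [(ACTIVATE, ACTIVE), (CLEANUP, UNCONFIGURED), (INACTIVE_SHUTDOWN, FINALIZED)],
    ACTIVE: [(DEACTIVATE, INACTIVE), (ACTIVE_SHUTDOWN, FINALIZED)],
    FINALIZED: [],
}


def find_path(start: int, goal: int) -> list[int]:
    """Return the shortest list of transition IDs leading from start to goal."""
    if start == goal:
        return []
    queue = deque([(start, [])])
    seen = {start}
    while queue:
        state, path = queue.popleft()
        for transition, next_state in EDGES.get(state, []):
            if next_state in seen:
                continue
            if next_state == goal:
                return path + [transition]
            seen.add(next_state)
            queue.append((next_state, path + [transition]))
    raise ValueError(f"no transition path from state {start} to state {goal}")
```

Because the search is breadth-first, the first path that reaches the goal is also the one with the fewest transitions. A finalized node has no outgoing edges in this graph, so any search starting there fails.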

classmethod is_lifecycle(node: AbstractNode, timeout: float | None = None) -> bool

Checks whether a node supports lifecycle management.

For a node to support lifecycle management, it must be running, be registered with ROS and offer the ROS lifecycle management services. This method only checks whether one of the key services is present.

If a timeout is specified, the check will be repeated until it succeeds or the specified amount of time has passed. This ensures that a freshly started node has had enough time to create its services, especially on slower devices. See AbstractNode.is_lifecycle_node() for additional information.

Parameters

node: AbstractNode

The node object to check for lifecycle support.

timeout: float

How long to wait at most for the lifecycle services to appear. Wait forever if None.

Returns

bool

True if the node supports lifecycle management, False otherwise.

property ros_state: int

The node’s current (that is, last known) ROS lifecycle state ID.

transition(target_stage: LifecycleStage) -> bool

Transition the managed node into the target lifecycle stage. Does nothing if the node is already in the desired stage.

Note that you don’t have to do step-by-step transitions - simply specify the stage you want the node to end up in and it will go through all the intermediate steps (assuming a path exists).

Parameters

target_stage: LifecycleStage

The lifecycle stage you want the node to end up in.

Returns

bool

True if the transition sequence succeeded, False if one of the steps failed.

Raises

ValueError

If no path to the target stage could be found.

class better_launch.elements.LifecycleStage(value)

Bases: IntEnum

Represents the main stages of nodes that support lifecycle management.

PRISTINE = 0

CONFIGURED = 1

ACTIVE = 2

FINALIZED = 3
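Since the base class is IntEnum, the stages can be compared and sorted like integers. The sketch below uses a local replica of the enum with the values listed above so that it runs without better_launch installed. `is_forward` is a hypothetical helper.

```python
from enum import IntEnum


class LifecycleStage(IntEnum):
    # Local replica of the documented values, for illustration only.
    PRISTINE = 0
    CONFIGURED = 1
    ACTIVE = 2
    FINALIZED = 3


def is_forward(current: LifecycleStage, target: LifecycleStage) -> bool:
    """True if reaching `target` means moving to a later stage."""
    return target > current
```

`is_forward(LifecycleStage.PRISTINE, LifecycleStage.ACTIVE)` is True, while the reverse direction is False.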
class better_launch.elements.Node(package: str, executable: str, name: str, namespace: str, *, remaps: dict[str, str] | None = None, params: str | dict[str, Any] | None = None, cmd_args: list[str] | None = None, env: dict[str, str] | None = None, isolate_env: bool = False, log_level: int = 20, output: Literal['screen', 'log', 'own_log', 'none'] | set[Literal['screen', 'log', 'own_log', 'none']] = 'screen', on_exit: Callable | None = None, max_respawns: int = 0, respawn_delay: float = 0.0, use_shell: bool = False, raw: bool = False)

Bases: AbstractNode, LiveParamsMixin

property is_running: bool

True if the node is currently running.

join(timeout: float | None = None) -> int

Wait for the underlying process to terminate and return its exit code. Returns immediately if the process is not running.

Parameters

timeout: float, optional

How long to wait for the process to finish. Wait forever if None.

Returns

int

The exit code of the process, or None if it is already terminated.

Raises

TimeoutError

If a timeout was specified and the process is still running by the time the timeout expires.
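The join semantics described above can be sketched against a plain subprocess.Popen child. Treating the node as a Popen object is an assumption made for illustration, since how better_launch tracks the process internally is not documented here. `join_process` is a hypothetical helper.

```python
import subprocess
import sys
from typing import Optional


def join_process(proc: subprocess.Popen, timeout: Optional[float] = None) -> int:
    """Wait for `proc` to exit and return its exit code.

    Raises TimeoutError if a timeout was given and the process is
    still running when it expires.
    """
    try:
        return proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired as exc:
        raise TimeoutError(
            f"process {proc.pid} still running after {timeout} seconds"
        ) from exc


if __name__ == "__main__":
    # Start a short-lived child and wait for it to finish.
    child = subprocess.Popen([sys.executable, "-c", "pass"])
    print("exit code:", join_process(child, timeout=30.0))
```

Converting subprocess.TimeoutExpired into the built-in TimeoutError mirrors the exception type listed under Raises.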

property pid: int

The process ID of the node process. Will be -1 if the process is not running.

shutdown(reason: str, signum: int = Signals.SIGTERM, timeout: float = 0.0) -> None

Shut down this node. Once this succeeds, is_running() will return False.

Parameters

reason: str

A human-readable string describing why this node is being shut down.

signum: int, optional

The signal that should be sent to the node (if supported).

timeout: float, optional

How long to wait for the node to shut down before returning. Don’t wait if timeout is 0.0. Wait forever if timeout is None.

Raises

TimeoutError

If a timeout > 0 was set and the node did not shut down before then.
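The three timeout cases (0.0, a positive value, None) can be illustrated with a signal-then-wait helper for a plain subprocess.Popen child. This is a sketch under the assumption that the node is an ordinary child process. `shutdown_process` is hypothetical and does not reflect how better_launch implements shutdown().

```python
import signal
import subprocess
import sys
from typing import Optional


def shutdown_process(
    proc: subprocess.Popen,
    signum: int = signal.SIGTERM,
    timeout: Optional[float] = 0.0,
) -> None:
    """Send `signum` to `proc` and optionally wait for it to exit."""
    if proc.poll() is not None:
        return  # Already exited, nothing to do.

    proc.send_signal(signum)

    if timeout == 0.0:
        return  # Fire and forget.

    try:
        # timeout=None blocks until the process has exited.
        proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired as exc:
        raise TimeoutError(
            f"process {proc.pid} did not exit within {timeout} seconds"
        ) from exc


if __name__ == "__main__":
    child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
    shutdown_process(child, timeout=10.0)
    print("return code:", child.returncode)
```

With a timeout of 0.0 the caller cannot tell whether the process actually exited, so a later join is needed to confirm that the node is gone.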

start() -> None

Start this node. Once this succeeds, is_running() will return True.