better_launch.elements.composer module

class better_launch.elements.composer.Component(composer: Composer, package: str, plugin: str, name: str, namespace: str, *, output: Literal['screen', 'log', 'own_log', 'none'] | set[Literal['screen', 'log', 'own_log', 'none']] = 'screen', remaps: dict[str, str] | None = None, params: str | dict[str, Any] | None = None)

Bases: AbstractNode, LiveParamsMixin

property component_id: int

The ID this component got assigned when it was loaded into the composer. Will be None if the component is not loaded.

property composer: Composer

The composer this component is associated with.

property is_loaded: bool

True if the component is loaded, False otherwise.

property is_running: bool

True if the node is currently running.

join(timeout: float | None = None) None

Join this node and return once it is shut down. Return immediately if it is not running.

Parameters

timeoutfloat, optional

How long to wait in seconds. Wait forever if None.

Raises

TimeoutError

If a timeout was set and the node is still running by the time it expires.

property plugin: str

The special string that is used for loading the component. executable() will return the same.

shutdown(reason: str, signum: int = Signals.SIGTERM, timeout: float = 0.0) None

Unload this component if it was loaded.

Parameters

reasonstr

A human-readable string describing why the component is being unloaded.

signumint, optional

Ignored for components.

start(use_intra_process_comms: bool = True, **composer_extra_params) None

Load this component into its composer.

Additional keyword arguments will be passed as ROS parameters to the component.

Parameters

use_intra_process_commsbool, optional

If True, the component will use intra process communication for exchanging messages with other components within the same composer.

class better_launch.elements.composer.Composer(wrapped_node: AbstractNode, *, component_remaps: dict[str, str] | None = None, output: Literal['screen', 'log', 'own_log', 'none'] | set[Literal['screen', 'log', 'own_log', 'none']] = 'screen')

Bases: AbstractNode

get_live_components() dict[int, str]

Use a service call to retrieve the components currently loaded into this composer and their IDs.

Returns

dict[int, str]

A dict mapping component IDs to full node names.

classmethod is_composer(node: AbstractNode, timeout: float = 0.0) bool

Checks whether a node provides services for loading components.

For a node to be a composer, it must be running, be registered with ROS and offer the ROS composition services. This method only checks whether one of the key services is present.

If a timeout is specified, the check will be repeated until it succeeds or the specified amount of time has passed. This is to ensure that a freshly started node had enough time to create its topics, especially on slower devices.

Parameters

nodeAbstractNode

The node object to check.

timeoutfloat

How long to wait at most for the composition services to appear. Wait forever if None.

Returns

bool

True if the node supports loading components, False otherwise.

property is_lifecycle: bool

Composers are not lifecycle nodes.

property is_running: bool

True if the node is currently running.

join(timeout: float | None = None) None

Join this node and return once it is shut down. Return immediately if it is not running.

Parameters

timeoutfloat, optional

How long to wait in seconds. Wait forever if None.

Raises

TimeoutError

If a timeout was set and the node is still running by the time it expires.

property language: str

The implementation language of this composer, usually cpp or py. Corresponds to this composer’s package (e.g. rclcpp_composition), but will be None if it’s a custom implementation.

load_component(component: Component, use_intra_process_comms: bool = True, **composer_extra_params: dict) int

Load this component into its composer.

Additional keyword arguments will be passed as ROS parameters to the component. If the component is not associated with this composer yet, a warning will be logged and its association will be updated.

Note that since components are loaded via a service call that there are some additional restrictions on the types of Component.params() and composer_extra_params. In particular, they must be compatible with the ROS2 Parameter message type.

Also note that new nodes created by a component are using the composer’s remaps. See the related issue in rclcpp.

Parameters

component: Component

The component to load.

use_intra_process_commsbool, optional

If True, the component will use intra process communication for exchanging messages with other components within the same composer.

Returns

int

The ID the component got assigned by ROS.

Raises

ValueError

If this composer is not running, if the component is already loaded, or if the parameters could not be serialized.

RuntimeError

If loading the component failed.

property managed_components: list[Component]

The components that were explicitly loaded through this composer instance. This will not contain components that have been loaded via external service calls.

shutdown(reason: str, signum: int = Signals.SIGTERM, timeout: float | None = None) None

This will shutdown the composer node. Unloading any loaded components is left to the actual composer implementation.

Parameters

reasonstr

A human-readable string describing why the composer and its components are being shutdown.

signumint, optional

The signal that should be send to the composer.

timeoutfloat, optional

How long to wait for each component and the composer to shutdown before returning. Don’t wait if timeout is 0.0. Wait forever if timeout is None.

Raises

TimeoutError

If a timeout > 0 was set and any of the components or the composer did not shutdown before then.

start(service_timeout: float = 5.0) None

Start this node. Once this succeeds, is_running() will return True.

Parameters

service_timeoutfloat, optional

How long to wait for each composition service to appear (3 total). Wait forever if set negative. Don’t check at all if None or 0.

unload_component(component: Component | int, timeout: float | None = None) None

Unload the specified component, essentially stopping its node.

Note that an unload request will be issued even if the component reports it is not loaded.

Parameters

componentComponent | int

The component or a component’s ID to stop.

timeoutfloat, optional

How long to wait for the component to be unloaded before returning. Don’t wait if timeout is 0.0. Wait forever if timeout is None.

Returns

bool

True if unloading the component succeeded, False otherwise.

Raises

ValueError

If this composer is not running.

KeyError

If the provided component has not been loaded into this node