better_launch.elements.composer module
- class better_launch.elements.composer.Component(composer: Composer, package: str, plugin: str, name: str, namespace: str, *, output: Literal['screen', 'log', 'own_log', 'none'] | set[Literal['screen', 'log', 'own_log', 'none']] = 'screen', remaps: dict[str, str] | None = None, params: str | dict[str, Any] | None = None)
Bases: AbstractNode, LiveParamsMixin
- property component_id: int
The ID this component got assigned when it was loaded into the composer. Will be None if the component is not loaded.
- property is_loaded: bool
True if the component is loaded, False otherwise.
- property is_running: bool
True if the node is currently running.
- join(timeout: float | None = None) -> None
Join this node and return once it is shut down. Return immediately if it is not running.
Parameters
- timeout : float, optional
How long to wait in seconds. Wait forever if None.
Raises
- TimeoutError
If a timeout was set and the node is still running by the time it expires.
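The timeout behaviour of join() can be wrapped in a small helper that turns the exception into a boolean. This is a minimal sketch: `wait_for_exit` is a hypothetical name, not part of better_launch, and it only assumes an object exposing the `join(timeout)` method documented above.

```python
def wait_for_exit(node, timeout=None):
    """Return True if `node` shut down within `timeout` seconds, False otherwise.

    `node` is assumed to be any object with a join(timeout) method that raises
    TimeoutError when the node is still running after the timeout.
    """
    try:
        node.join(timeout=timeout)
    except TimeoutError:
        # The node was still running when the timeout expired.
        return False
    return True
```

With `timeout=None` the helper blocks until the node exits, mirroring join() itself.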
- property plugin: str
The special string that is used for loading the component. executable() will return the same.
- shutdown(reason: str, signum: int = Signals.SIGTERM, timeout: float = 0.0) -> None
Unload this component if it was loaded.
Parameters
- reason : str
A human-readable string describing why the component is being unloaded.
- signum : int, optional
Ignored for components.
- start(use_intra_process_comms: bool = True, **composer_extra_params) -> None
Load this component into its composer.
Additional keyword arguments will be passed as ROS parameters to the component.
Parameters
- use_intra_process_comms : bool, optional
If True, the component will use intra-process communication for exchanging messages with other components within the same composer.
- class better_launch.elements.composer.Composer(wrapped_node: AbstractNode, *, component_remaps: dict[str, str] | None = None, output: Literal['screen', 'log', 'own_log', 'none'] | set[Literal['screen', 'log', 'own_log', 'none']] = 'screen')
Bases: AbstractNode
- get_live_components() -> dict[int, str]
Use a service call to retrieve the components currently loaded into this composer and their IDs.
Returns
- dict[int, str]
A dict mapping component IDs to full node names.
- classmethod is_composer(node: AbstractNode, timeout: float = 0.0) -> bool
Checks whether a node provides services for loading components.
For a node to be a composer, it must be running, be registered with ROS and offer the ROS composition services. This method only checks whether one of the key services is present.
If a timeout is specified, the check will be repeated until it succeeds or the specified amount of time has passed. This ensures that a freshly started node has had enough time to advertise its services, especially on slower devices.
Parameters
- node : AbstractNode
The node object to check.
- timeout : float
How long to wait at most for the composition services to appear. Wait forever if None.
Returns
- bool
True if the node supports loading components, False otherwise.
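The repeat-until-timeout behaviour described for is_composer can be illustrated with a generic polling loop. This is only a sketch of the pattern, not better_launch's implementation: `poll_until`, its `interval` argument and the sleep-based loop are assumptions made for illustration.

```python
import time


def poll_until(check, timeout=0.0, interval=0.1):
    """Call check() until it returns True or `timeout` seconds have passed.

    The check always runs at least once, so timeout=0.0 means "check once".
    timeout=None repeats the check forever.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        if check():
            return True
        if deadline is not None and time.monotonic() >= deadline:
            return False
        # Give the node some time before checking again.
        time.sleep(interval)
```

A monotonic clock is used so that system clock adjustments cannot shorten or extend the wait.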
- property is_lifecycle: bool
Composers are not lifecycle nodes.
- property is_running: bool
True if the node is currently running.
- join(timeout: float | None = None) -> None
Join this node and return once it is shut down. Return immediately if it is not running.
Parameters
- timeout : float, optional
How long to wait in seconds. Wait forever if None.
Raises
- TimeoutError
If a timeout was set and the node is still running by the time it expires.
- property language: str
The implementation language of this composer, usually cpp or py. Corresponds to this composer's package (e.g. rclcpp_composition), but will be None if it's a custom implementation.
- load_component(component: Component, use_intra_process_comms: bool = True, **composer_extra_params: dict) -> int
Load a component into this composer.
Additional keyword arguments will be passed as ROS parameters to the component. If the component is not associated with this composer yet, a warning will be logged and its association will be updated.
Note that since components are loaded via a service call, there are some additional restrictions on the types of Component.params() and composer_extra_params. In particular, they must be compatible with the ROS2 Parameter message type. Also note that new nodes created by a component use the composer's remaps. See the related issue in rclcpp.
Parameters
- component : Component
The component to load.
- use_intra_process_comms : bool, optional
If True, the component will use intra-process communication for exchanging messages with other components within the same composer.
Returns
- int
The ID the component got assigned by ROS.
Raises
- ValueError
If this composer is not running, if the component is already loaded, or if the parameters could not be serialized.
- RuntimeError
If loading the component failed.
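To make the parameter restriction more concrete, the following sketch checks whether a Python value fits one of the types of the ROS 2 parameter message (bool, integer, double, string, byte array, and homogeneous arrays of bool, integer, double or string). `is_ros_parameter_compatible` is a hypothetical helper, and better_launch may apply its own conversions before serializing, so treat it as a rough pre-check only.

```python
def is_ros_parameter_compatible(value):
    """Return True if `value` fits one of the ROS 2 parameter value types."""
    if isinstance(value, (bool, int, float, str)):
        return True
    if isinstance(value, (bytes, bytearray)):
        # Maps to a byte array parameter.
        return True
    if isinstance(value, (list, tuple)):
        # Arrays must be homogeneous. bool is a subclass of int in Python,
        # so exact types are compared instead of using isinstance().
        # An empty sequence is accepted here; its element type is ambiguous.
        element_types = {type(v) for v in value}
        return len(element_types) <= 1 and element_types <= {bool, int, float, str}
    # Dicts, None and arbitrary objects have no matching parameter type.
    return False
```

For example, `[1, 2, 3]` passes, while `[1, "a"]` and `{"gain": 1.0}` do not; nested structures would have to be flattened before being passed along.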
- property managed_components: list[Component]
The components that were explicitly loaded through this composer instance. This will not contain components that have been loaded via external service calls.
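Since managed_components only covers components loaded through this instance, comparing it against the result of get_live_components() reveals components that were loaded externally. The sketch below works on plain data: `find_external_components` is a hypothetical helper that takes the dict returned by get_live_components() and the component_id values of the managed components.

```python
def find_external_components(live, managed_ids):
    """Return the live components that are not managed by this composer instance.

    live        -- dict mapping component IDs to node names
    managed_ids -- iterable of component IDs; None entries (not loaded) are skipped
    """
    managed = {cid for cid in managed_ids if cid is not None}
    return {cid: name for cid, name in live.items() if cid not in managed}
```

With a composer object at hand, the inputs would be `composer.get_live_components()` and `[c.component_id for c in composer.managed_components]`.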
- shutdown(reason: str, signum: int = Signals.SIGTERM, timeout: float | None = None) -> None
Shut down the composer node. Unloading any loaded components is left to the actual composer implementation.
Parameters
- reason : str
A human-readable string describing why the composer and its components are being shut down.
- signum : int, optional
The signal that should be sent to the composer.
- timeout : float, optional
How long to wait for each component and the composer to shut down before returning. Don't wait if timeout is 0.0. Wait forever if timeout is None.
Raises
- TimeoutError
If a timeout > 0 was set and any of the components or the composer did not shut down before then.
- start(service_timeout: float = 5.0) -> None
Start this node. Once this succeeds, is_running() will return True.
Parameters
- service_timeout : float, optional
How long to wait for each composition service to appear (3 total). Wait forever if set negative. Don't check at all if None or 0.
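Because service_timeout applies to each of the three composition services, the worst-case startup delay can exceed the value passed in. The following sketch spells out the three cases; `service_wait_budget` is a hypothetical helper, and it assumes the services are waited for one after another.

```python
import math


def service_wait_budget(service_timeout, num_services=3):
    """Worst-case number of seconds spent waiting for the composition services.

    Returns 0.0 when the check is skipped and math.inf when waiting forever.
    """
    if service_timeout is None or service_timeout == 0:
        # The services are not checked at all.
        return 0.0
    if service_timeout < 0:
        # A negative value means waiting without a limit.
        return math.inf
    return service_timeout * num_services
```

Under these assumptions, the default of 5.0 seconds allows up to 15 seconds in total.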
- unload_component(component: Component | int, timeout: float | None = None) -> None
Unload the specified component, essentially stopping its node.
Note that an unload request will be issued even if the component reports it is not loaded.
Parameters
- component : Component | int
The component or a component's ID to stop.
- timeout : float, optional
How long to wait for the component to be unloaded before returning. Don't wait if timeout is 0.0. Wait forever if timeout is None.
Returns
- bool
True if unloading the component succeeded, False otherwise.
Raises
- ValueError
If this composer is not running.
- KeyError
If the provided component has not been loaded into this node.
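A caller that prefers a boolean result over exceptions could wrap unload_component as sketched below. `try_unload` is a hypothetical helper. The signature above is annotated as returning None while the Returns section describes a bool, so the sketch treats any result other than False as success.

```python
def try_unload(composer, component, timeout=None):
    """Unload `component` and report success as a bool instead of raising.

    `composer` is assumed to expose unload_component(component, timeout=...)
    with the documented ValueError and KeyError behaviour.
    """
    try:
        result = composer.unload_component(component, timeout=timeout)
    except ValueError:
        # The composer is not running.
        return False
    except KeyError:
        # The component has not been loaded into this composer.
        return False
    # Covers both a None return and an explicit True.
    return result is not False
```

Other exceptions are deliberately not caught, so unexpected failures remain visible to the caller.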