better_launch.launcher module

class better_launch.launcher.BetterLaunch(*args, **kwargs)

Bases: object

This should be all you need to create beautiful, simple and convenient launch files!

action_client(topic: str, action_type: str | type, timeout: float = 5.0, qos_profile: rclpy.qos.QoSProfile = None) RosActionClient

Create a ROS2 action client to execute long-running actions.

Parameters

topicstr

The topic namespace on which the action interface is provided.

action_typestr | type

The type of actions the action server handles. Strings must follow the pattern <package>/action/<message>.

timeoutfloat, optional

Time to wait for the action server to become available. Ignored if <= 0.

qos_profileQoSProfile, optional

A quality of service profile that changes how the action server handles connections and retains data.

Returns

RosActionClient

The action client object. Although not required for Jazzy and below, it is recommended to keep a reference.

Raises

TimeoutError

If the action server did not become available within the specified timeout.

action_server(topic: str, action_type: str | type, callback: Callable[[Any], Any], qos_profile: rclpy.qos.QoSProfile = None) RosActionServer

Create a ROS2 action server using the [shared_node][BetterLaunch.shared_node].

Parameters

topicstr

The topic namespace to provide the action interface on.

action_typestr | type

The type of the actions to be handled. Strings must follow the pattern <package>/action/<message>.

callbackCallable[[Any], Any]

A function that will handle incoming action requests. The type of the requests will be of type rclpy.action.server.ServerGoalHandle and contain an action_type.Goal.

qos_profileQoSProfile, optional

A quality of service profile that changes how the action server handles connections and retains data.

Returns

RosActionServer

The action server object. Although not required for Jazzy and below, it is recommended to keep a reference.

add_shutdown_callback(callback: Callable[[], Any]) None

Adds a callback which will be called when better_launch shuts down.

Parameters

callbackCallable

The callback to call on shutdown.

all_ros2_node_names() list[str]

Returns a list of all currently registered node’s full names (namespace + name).

This list is guaranteed to be complete as far as ROS2 is concerned. If you require a node object you can actually interact with consider using [query_node][] or [get_nodes][] instead.

Returns

list[str]

A list of all running nodes’ full names.

call_service(topic: str, service_type: str | type, request_args: dict[str, Any] | Any = None, *, timeout: float = 5.0, qos_profile: rclpy.qos.QoSProfile = None, call_async: bool = False) Any

Makes a single service request and returns the result. The client is destroyed once the request has been handled. If you plan to make additional requests, use [service_client][] instead.

Parameters

topicstr

The service topic to post requests on.

service_typestr | type

The service’s message type. Strings must follow the pattern <package>/srv/<message>.

request_argsdict[str, Any] | Any

The keyword arguments from which the request message will be constructed. May also be an instance of service_type.Request.

timeoutfloat, optional

Time to wait for the service to become available. Ignored if <= 0.

qos_profileQoSProfile, optional

A quality of service profile that changes how the service handles connections.

call_asyncbool, optional

If True, make the service call async and return a rclpy.task.Future instead.

Returns

Any

A rclpy.task.Future if call_async is True, otherwise the result of the service call of type service_type.Request.

Raises

TimeoutError

If the service did not become available within the specified timeout.

component(package: str, plugin: str, name: str = None, *, remaps: dict[str, str] = None, params: str | dict[str, Any] = None, anonymous: bool = False, hidden: bool = False, use_intra_process_comms: bool = True, ros_waittime: float = 3.0, lifecycle_waittime: float = 0.01, lifecycle_target: LifecycleStage | str = LifecycleStage.ACTIVE, output: LogSink | Iterable[LogSink] | Iterable[str] | str = LogSink.SCREEN, **extra_composer_args: dict[str, Any]) Component

Create a component and load it into an existing [compose][] context.

If you instead want to load components without a compose context, you should instantiate [Component][] objects directly, then load them via [Component.start][] or [Composer.load_component][]. See the examples for more details.

Parameters

packagestr

The package providing the component implementation.

pluginstr

The name the component is registered as, typically of the form <package>::<Name>.

namestr, optional

The name the instantiated component should be known as. If None, a name will be derived from package and plugin, and anonymous will be set to True. Pass an empty string instead if you really want to use the node’s default name - just know that you’ll make a cute kitten really sad.

remapsdict[str, str], optional

Tells the node to replace any topics it wants to interact with according to the provided dict.

anonymousbool, optional

If True, the composer name will be appended with a unique suffix to avoid name conflicts.

hiddenbool, optional

If True, the composer name will be prepended with a “_”, hiding it from common listings.

paramsstr | dict[str, Any], optional

Any ROS parameters you want to pass to the component. These are the args you would typically have to declare in your launch file. A string will be interpreted as a path to a yaml file which will be lazy loaded using [BetterLaunch.load_params][].

use_intra_process_commsbool, optional

If True, ask the composer node to enable intra-process communication, i.e. share memory between components when passing messages instead of serializing and deserializing.

ros_waittimefloat, optional

How long to wait for the component to register with ROS. This should cover the time between the process starting and the component initializing itself. Set negative to wait indefinitely. Set to None to avoid the check entirely. Will do nothing if autostart_process is False.

lifecycle_waittimefloat, optional

How long to wait for the component’s lifecycle management to come up. This should cover the time between the component initializing itself (see ros_waittime) and creating its additional topics and services. While neglible on modern computers, slower devices and embedded systems may experience a noticable delay here. Set negative to wait indefinitely. Set to None to avoid the check entirely. Will do nothing if autostart_process is False.

lifecycle_targetLifecycleStage | str, optional

The lifecycle stage to bring the component into after starting. Has no effect if autostart_process is False or if the component does not appear to be a lifecycle component after waiting ros_waittime + lifecycle_waittime.

outputLogSink | Iterable[LogSink] | Iterable[str] | str, optional

Determines if and where this node’s output should be directed. Common choices are screen to print to terminal, log to write to a common log file, own_log to write to a node-specific log file, and none to not write any output anywhere. See [configure_logger][utils.better_logging.configure_logger] for details.

Returns

Component

The component that has been loaded into the current [compose][] context.

Raises

RuntimeError

If this is called outside a [compose][] context.

compose(name: str = None, language: str = 'cpp', variant: Literal['normal', 'multithreading', 'isolated'] = 'normal', *, reuse_existing: bool = True, component_remaps: dict[str, str] = None, anonymous: bool = False, hidden: bool = False, autostart_process: bool = True, ros_waittime: float = 3.0, output: LogSink | Iterable[LogSink] | Iterable[str] | str = LogSink.SCREEN) Generator[Composer, None, None]

Creates a composer node which can be used to load [Component][]s. Components can be instantiated directly, or preferably via [component][]. Only components can reside within a composer.

Existing composers can be reused even if they have been created outside of better_launch. See [Composer][] for further details.

This method should be used as a context, e.g.

bl = BetterLaunch()
with bl.compose("my-composer"):
    bl.component("my_package", "mystuff:TheComponentOfDreams", "normal-component")

Note that new nodes created by a component are using the composer’s remaps. This for example applies to transform listeners (but not transform publishers). See the related issue in rclcpp.

Parameters

namestr, optional

The name you want the composer to be known as. anonymous will be set to True if no name is provided. Pass an empty string if you really want to use the composer node’s default name.

languagestr, optional

The implementation of the standard composer you want to use. Ignored if reuse_existing is True and a matching node is found.

variantLiteral[“normal”, “multithreading”, “isolated”], optional

ROS2 provides special composers for components that need multithreading or should be isolated from the rest. Ignored if reuse_existing is True and a matching node is found.

reuse_existingbool, optional

If True and a node matching the current namespace and provided name is found, it will be used instead of creating a new node. This will even work for composers not created through better_launch, although in that case it won’t be possible to stop them.

component_remapsdict[str, str], optional

Any remaps you want to apply to all components loaded into this composer.

anonymousbool, optional

If True, the composer name will be appended with a unique suffix to avoid name conflicts. reuse_existing will be set to False in this case.

hiddenbool, optional

If True, the composer name will be prepended with a “_”, hiding it from common listings.

autostart_processbool, optional

If True, start the composer process before returning from this function. Note that setting this to False for a composer will make it unusable as a context object, since you won’t be able to load any components.

ros_waittimefloat, optional

How long to wait for the composer to register with ROS. This should cover the time between the process starting and the composer initializing itself. Set negative to wait indefinitely. Will do nothing if autostart_process is False.

outputLogSink | Iterable[LogSink] | Iterable[str] | str, optional

Determines if and where this node’s output should be directed. Common choices are screen to print to terminal, log to write to a common log file, own_log to write to a node-specific log file, and none to not write any output anywhere. See [configure_logger][utils.better_logging.configure_logger] for details.

Yields

Generator[Composer, None, None]

Sets the composition flag and yields the composer.

Raises

RuntimeError

If you try to create a composer within a [compose][] context.

classmethod exec(cmd: str | list[str]) str

Run the specified command and return its output. The command can either be an absolute path to an executable or any file found on PATH.

Parameters

cmdstr | list[str]

The command to run. If a string is passed it will be split on spaces to separate the command from its arguments. Pass a list instead to have more control over which arguments to treat as a single argument.

Returns

str

The output of the command without the trailing newline.

Raises

subprocess.CalledProcessError

If the command had a non-zero exit code. See the raised error’s returncode and output attributes for details.

find(package: str = None, filename: str = None, subdir: str = '**') str

Resolve a path to a file or package.

If the filename is absolute, all other arguments will be ignored and the filename will be returned.

If package is provided, the corresponding ROS2 package path will be used as the base path. Else we attempt to locate the current launch file’s package by searching its directory and parent directories for a package.xml. If the package cannot be determined the current working dir is used as the base path.

If neither subdir nor filename is provided the base path will be returned.

If filename is provided but subdir is not, the base path will be searched recursively for the given filename. Otherwise, subdir will be used to locate valid candidate files and directories within the base path, allowing patterns like **/lib/ (any lib folder) and *.py (any python file).

If only subdir is provided but not filename, the first candidate is returned. Otherwise the discovered candidates will be searched for the given filename.

Parameters

packagestr, optional

Name of a ROS2 package to resolve.

filenamestr, optional

Name of a file to look for.

subdirstr, optional

A glob pattern to locate subdirectories and files. See the pathlib pattern language for details.

Returns

str

A resolved path.

Raises

ValueError

If package contains path separators, or if a filename is provided but could not be found within base path.

find_group_for_namespace(namespace: str, create: bool = False) Group

Find the group representing the passed namespace.

Parameters

namespacestr

The namespace in question.

createbool

If True, create missing groups along the way.

Returns

Group

The group representing the final segment of the namespace, or None if no such group exists and create == False.

get_foreign_nodes() list[ForeignNode]

Lists all running nodes that have a process but have not been started by this better_launch process.

Note however that if a process starts multiple nodes, only the first node can be discovered. This is because ROS2 does not provide an API for getting the process parameters from a node.

Returns

list[ForeignNode]

All nodes with a process that have not been started by this better_launch process.

get_groups() list[Group]

Returns a list of all in the order they were created.

Returns

list[Group]

All groups added so far.

get_nodes(*, include_components: bool = False, include_launch_service: bool = True, include_foreign: bool = False) list[AbstractNode]

Returns a list of all nodes in the order they were added. Components will be added right after their composers.

Note that this will only return nodes that can be managed by better_launch. If a node process creates multiple nodes only the first node can be discovered, as ROS2 does not provide an API linking a node to its process (or even just its package).

Parameters

include_componentsbool, optional

Whether to include [Component][] instances. This will not include components that have been loaded from outside (e.g. ros2 component load).

include_launch_servicebool, optional

Whether to include the ROS2 launch service wrapper if it was created. Will be included after the regular nodes and before the foreign nodes.

include_foreignbool, optional

Whether to include foreign nodes that have not been started by this launcher instance. Will be appended at the end of the returned nodes. However, take heed of the above warning regarding node discovery.

Returns

list[AbstractNode]

A list of all nodes, sorted by when they were added.

get_ros_message_type(message_string: str) type

Loads a ROS2 message type from a string representation.

Message representations must follow the pattern <package>/<type>/<message>, where * <package> is the ROS2 package that defines the message. * <type> is the type of message, typically one of msg, srv or action. * <message> is the name of the message itself with proper capitalization.

Parameters

message_stringstr

A message representation of the form <package>/<type>/<message>.

Returns

type

The message class.

Raises

ImportError

If the message type could not be imported.

get_unique_name(name: str = '', check_running_nodes: bool = True) str

Returns a unique name. If a name is provided it will be prepended with an underscore.

Parameters

namestr, optional

The string to use as the base.

check_running_nodesbool, optional

If true, check the currently running ROS2 nodes for name collisions.

Returns

str

A unique name.

group(namespace: str) Generator[Group, None, None]

Groups are used to bundle nodes into namespaces. While they influence the nodes’ topics and service name, they have no runtime functionality.

Groups are intended to be used as context objects and can be nested. Note that starting a new root branch (i.e. a group starting with “/”) is valid and will change subsequent groups for the duration of the context window.

bl = BetterLaunch()
with bl.group("outer"):
    # Unless included in another group, "outer" will be attached to "/"
    with bl.group("inner/sanctum"):
        # Regular nesting, nodes will live within "/outer/inner/sanctum"
        bl.node(...)
    with bl.group("/evil/tower"):
        # New root branch, nodes will live within "/evil/tower"
        bl.node(...)
    # Root branch exited, nodes will live within "/outer" once again
    bl.node(...)

See also

  • [group_root][]

  • [group_tip][]

Parameters

namespacestr

The group’s namespace.

Yields

Generator[Group, None, None]

Places the group on the group stack and yields it. Exiting the context will pop the group from the group stack.

Raises

RuntimeError

If the group is created within a compose context.

property group_root: Group

The root group (“/”).

property group_tip: Group

The most recent group.

hello() None

Prints our welcome message and some useful information. Note that this will not appear in the logs!

include(package: str, launchfile: str, subdir: str = None, *, pass_launch_func_args: bool = True, **kwargs) None

Include another launch file, resolving its path using [find][].

The file is first read into memory and checked. If it seems to be a better_launch launch file, it is executed immediately (using [exec]). The BetterLaunch instance and global context will be shared. Any arguments to [launch_this][] in the included launch file will be ignored.

If the file does not appear to be a better_launch launch file, it is assumed to be a regular ROS2 launch file. In this case a launch.actions.IncludeLaunchDescription instance is created and passed to [ros2_actions][].

Parameters

packagestr

The package containing the specified launch file. May be None (see [find][]).

launchfilestr

The name of a launch file to execute.

subdirstr, optional

A path fragment the launch file must be located in.

pass_launch_func_argsbool, optional

If True, all launch_args will be passed to the included launch file. Additional launch arguments can also be provided via kwargs.

Raises

ValueError

If the passed in search_args cannot be handled.

classmethod is_included() bool

Check if this is run from an included launchfile.

NOTE: this will only work when (indirectly) invoked from [launch_this][].

More specifically, this checks if a [BetterLaunch][] instance has been stored in the calling frame’s globals, which happens on the first instantiation. This mechanism is an implementation detail and should not be relied on.

Returns

bool

True if a launch_this function has already run.

Raises

ValueError

When launch_this is not part of the current stack frame.

property is_shutdown: bool

Whether better_launch has shutdown.

property launch_args: dict[str, Any]

All key-value pairs that have been passed to the launch function.

property launchfile: str

The path of the (main) better_launch launchfile being executed.

load_params(package: str = None, configfile: str = None, subdir: str = None, *, qualifier: str | Node = None) dict[str, Any]

Load parameters from a yaml file located through [find][].

If the config only contains a ros__parameters section the entire config is returned regardless of whether qualifier was passed. Otherwise, if qualifier is provided, the loaded config dict is searched for a matching section. If no matching section can be found a ValueError will be raised.

The following wildcards are supported for parameter sections: * /**: matches any number of tokens, may be followed by additional tokens and a node name * /*: skips a single namespace token, or ignores the node’s name if at the end

Note that better_launch could not care less whether you put ros__parameters in your configs - if it is there it will be silently discarded.

Parameters

packagestr

A package to search for the config file. May be None (see [find][]).

configfilestr

The name of the config file to locate.

subdirstr, optional

A path fragment that the config file must be located in.

qualifierstr | Node, optional

Used to specifiy which section of the config to return.

Returns

dict[str, Any]

The key-value pairs from the config.

Raises

ValueError

If the path cannot be resolved, if qualifier is supplied and no matching section could be found, or if a substitution failed.

IOError

If the config file could not be read.

log(severity: str | int, message: str, *args: list[Any], **kwargs: dict[str, Any]) None

Pass the specified message to the logger.

This mostly exists to provide this functionality for TOML launchfiles.

Parameters

severitystr | int

A logging severity or level. Standard severities are debug, info, warning, error, critical, and fatal. Integers can be used for more fine grained control and custom log levels, as per the python logging module.

messagestr

The message to log.

argslist[Any], optional

A sequence of additional arguments to format the message.

kwargsdict[str, Any], optional

A dict of additional arguments to format the message.

node(package: str, executable: str, name: str = None, *, remaps: dict[str, str] = None, params: str | dict[str, Any] = None, cmd_args: list[str] = None, env: dict[str, str] = None, isolate_env: bool = False, log_level: int = 20, output: LogSink | Iterable[LogSink] | Iterable[str] | str = LogSink.SCREEN, anonymous: bool = False, hidden: bool = False, on_exit: Callable = None, max_respawns: int = 0, respawn_delay: float = 0.0, use_shell: bool = False, autostart_process: bool = True, ros_waittime: float = 3.0, lifecycle_waittime: float = 0.01, lifecycle_target: LifecycleStage | str = LifecycleStage.ACTIVE, raw: bool = False, remap_qualifier: str = None, qualify_all_remaps: bool = False) Node

Create a new ROS2 node process. The bread and butter of every ROS setup!

Please note that by default better_launch will generate an anonymous node name if no node name was specified. This helps to avoid multiple nodes with the same name, which in ROS2 is both possible and problematic. If you really don’t want to specify a node name, pass an empty string instead.

This method also handles lifecycle nodes (they REALLY should have a common interface). Note that especially for lifecycle nodes you probably want autostart_process == True, otherwise there lifecycle management will not exist. With autostart_process == True, a lifecycle node will automatically advance to lifecycle_target once it is up. Otherwise you can also call [Node.start][better_launch.elements.abstract_node.AbstractNode.start] later.

The ROS2 documentation can provide some additional information regarding params, remaps, and so on.

Parameters

packagestr

The package providing the node.

executablestr

The executable that should be run.

namestr, optional

The name you want the node to be known as. If None, a name will be derived from package and executable and anonymous will be set to True. Pass an empty string instead if you really want to use the node’s default name - just know that you’ll make a cute kitten really sad.

remapsdict[str, str], optional

Tells the node to replace any topics it wants to interact with according to the provided dict.

paramsstr | dict[str, Any], optional

Any ROS parameters you want to pass to the node. These are the args you would typically have to declare in your launch file. A string will be interpreted as a path to a yaml file which will be lazy loaded using [BetterLaunch.load_params][].

cmd_argslist[str], optional

Additional command line arguments to pass to the node.

envdict[str, str], optional

Additional environment variables to set for the node’s process. The node process will merge these with the environment variables of the better_launch host process unless isolate_env is True.

isolate_envbool, optional

If True, the node process’ env will not be inherited from the parent process and only those passed via env will be used. Be aware that this can result in many common things to not work anymore since e.g. keys like PATH will be missing.

log_levelint, optional

The minimum severity a logged message from this node must have in order to be published. This will be added to the cmd_args unless it is None.

outputLogSink | Iterable[LogSink] | Iterable[str] | str, optional

Determines if and where this node’s output should be directed. Common choices are screen to print to terminal, log to write to a common log file, own_log to write to a node-specific log file, and none to not write any output anywhere. See [configure_logger][utils.better_logging.configure_logger] for details.

anonymousbool, optional

If True, the node name will be appended with a unique suffix to avoid name conflicts.

hiddenbool, optional

If True, the node name will be prepended with a “_”, hiding it from common listings.

on_exitCallable, optional

A function to call when the node’s process terminates (after any possible respawns).

max_respawnsint, optional

How often to restart the node process if it terminates.

respawn_delayfloat, optional

How long to wait before restarting the node process after it terminates.

use_shellbool, optional

If True, invoke the node executable via the system shell. While this gives access to the shell’s builtins, this has the downside of running the node inside a “mystery program” which is platform and user dependent. Generally not advised.

autostart_processbool, optional

If True, start the node process before returning from this function.

ros_waittimefloat, optional

How long to wait for the node to register with ROS. This should cover the time between the process starting and the node initializing itself. Set negative to wait indefinitely. Set to None to avoid the check entirely. Will do nothing if autostart_process is False.

lifecycle_waittimefloat, optional

How long to wait for the node’s lifecycle management to come up. This should cover the time between the node initializing itself (see ros_waittime) and creating its additional topics and services. While neglible on modern computers, slower devices and embedded systems may experience a noticable delay here. Set negative to wait indefinitely. Set to None to avoid the check entirely. Will do nothing if autostart_process is False.

lifecycle_targetLifecycleStage | str, optional

The lifecycle stage to bring the node into after starting. Has no effect if autostart_process is False or if the node does not appear to be a lifecycle node after waiting ros_waittime + lifecycle_waittime.

rawbool, optional

If True, don’t treat the executable as a ROS2 node and avoid passing it any command line arguments except those specified.

remap_qualifierstr, optional

Additional qualifier that will precede the node’s __ns and __name remap rules. Should be the original name of the node (i.e. whatever its default name is) and can be qualified with a namespace. Useful to prevent multiple nodes with the same name when a process can have more than one node (e.g. controller_manager). See [this ROS2 design doc](https://design.ros2.org/articles/static_remapping.html#how-the-syntax-works) for more information.

qualify_all_remapsbool, optional

If True, apply the remap_qualifier to all remaps that are not already qualified.

Returns

Node

The node object wrapping the node process.

Raises

RuntimeError

If you try to add a node withing a [compose][] context.

publish_message(topic: str, message_type: str | type, message_args: dict[str, Any], qos_profile: QoSProfile | int = 10, *, time_to_publish: float = 1.0) None

Convenience method to publish a single message. The publisher will be destroyed once the message has been published. If you plan to publish additional messages, use [publisher][] instead and use the instance.

Parameters

topicstr

The topic to publish messages on.

message_typestr | type

The message type that will be published. Strings must follow the pattern <package>/msg/<message>.

message_argsdict[str, Any]

The keyword arguments from which the message will be constructed.

qos_profileQoSProfile | int, optional

A quality of service profile that changes how the publisher handles connections and retains data.

time_to_publish: float, optional

How long to give the publisher to submit the message. Expect the message to get lost if you set this to 0.

publisher(topic: str, message_type: str | type, qos_profile: QoSProfile | int = 10) rclpy.node.Publisher

Create a ROS2 publisher using the [shared_node][].

Parameters

topicstr

The topic to publish messages on.

message_typestr | type

The message type that will be published. Strings must follow the pattern <package>/msg/<message>.

qos_profileQoSProfile | int, optional

A quality of service profile that changes how the publisher handles connections and retains data.

Returns

RosPublisher

The publisher object. Although not required for Jazzy and below, it is recommended to keep a reference.

query_node(pattern: str, *, include_components: bool = True, include_launch_service: bool = False, include_foreign: bool = False) AbstractNode

Retrieve the first node matching the provided pattern.

Parameters

patternstr

Either the name of a node, or a qualified node name (i.e. namespace + name). If a namespace is included it must be absolute, but may include * or ** wildcards to skip one or more groups (via fnmatch).

include_componentsbool, optional

Whether to include components in the results, if any.

include_launch_servicebool, optional

Whether to include the ROS2 launch service in the result (if it exists).

include_foreignbool, optional

Whether to include foreign nodes not created by this launcher.

Returns

AbstractNode

The first node matching the provided pattern, or None if none matched.

query_nodes(pattern: str, *, include_components: bool = True, include_launch_service: bool = False, include_foreign: bool = False) Generator[AbstractNode, None, None]

Yield all nodes matching the provided pattern.

Parameters

patternstr

Either the name of a node, or a qualified node name (i.e. namespace + name). If a namespace is included it must be absolute, but may include * or ** wildcards to skip one or more groups (via fnmatch).

include_componentsbool, optional

Whether to include components in the results, if any.

include_launch_servicebool, optional

Whether to include the ROS2 launch service in the result (if it exists).

include_foreignbool, optional

Whether to include foreign nodes not created by this launcher.

Returns

Generator[AbstractNode, None, None]

The nodes matching the pattern.

ros2_actions(*ros2_actions) Ros2LaunchWrapper

Submit additional ROS2 launch actions for execution.

If no launch.LaunchService exists yet it will be created and started immediately.

ros2_launch_service(name: str = 'LaunchService', launchservice_args: list[str] = None, output: LogSink | Iterable[LogSink] | Iterable[str] | str = LogSink.SCREEN, start_immediately: bool = True) Ros2LaunchWrapper

Create or retrieve a manager object that can be used for queueing ROS2 launch actions.

Usually, calling [ros2_actions][] is more convenient for queueing actions. However, calling this first allows to prevent starting the underlying launch.LaunchService immediately, giving more control over when the actions are executed.

Since the LaunchService insists on running on the main thread it will be started as a sub process.

Note that only one instance of the ROS2 wrapper should ever exist. Calling this method after it has been created will return the already existing instance instead. Any passed arguments will be silently discarded.

Parameters

namestr, optional

The name used to identify the process and its logger.

launchservice_argslist[str], optional

Additional launch arguments to pass to the ROS2 launch service. These will end up in launch.LaunchContext.argv.

outputLogSink | Iterable[LogSink] | Iterable[str] | str, optional

How log output from the launch service should be handled. This will also include the output from all nodes launched by this launch service. Common choices are screen to print to terminal, log to write to a common log file, own_log to write to a node-specific log file, and none to not write any output anywhere. See [configure_logger][utils.better_logging.configure_logger] for details.

start_immediatelybool, optional

If True, the ROS2 launch service process is started immediately.

Returns

Ros2LaunchWrapper

The wrapper hosting the ROS2 launch service process.

property ros_adapter: ROSAdapter

Contains and runs a shared ROS2 node to interact with topics, services, etc. The adapter is instantiated lazily as it brings a major performance hit (about 6 MiB memory and a high CPU spike).

static ros_distro() str

Returns the name of the currently sourced ros distro (i.e. $ROS_DISTRO).

run_later(delay: float, callback: Callable, *args, **kwargs) Future

Convenience method for running a callback with a delay. The callback will be called on a separte thread.

This mainly exists to cover the use case where you want to interact with ROS from an rclpy.Timer. A synchronous call from within a timer (e.g. a service call like [Node.set_live_params][]) will block ROS’ background event loop, preventing publishers, subscribers, services, etc. from doing their work. It will also prevent a clean shutdown as ROS usually waits for the event queue to become empty.

When executing a long running task this way it is a good idea to check [is_shutdown][] in between iterations.

Parameters

delayfloat

How long to wait in seconds before calling the callback.

callbackCallable

The function to call after the timeout. Will not be called if [shutdown][] is called beforehand.

*argsAny, optional

Positional arguments to the callback.

**kwargsAny, optional

Keyword arguments to the callback.

Returns

Future

A future which will be cancelled on shutdown, otherwise it will hold the callback’s result or an exception the callback raised.

service(topic: str, service_type: str | type, callback: Callable[[Any], Any], qos_profile: rclpy.qos.QoSProfile = None) rclpy.node.Service

Create a ROS2 service provider using the [shared_node][].

Parameters

topicstr

The topic the service will live on.

service_typestr | type

The service’s message type. Strings must follow the pattern <package>/srv/<message>.

callbackCallable[[Any], Any]

The function that will handle any requests to the service. The type of the request will be of type service_type.Request.

qos_profileQoSProfile, optional

A quality of service profile that changes how the service handles connections.

Returns

RosServiceProvider

The service object. Although not required for Jazzy and below, it is recommended to keep a reference.

service_client(topic: str, service_type: str | type, timeout: float = 5.0, qos_profile: rclpy.qos.QoSProfile = None) rclpy.node.Client

Create a ROS2 service client that can be used to call a service.

Parameters

topicstr

The service topic to post requests on.

service_typestr | type

The service’s message type. Strings must follow the pattern <package>/srv/<message>.

timeoutfloat, optional

Time to wait for the service to become available. Ignored if <= 0.

qos_profileQoSProfile, optional

A quality of service profile that changes how the service handles connections.

Returns

RosServiceClient

The client object. Although not required for Jazzy and below, it is recommended to keep a reference.

Raises

TimeoutError

If the service did not become available within the specified timeout.

property shared_node: rclpy.node.Node

A ROS2 node instance that can be used for creating publishers, services, etc.

shutdown(reason: str = None, signum: int = Signals.SIGTERM) None

Ask all nodes to shutdown and terminate the internal ROS2 thread. Any subsequent calls to BetterLaunch member functions, including this one, may fail. This will typically be called when you want to terminate your launch file.

Parameters

reasonstr, optional

A human-readable string explaining the reason for the shutdown. If not given this will be requitted with a warning.

signumint, optional

The signal to send to child processes.

sleep(seconds: float) None

Wait for the specified amount of time.

This mostly exists to provide this functionality for TOML launchfiles.

Parameters

secondsfloat

How many seconds to wait.

spin(exit_with_last_node: bool = True) None

Join the BetterLaunch thread until it terminates. You do not need to call this if you’re using the [launch_this][] wrapper or the TUI.

Parameters

exit_with_last_nodebool, optional

If True this function will return when all nodes have been stopped.

subscriber(topic: str, message_type: type, callback: Callable[[Any], None], qos_profile: QoSProfile | int = 10) rclpy.node.Subscription

Create a ROS2 subscriber to receive messages.

Parameters

topicstr

The topic to listen on for messages.

message_typestr | type

The type of the messages that will be received. Strings must follow the pattern <package>/msg/<message>.

callbackCallable[[Any], Any]

A function that will be called whenever a message is received.

qos_profileQoSProfile | int, optional

A quality of service profile that changes how the publisher handles connections and retains data.

Returns

RosSubscriber

The subscriber object. Although not required for Jazzy and below, it is recommended to keep a reference.

wait_for_service(service: str, timeout: float = None) bool

Wait for the specified service to appear.

Parameters

servicestr

The full path of the service to wait for.

timeoutfloat, optional

How long to wait for the service. Wait forever if None.

Returns

bool

True if the service appeared within the timeout, False otherwise.

wait_for_topic(topic: str, timeout: float = None) bool

Wait for the specified topic to appear.

Parameters

topicstr

The full path of the topic to wait for.

timeoutfloat, optional

How long to wait for the topic. Wait forever if None.

Returns

bool

True if the topic appeared within the timeout, False otherwise.

class better_launch.launcher.BetterLaunchMeta

Bases: type

instance() BetterLaunch

Immediately retrieve the BetterLaunch singleton instance.

Returns

BetterLaunch

The BetterLaunch singleton instance, or None if it doesn’t exist yet.

wait_for_instance(timeout: float = None) BetterLaunch

Retrieve the BetterLaunch singleton instance as soon as possible.

Parameters

timeoutfloat, optional

How long to wait for the singleton instance to appear. Wait forever if timeout is None. Don’t wait at all if timeout is 0.0.

Returns

BetterLaunch

The BetterLaunch singleton instance.

Raises

TimeoutError

If the timeout has passed and no instance has appeared yet.