better_launch.launcher module
- class better_launch.launcher.BetterLaunch(*args, **kwargs)
Bases:
objectThis should be all you need to create beautiful, simple and convenient launch files!
- action_client(topic: str, action_type: str | type, timeout: float = 5.0, qos_profile: rclpy.qos.QoSProfile = None) RosActionClient
Create a ROS2 action client to execute long-running actions.
Parameters
- topicstr
The topic namespace on which the action interface is provided.
- action_typestr | type
The type of actions the action server handles. Strings must follow the pattern <package>/action/<message>.
- timeoutfloat, optional
Time to wait for the action server to become available. Ignored if <= 0.
- qos_profileQoSProfile, optional
A quality of service profile that changes how the action server handles connections and retains data.
Returns
- RosActionClient
The action client object. Although not required for Jazzy and below, it is recommended to keep a reference.
Raises
- TimeoutError
If the action server did not become available within the specified timeout.
- action_server(topic: str, action_type: str | type, callback: Callable[[Any], Any], qos_profile: rclpy.qos.QoSProfile = None) RosActionServer
Create a ROS2 action server using the [shared_node][BetterLaunch.shared_node].
Parameters
- topicstr
The topic namespace to provide the action interface on.
- action_typestr | type
The type of the actions to be handled. Strings must follow the pattern <package>/action/<message>.
- callbackCallable[[Any], Any]
A function that will handle incoming action requests. The type of the requests will be of type rclpy.action.server.ServerGoalHandle and contain an action_type.Goal.
- qos_profileQoSProfile, optional
A quality of service profile that changes how the action server handles connections and retains data.
Returns
- RosActionServer
The action server object. Although not required for Jazzy and below, it is recommended to keep a reference.
- add_shutdown_callback(callback: Callable[[], Any]) None
Adds a callback which will be called when better_launch shuts down.
Parameters
- callbackCallable
The callback to call on shutdown.
- all_ros2_node_names() list[str]
Returns a list of all currently registered node’s full names (namespace + name).
This list is guaranteed to be complete as far as ROS2 is concerned. If you require a node object you can actually interact with consider using [query_node][] or [get_nodes][] instead.
Returns
- list[str]
A list of all running nodes’ full names.
- call_service(topic: str, service_type: str | type, request_args: dict[str, Any] | Any = None, *, timeout: float = 5.0, qos_profile: rclpy.qos.QoSProfile = None, call_async: bool = False) Any
Makes a single service request and returns the result. The client is destroyed once the request has been handled. If you plan to make additional requests, use [service_client][] instead.
Parameters
- topicstr
The service topic to post requests on.
- service_typestr | type
The service’s message type. Strings must follow the pattern <package>/srv/<message>.
- request_argsdict[str, Any] | Any
The keyword arguments from which the request message will be constructed. May also be an instance of service_type.Request.
- timeoutfloat, optional
Time to wait for the service to become available. Ignored if <= 0.
- qos_profileQoSProfile, optional
A quality of service profile that changes how the service handles connections.
- call_asyncbool, optional
If True, make the service call async and return a rclpy.task.Future instead.
Returns
- Any
A rclpy.task.Future if call_async is True, otherwise the result of the service call of type service_type.Request.
Raises
- TimeoutError
If the service did not become available within the specified timeout.
- component(package: str, plugin: str, name: str = None, *, remaps: dict[str, str] = None, params: str | dict[str, Any] = None, anonymous: bool = False, hidden: bool = False, use_intra_process_comms: bool = True, ros_waittime: float = 3.0, lifecycle_waittime: float = 0.01, lifecycle_target: LifecycleStage | str = LifecycleStage.ACTIVE, output: LogSink | Iterable[LogSink] | Iterable[str] | str = LogSink.SCREEN, **extra_composer_args: dict[str, Any]) Component
Create a component and load it into an existing [compose][] context.
If you instead want to load components without a compose context, you should instantiate [Component][] objects directly, then load them via [Component.start][] or [Composer.load_component][]. See the examples for more details.
Parameters
- packagestr
The package providing the component implementation.
- pluginstr
The name the component is registered as, typically of the form <package>::<Name>.
- namestr, optional
The name the instantiated component should be known as. If None, a name will be derived from package and plugin, and anonymous will be set to True. Pass an empty string instead if you really want to use the node’s default name - just know that you’ll make a cute kitten really sad.
- remapsdict[str, str], optional
Tells the node to replace any topics it wants to interact with according to the provided dict.
- anonymousbool, optional
If True, the composer name will be appended with a unique suffix to avoid name conflicts.
- hiddenbool, optional
If True, the composer name will be prepended with a “_”, hiding it from common listings.
- paramsstr | dict[str, Any], optional
Any ROS parameters you want to pass to the component. These are the args you would typically have to declare in your launch file. A string will be interpreted as a path to a yaml file which will be lazy loaded using [BetterLaunch.load_params][].
- use_intra_process_commsbool, optional
If True, ask the composer node to enable intra-process communication, i.e. share memory between components when passing messages instead of serializing and deserializing.
- ros_waittimefloat, optional
How long to wait for the component to register with ROS. This should cover the time between the process starting and the component initializing itself. Set negative to wait indefinitely. Set to None to avoid the check entirely. Will do nothing if autostart_process is False.
- lifecycle_waittimefloat, optional
How long to wait for the component’s lifecycle management to come up. This should cover the time between the component initializing itself (see ros_waittime) and creating its additional topics and services. While neglible on modern computers, slower devices and embedded systems may experience a noticable delay here. Set negative to wait indefinitely. Set to None to avoid the check entirely. Will do nothing if autostart_process is False.
- lifecycle_targetLifecycleStage | str, optional
The lifecycle stage to bring the component into after starting. Has no effect if autostart_process is False or if the component does not appear to be a lifecycle component after waiting ros_waittime + lifecycle_waittime.
- outputLogSink | Iterable[LogSink] | Iterable[str] | str, optional
Determines if and where this node’s output should be directed. Common choices are screen to print to terminal, log to write to a common log file, own_log to write to a node-specific log file, and none to not write any output anywhere. See [configure_logger][utils.better_logging.configure_logger] for details.
Returns
- Component
The component that has been loaded into the current [compose][] context.
Raises
- RuntimeError
If this is called outside a [compose][] context.
- compose(name: str = None, language: str = 'cpp', variant: Literal['normal', 'multithreading', 'isolated'] = 'normal', *, reuse_existing: bool = True, component_remaps: dict[str, str] = None, anonymous: bool = False, hidden: bool = False, autostart_process: bool = True, ros_waittime: float = 3.0, output: LogSink | Iterable[LogSink] | Iterable[str] | str = LogSink.SCREEN) Generator[Composer, None, None]
Creates a composer node which can be used to load [Component][]s. Components can be instantiated directly, or preferably via [component][]. Only components can reside within a composer.
Existing composers can be reused even if they have been created outside of better_launch. See [Composer][] for further details.
This method should be used as a context, e.g.
bl = BetterLaunch() with bl.compose("my-composer"): bl.component("my_package", "mystuff:TheComponentOfDreams", "normal-component")
Note that new nodes created by a component are using the composer’s remaps. This for example applies to transform listeners (but not transform publishers). See the related issue in rclcpp.
Parameters
- namestr, optional
The name you want the composer to be known as. anonymous will be set to True if no name is provided. Pass an empty string if you really want to use the composer node’s default name.
- languagestr, optional
The implementation of the standard composer you want to use. Ignored if reuse_existing is True and a matching node is found.
- variantLiteral[“normal”, “multithreading”, “isolated”], optional
ROS2 provides special composers for components that need multithreading or should be isolated from the rest. Ignored if reuse_existing is True and a matching node is found.
- reuse_existingbool, optional
If True and a node matching the current namespace and provided name is found, it will be used instead of creating a new node. This will even work for composers not created through better_launch, although in that case it won’t be possible to stop them.
- component_remapsdict[str, str], optional
Any remaps you want to apply to all components loaded into this composer.
- anonymousbool, optional
If True, the composer name will be appended with a unique suffix to avoid name conflicts. reuse_existing will be set to False in this case.
- hiddenbool, optional
If True, the composer name will be prepended with a “_”, hiding it from common listings.
- autostart_processbool, optional
If True, start the composer process before returning from this function. Note that setting this to False for a composer will make it unusable as a context object, since you won’t be able to load any components.
- ros_waittimefloat, optional
How long to wait for the composer to register with ROS. This should cover the time between the process starting and the composer initializing itself. Set negative to wait indefinitely. Will do nothing if autostart_process is False.
- outputLogSink | Iterable[LogSink] | Iterable[str] | str, optional
Determines if and where this node’s output should be directed. Common choices are screen to print to terminal, log to write to a common log file, own_log to write to a node-specific log file, and none to not write any output anywhere. See [configure_logger][utils.better_logging.configure_logger] for details.
Yields
- Generator[Composer, None, None]
Sets the composition flag and yields the composer.
Raises
- RuntimeError
If you try to create a composer within a [compose][] context.
- classmethod exec(cmd: str | list[str]) str
Run the specified command and return its output. The command can either be an absolute path to an executable or any file found on PATH.
Parameters
- cmdstr | list[str]
The command to run. If a string is passed it will be split on spaces to separate the command from its arguments. Pass a list instead to have more control over which arguments to treat as a single argument.
Returns
- str
The output of the command without the trailing newline.
Raises
- subprocess.CalledProcessError
If the command had a non-zero exit code. See the raised error’s returncode and output attributes for details.
- find(package: str = None, filename: str = None, subdir: str = '**') str
Resolve a path to a file or package.
If the filename is absolute, all other arguments will be ignored and the filename will be returned.
If package is provided, the corresponding ROS2 package path will be used as the base path. Else we attempt to locate the current launch file’s package by searching its directory and parent directories for a package.xml. If the package cannot be determined the current working dir is used as the base path.
If neither subdir nor filename is provided the base path will be returned.
If filename is provided but subdir is not, the base path will be searched recursively for the given filename. Otherwise, subdir will be used to locate valid candidate files and directories within the base path, allowing patterns like **/lib/ (any lib folder) and *.py (any python file).
If only subdir is provided but not filename, the first candidate is returned. Otherwise the discovered candidates will be searched for the given filename.
Parameters
- packagestr, optional
Name of a ROS2 package to resolve.
- filenamestr, optional
Name of a file to look for.
- subdirstr, optional
A glob pattern to locate subdirectories and files. See the pathlib pattern language for details.
Returns
- str
A resolved path.
Raises
- ValueError
If package contains path separators, or if a filename is provided but could not be found within base path.
- find_group_for_namespace(namespace: str, create: bool = False) Group
Find the group representing the passed namespace.
Parameters
- namespacestr
The namespace in question.
- createbool
If True, create missing groups along the way.
Returns
- Group
The group representing the final segment of the namespace, or None if no such group exists and create == False.
- get_foreign_nodes() list[ForeignNode]
Lists all running nodes that have a process but have not been started by this better_launch process.
Note however that if a process starts multiple nodes, only the first node can be discovered. This is because ROS2 does not provide an API for getting the process parameters from a node.
Returns
- list[ForeignNode]
All nodes with a process that have not been started by this better_launch process.
- get_groups() list[Group]
Returns a list of all in the order they were created.
Returns
- list[Group]
All groups added so far.
- get_nodes(*, include_components: bool = False, include_launch_service: bool = True, include_foreign: bool = False) list[AbstractNode]
Returns a list of all nodes in the order they were added. Components will be added right after their composers.
Note that this will only return nodes that can be managed by better_launch. If a node process creates multiple nodes only the first node can be discovered, as ROS2 does not provide an API linking a node to its process (or even just its package).
Parameters
- include_componentsbool, optional
Whether to include [Component][] instances. This will not include components that have been loaded from outside (e.g. ros2 component load).
- include_launch_servicebool, optional
Whether to include the ROS2 launch service wrapper if it was created. Will be included after the regular nodes and before the foreign nodes.
- include_foreignbool, optional
Whether to include foreign nodes that have not been started by this launcher instance. Will be appended at the end of the returned nodes. However, take heed of the above warning regarding node discovery.
Returns
- list[AbstractNode]
A list of all nodes, sorted by when they were added.
- get_ros_message_type(message_string: str) type
Loads a ROS2 message type from a string representation.
Message representations must follow the pattern <package>/<type>/<message>, where * <package> is the ROS2 package that defines the message. * <type> is the type of message, typically one of msg, srv or action. * <message> is the name of the message itself with proper capitalization.
Parameters
- message_stringstr
A message representation of the form <package>/<type>/<message>.
Returns
- type
The message class.
Raises
- ImportError
If the message type could not be imported.
- get_unique_name(name: str = '', check_running_nodes: bool = True) str
Returns a unique name. If a name is provided it will be prepended with an underscore.
Parameters
- namestr, optional
The string to use as the base.
- check_running_nodesbool, optional
If true, check the currently running ROS2 nodes for name collisions.
Returns
- str
A unique name.
- group(namespace: str) Generator[Group, None, None]
Groups are used to bundle nodes into namespaces. While they influence the nodes’ topics and service name, they have no runtime functionality.
Groups are intended to be used as context objects and can be nested. Note that starting a new root branch (i.e. a group starting with “/”) is valid and will change subsequent groups for the duration of the context window.
bl = BetterLaunch() with bl.group("outer"): # Unless included in another group, "outer" will be attached to "/" with bl.group("inner/sanctum"): # Regular nesting, nodes will live within "/outer/inner/sanctum" bl.node(...) with bl.group("/evil/tower"): # New root branch, nodes will live within "/evil/tower" bl.node(...) # Root branch exited, nodes will live within "/outer" once again bl.node(...)
See also
[group_root][]
[group_tip][]
Parameters
- namespacestr
The group’s namespace.
Yields
- Generator[Group, None, None]
Places the group on the group stack and yields it. Exiting the context will pop the group from the group stack.
Raises
- RuntimeError
If the group is created within a compose context.
- hello() None
Prints our welcome message and some useful information. Note that this will not appear in the logs!
- include(package: str, launchfile: str, subdir: str = None, *, pass_launch_func_args: bool = True, **kwargs) None
Include another launch file, resolving its path using [find][].
The file is first read into memory and checked. If it seems to be a better_launch launch file, it is executed immediately (using [exec]). The BetterLaunch instance and global context will be shared. Any arguments to [launch_this][] in the included launch file will be ignored.
If the file does not appear to be a better_launch launch file, it is assumed to be a regular ROS2 launch file. In this case a launch.actions.IncludeLaunchDescription instance is created and passed to [ros2_actions][].
Parameters
- packagestr
The package containing the specified launch file. May be None (see [find][]).
- launchfilestr
The name of a launch file to execute.
- subdirstr, optional
A path fragment the launch file must be located in.
- pass_launch_func_argsbool, optional
If True, all launch_args will be passed to the included launch file. Additional launch arguments can also be provided via kwargs.
Raises
- ValueError
If the passed in search_args cannot be handled.
- classmethod is_included() bool
Check if this is run from an included launchfile.
NOTE: this will only work when (indirectly) invoked from [launch_this][].
More specifically, this checks if a [BetterLaunch][] instance has been stored in the calling frame’s globals, which happens on the first instantiation. This mechanism is an implementation detail and should not be relied on.
Returns
- bool
True if a launch_this function has already run.
Raises
- ValueError
When launch_this is not part of the current stack frame.
- property is_shutdown: bool
Whether better_launch has shutdown.
- property launch_args: dict[str, Any]
All key-value pairs that have been passed to the launch function.
- property launchfile: str
The path of the (main) better_launch launchfile being executed.
- load_params(package: str = None, configfile: str = None, subdir: str = None, *, qualifier: str | Node = None) dict[str, Any]
Load parameters from a yaml file located through [find][].
If the config only contains a ros__parameters section the entire config is returned regardless of whether qualifier was passed. Otherwise, if qualifier is provided, the loaded config dict is searched for a matching section. If no matching section can be found a ValueError will be raised.
The following wildcards are supported for parameter sections: * /**: matches any number of tokens, may be followed by additional tokens and a node name * /*: skips a single namespace token, or ignores the node’s name if at the end
Note that better_launch could not care less whether you put ros__parameters in your configs - if it is there it will be silently discarded.
See also
Parameters
- packagestr
A package to search for the config file. May be None (see [find][]).
- configfilestr
The name of the config file to locate.
- subdirstr, optional
A path fragment that the config file must be located in.
- qualifierstr | Node, optional
Used to specifiy which section of the config to return.
Returns
- dict[str, Any]
The key-value pairs from the config.
Raises
- ValueError
If the path cannot be resolved, if qualifier is supplied and no matching section could be found, or if a substitution failed.
- IOError
If the config file could not be read.
- log(severity: str | int, message: str, *args: list[Any], **kwargs: dict[str, Any]) None
Pass the specified message to the logger.
This mostly exists to provide this functionality for TOML launchfiles.
Parameters
- severitystr | int
A logging severity or level. Standard severities are debug, info, warning, error, critical, and fatal. Integers can be used for more fine grained control and custom log levels, as per the python logging module.
- messagestr
The message to log.
- argslist[Any], optional
A sequence of additional arguments to format the message.
- kwargsdict[str, Any], optional
A dict of additional arguments to format the message.
- node(package: str, executable: str, name: str = None, *, remaps: dict[str, str] = None, params: str | dict[str, Any] = None, cmd_args: list[str] = None, env: dict[str, str] = None, isolate_env: bool = False, log_level: int = 20, output: LogSink | Iterable[LogSink] | Iterable[str] | str = LogSink.SCREEN, anonymous: bool = False, hidden: bool = False, on_exit: Callable = None, max_respawns: int = 0, respawn_delay: float = 0.0, use_shell: bool = False, autostart_process: bool = True, ros_waittime: float = 3.0, lifecycle_waittime: float = 0.01, lifecycle_target: LifecycleStage | str = LifecycleStage.ACTIVE, raw: bool = False, remap_qualifier: str = None, qualify_all_remaps: bool = False) Node
Create a new ROS2 node process. The bread and butter of every ROS setup!
Please note that by default better_launch will generate an anonymous node name if no node name was specified. This helps to avoid multiple nodes with the same name, which in ROS2 is both possible and problematic. If you really don’t want to specify a node name, pass an empty string instead.
This method also handles lifecycle nodes (they REALLY should have a common interface). Note that especially for lifecycle nodes you probably want autostart_process == True, otherwise there lifecycle management will not exist. With autostart_process == True, a lifecycle node will automatically advance to lifecycle_target once it is up. Otherwise you can also call [Node.start][better_launch.elements.abstract_node.AbstractNode.start] later.
The ROS2 documentation can provide some additional information regarding params, remaps, and so on.
Parameters
- packagestr
The package providing the node.
- executablestr
The executable that should be run.
- namestr, optional
The name you want the node to be known as. If None, a name will be derived from package and executable and anonymous will be set to True. Pass an empty string instead if you really want to use the node’s default name - just know that you’ll make a cute kitten really sad.
- remapsdict[str, str], optional
Tells the node to replace any topics it wants to interact with according to the provided dict.
- paramsstr | dict[str, Any], optional
Any ROS parameters you want to pass to the node. These are the args you would typically have to declare in your launch file. A string will be interpreted as a path to a yaml file which will be lazy loaded using [BetterLaunch.load_params][].
- cmd_argslist[str], optional
Additional command line arguments to pass to the node.
- envdict[str, str], optional
Additional environment variables to set for the node’s process. The node process will merge these with the environment variables of the better_launch host process unless isolate_env is True.
- isolate_envbool, optional
If True, the node process’ env will not be inherited from the parent process and only those passed via env will be used. Be aware that this can result in many common things to not work anymore since e.g. keys like PATH will be missing.
- log_levelint, optional
The minimum severity a logged message from this node must have in order to be published. This will be added to the cmd_args unless it is None.
- outputLogSink | Iterable[LogSink] | Iterable[str] | str, optional
Determines if and where this node’s output should be directed. Common choices are screen to print to terminal, log to write to a common log file, own_log to write to a node-specific log file, and none to not write any output anywhere. See [configure_logger][utils.better_logging.configure_logger] for details.
- anonymousbool, optional
If True, the node name will be appended with a unique suffix to avoid name conflicts.
- hiddenbool, optional
If True, the node name will be prepended with a “_”, hiding it from common listings.
- on_exitCallable, optional
A function to call when the node’s process terminates (after any possible respawns).
- max_respawnsint, optional
How often to restart the node process if it terminates.
- respawn_delayfloat, optional
How long to wait before restarting the node process after it terminates.
- use_shellbool, optional
If True, invoke the node executable via the system shell. While this gives access to the shell’s builtins, this has the downside of running the node inside a “mystery program” which is platform and user dependent. Generally not advised.
- autostart_processbool, optional
If True, start the node process before returning from this function.
- ros_waittimefloat, optional
How long to wait for the node to register with ROS. This should cover the time between the process starting and the node initializing itself. Set negative to wait indefinitely. Set to None to avoid the check entirely. Will do nothing if autostart_process is False.
- lifecycle_waittimefloat, optional
How long to wait for the node’s lifecycle management to come up. This should cover the time between the node initializing itself (see ros_waittime) and creating its additional topics and services. While neglible on modern computers, slower devices and embedded systems may experience a noticable delay here. Set negative to wait indefinitely. Set to None to avoid the check entirely. Will do nothing if autostart_process is False.
- lifecycle_targetLifecycleStage | str, optional
The lifecycle stage to bring the node into after starting. Has no effect if autostart_process is False or if the node does not appear to be a lifecycle node after waiting ros_waittime + lifecycle_waittime.
- rawbool, optional
If True, don’t treat the executable as a ROS2 node and avoid passing it any command line arguments except those specified.
- remap_qualifierstr, optional
Additional qualifier that will precede the node’s __ns and __name remap rules. Should be the original name of the node (i.e. whatever its default name is) and can be qualified with a namespace. Useful to prevent multiple nodes with the same name when a process can have more than one node (e.g. controller_manager). See [this ROS2 design doc](https://design.ros2.org/articles/static_remapping.html#how-the-syntax-works) for more information.
- qualify_all_remapsbool, optional
If True, apply the remap_qualifier to all remaps that are not already qualified.
Returns
- Node
The node object wrapping the node process.
Raises
- RuntimeError
If you try to add a node withing a [compose][] context.
- publish_message(topic: str, message_type: str | type, message_args: dict[str, Any], qos_profile: QoSProfile | int = 10, *, time_to_publish: float = 1.0) None
Convenience method to publish a single message. The publisher will be destroyed once the message has been published. If you plan to publish additional messages, use [publisher][] instead and use the instance.
Parameters
- topicstr
The topic to publish messages on.
- message_typestr | type
The message type that will be published. Strings must follow the pattern <package>/msg/<message>.
- message_argsdict[str, Any]
The keyword arguments from which the message will be constructed.
- qos_profileQoSProfile | int, optional
A quality of service profile that changes how the publisher handles connections and retains data.
- time_to_publish: float, optional
How long to give the publisher to submit the message. Expect the message to get lost if you set this to 0.
- publisher(topic: str, message_type: str | type, qos_profile: QoSProfile | int = 10) rclpy.node.Publisher
Create a ROS2 publisher using the [shared_node][].
Parameters
- topicstr
The topic to publish messages on.
- message_typestr | type
The message type that will be published. Strings must follow the pattern <package>/msg/<message>.
- qos_profileQoSProfile | int, optional
A quality of service profile that changes how the publisher handles connections and retains data.
Returns
- RosPublisher
The publisher object. Although not required for Jazzy and below, it is recommended to keep a reference.
- query_node(pattern: str, *, include_components: bool = True, include_launch_service: bool = False, include_foreign: bool = False) AbstractNode
Retrieve the first node matching the provided pattern.
Parameters
- patternstr
Either the name of a node, or a qualified node name (i.e. namespace + name). If a namespace is included it must be absolute, but may include * or ** wildcards to skip one or more groups (via fnmatch).
- include_componentsbool, optional
Whether to include components in the results, if any.
- include_launch_servicebool, optional
Whether to include the ROS2 launch service in the result (if it exists).
- include_foreignbool, optional
Whether to include foreign nodes not created by this launcher.
Returns
- AbstractNode
The first node matching the provided pattern, or None if none matched.
- query_nodes(pattern: str, *, include_components: bool = True, include_launch_service: bool = False, include_foreign: bool = False) Generator[AbstractNode, None, None]
Yield all nodes matching the provided pattern.
Parameters
- patternstr
Either the name of a node, or a qualified node name (i.e. namespace + name). If a namespace is included it must be absolute, but may include * or ** wildcards to skip one or more groups (via fnmatch).
- include_componentsbool, optional
Whether to include components in the results, if any.
- include_launch_servicebool, optional
Whether to include the ROS2 launch service in the result (if it exists).
- include_foreignbool, optional
Whether to include foreign nodes not created by this launcher.
Returns
- Generator[AbstractNode, None, None]
The nodes matching the pattern.
- ros2_actions(*ros2_actions) Ros2LaunchWrapper
Submit additional ROS2 launch actions for execution.
If no launch.LaunchService exists yet it will be created and started immediately.
- ros2_launch_service(name: str = 'LaunchService', launchservice_args: list[str] = None, output: LogSink | Iterable[LogSink] | Iterable[str] | str = LogSink.SCREEN, start_immediately: bool = True) Ros2LaunchWrapper
Create or retrieve a manager object that can be used for queueing ROS2 launch actions.
Usually, calling [ros2_actions][] is more convenient for queueing actions. However, calling this first allows to prevent starting the underlying launch.LaunchService immediately, giving more control over when the actions are executed.
Since the LaunchService insists on running on the main thread it will be started as a sub process.
Note that only one instance of the ROS2 wrapper should ever exist. Calling this method after it has been created will return the already existing instance instead. Any passed arguments will be silently discarded.
Parameters
- namestr, optional
The name used to identify the process and its logger.
- launchservice_argslist[str], optional
Additional launch arguments to pass to the ROS2 launch service. These will end up in launch.LaunchContext.argv.
- outputLogSink | Iterable[LogSink] | Iterable[str] | str, optional
How log output from the launch service should be handled. This will also include the output from all nodes launched by this launch service. Common choices are screen to print to terminal, log to write to a common log file, own_log to write to a node-specific log file, and none to not write any output anywhere. See [configure_logger][utils.better_logging.configure_logger] for details.
- start_immediatelybool, optional
If True, the ROS2 launch service process is started immediately.
Returns
- Ros2LaunchWrapper
The wrapper hosting the ROS2 launch service process.
- property ros_adapter: ROSAdapter
Contains and runs a shared ROS2 node to interact with topics, services, etc. The adapter is instantiated lazily as it brings a major performance hit (about 6 MiB memory and a high CPU spike).
- static ros_distro() str
Returns the name of the currently sourced ros distro (i.e. $ROS_DISTRO).
- run_later(delay: float, callback: Callable, *args, **kwargs) Future
Convenience method for running a callback with a delay. The callback will be called on a separte thread.
This mainly exists to cover the use case where you want to interact with ROS from an rclpy.Timer. A synchronous call from within a timer (e.g. a service call like [Node.set_live_params][]) will block ROS’ background event loop, preventing publishers, subscribers, services, etc. from doing their work. It will also prevent a clean shutdown as ROS usually waits for the event queue to become empty.
When executing a long running task this way it is a good idea to check [is_shutdown][] in between iterations.
Parameters
Returns
- Future
A future which will be cancelled on shutdown, otherwise it will hold the callback’s result or an exception the callback raised.
- service(topic: str, service_type: str | type, callback: Callable[[Any], Any], qos_profile: rclpy.qos.QoSProfile = None) rclpy.node.Service
Create a ROS2 service provider using the [shared_node][].
Parameters
- topicstr
The topic the service will live on.
- service_typestr | type
The service’s message type. Strings must follow the pattern <package>/srv/<message>.
- callbackCallable[[Any], Any]
The function that will handle any requests to the service. The type of the request will be of type service_type.Request.
- qos_profileQoSProfile, optional
A quality of service profile that changes how the service handles connections.
Returns
- RosServiceProvider
The service object. Although not required for Jazzy and below, it is recommended to keep a reference.
- service_client(topic: str, service_type: str | type, timeout: float = 5.0, qos_profile: rclpy.qos.QoSProfile = None) rclpy.node.Client
Create a ROS2 service client that can be used to call a service.
Parameters
- topicstr
The service topic to post requests on.
- service_typestr | type
The service’s message type. Strings must follow the pattern <package>/srv/<message>.
- timeoutfloat, optional
Time to wait for the service to become available. Ignored if <= 0.
- qos_profileQoSProfile, optional
A quality of service profile that changes how the service handles connections.
Returns
- RosServiceClient
The client object. Although not required for Jazzy and below, it is recommended to keep a reference.
Raises
- TimeoutError
If the service did not become available within the specified timeout.
A ROS2 node instance that can be used for creating publishers, services, etc.
- shutdown(reason: str = None, signum: int = Signals.SIGTERM) None
Ask all nodes to shutdown and terminate the internal ROS2 thread. Any subsequent calls to BetterLaunch member functions, including this one, may fail. This will typically be called when you want to terminate your launch file.
Parameters
- reasonstr, optional
A human-readable string explaining the reason for the shutdown. If not given this will be requitted with a warning.
- signumint, optional
The signal to send to child processes.
- sleep(seconds: float) None
Wait for the specified amount of time.
This mostly exists to provide this functionality for TOML launchfiles.
Parameters
- secondsfloat
How many seconds to wait.
- spin(exit_with_last_node: bool = True) None
Join the BetterLaunch thread until it terminates. You do not need to call this if you’re using the [launch_this][] wrapper or the TUI.
Parameters
- exit_with_last_nodebool, optional
If True this function will return when all nodes have been stopped.
- subscriber(topic: str, message_type: type, callback: Callable[[Any], None], qos_profile: QoSProfile | int = 10) rclpy.node.Subscription
Create a ROS2 subscriber to receive messages.
Parameters
- topicstr
The topic to listen on for messages.
- message_typestr | type
The type of the messages that will be received. Strings must follow the pattern <package>/msg/<message>.
- callbackCallable[[Any], Any]
A function that will be called whenever a message is received.
- qos_profileQoSProfile | int, optional
A quality of service profile that changes how the publisher handles connections and retains data.
Returns
- RosSubscriber
The subscriber object. Although not required for Jazzy and below, it is recommended to keep a reference.
- wait_for_service(service: str, timeout: float = None) bool
Wait for the specified service to appear.
Parameters
- servicestr
The full path of the service to wait for.
- timeoutfloat, optional
How long to wait for the service. Wait forever if None.
Returns
- bool
True if the service appeared within the timeout, False otherwise.
- wait_for_topic(topic: str, timeout: float = None) bool
Wait for the specified topic to appear.
Parameters
- topicstr
The full path of the topic to wait for.
- timeoutfloat, optional
How long to wait for the topic. Wait forever if None.
Returns
- bool
True if the topic appeared within the timeout, False otherwise.
- class better_launch.launcher.BetterLaunchMeta
Bases:
type- instance() BetterLaunch
Immediately retrieve the BetterLaunch singleton instance.
Returns
- BetterLaunch
The BetterLaunch singleton instance, or None if it doesn’t exist yet.
- wait_for_instance(timeout: float = None) BetterLaunch
Retrieve the BetterLaunch singleton instance as soon as possible.
Parameters
- timeoutfloat, optional
How long to wait for the singleton instance to appear. Wait forever if timeout is None. Don’t wait at all if timeout is 0.0.
Returns
- BetterLaunch
The BetterLaunch singleton instance.
Raises
- TimeoutError
If the timeout has passed and no instance has appeared yet.