better_launch.elements.foreign_node module

class better_launch.elements.foreign_node.ForeignNode(process: psutil.Process, package: str, name: str, namespace: str, *, remaps: dict[str, str] | None = None, params: str | dict[str, Any] | None = None, cmd_args: list[str] | None = None, output: Literal['screen', 'log', 'own_log', 'none'] | set[Literal['screen', 'log', 'own_log', 'none']] = 'screen')

Bases: AbstractNode, LiveParamsMixin

property cmd_args: list[str]

Additional arguments passed to the node process.

property is_running: bool

True if the node is currently running.

join(timeout: float | None = None) int

Wait for the underlying process to terminate and return its exit code. Returns immediately if the process is not running.

Parameters

timeoutfloat, optional

How long to wait for the process to finish. Wait forever if None.

Returns

int

The exit code of the process, or None if it is already terminated.

Raises

TimeoutError

If a timeout was specified and the process is still running by the time the timeout expires.

property pid: int

The process ID of the node process. Will be -1 if the process is not running.

shutdown(reason: str, signum: int = Signals.SIGTERM, timeout: float = 0.0) None

Shutdown this node. Once this succeeds, is_running() will return False.

Parameters

reasonstr

A human-readable string describing why this node is being shutdown.

signumint, optional

The signal that should be send to the node (if supported).

timeoutfloat, optional

How long to wait for the node to shutdown before returning. Don’t wait if timeout is 0.0. Wait forever if timeout is None.

Raises

TimeoutError

If a timeout > 0 was set and the node did not shutdown before then.

start() None

Start this node. Once this succeeds, is_running() will return True.

takeover(kill_after: float = 0, **node_args) Node

Replaces a foreign node with a node belonging to this better_launch process. This allows to e.g. capture the node’s output and control a few additional runtime parameters. Any interactions with this foreign node instance after this function returns are undefined behavior.

NOTE: Currently the only way to takeover a node is to stop the original process, then recreate and restart the node with the same arguments as the original node.

Parameters

kill_after: float, optional

Kill the node process if it takes longer than this many seconds to shutdown. Disabled when <= 0.

node_argsdict[str, Any], optional

Additional arguments to pass to the node. See Node for additional details.

Returns

Node

The new node instance that should replace this foreign node.

classmethod wrap_process(process: psutil.Process) ForeignNode

Collect information like namespace, package, executable and so on from a process and wrap it in a ForeignNode instance.

Note that there is no way to verify whether the process actually belongs to a ROS node. Consider using e.g. find_ros2_node_processes() to that effect.

Parameters

processpsutil.Process

The process to wrap.

Returns

ForeignNode

A node object carrying the extracted node information.

better_launch.elements.foreign_node.find_foreign_nodes() list[AbstractNode]

Searches the running processes for ROS2 nodes that have not been started by this better_launch process and wraps them in ForeignNode instances.

Returns

list[AbstractNode]

A list of discovered foreign ROS2 nodes.

better_launch.elements.foreign_node.find_process_for_node(namespace: str, name: str) list[psutil.Process]

Find processes that look like ROS2 nodes which have been passed the specified namespace and name.

Parameters

namespacestr

The namespace to look for.

namestr

The node name to look for.

Returns

list[psutil.Process]

A list processes that match the above criteria.

better_launch.elements.foreign_node.find_ros2_node_processes() list[psutil.Process]

Finds processes that seem to be ROS2 nodes.

Unfortunately, ROS2 doesn’t provide any means of discovering node internals other than by looking at the process command line. Lucky for us, there are a couple of distinct command line arguments that are somewhat unique to ROS. These are: - –ros-args for passing arguments - __ns:=<namespace> - __node:=<name> - __name:=<name>

If any of these are present, the process will be added to the returned list.

Returns

list[psutil.Process]

The processes that appear to be ROS2 nodes.

better_launch.elements.foreign_node.get_package_for_path(path: str) tuple[str, str]

Find the ROS2 package associated with the specified path.

This will first check the currently registered packages (which are stored in $AMENT_PREFIX_PATH). If none of these match it will search through the path starting from the end for a valid package.xml file to get the package name.

Parameters

pathstr

An absolute path to find the package for.

Returns

tuple[str, str]

The package name and path to the package, or (None, None) if the package could not be determined.

better_launch.elements.foreign_node.parse_process_args(process: psutil.Process, node: AbstractNode | None = None) tuple[str, str, dict[str, str], dict[str, str], list[str]]

Parse ROS2 command line arguments and return the user-specified parts.

In particular, this will return the passed ROS2 params, remaps and additional command line arguments. Special ROS2 arguments like –ros-args, –remap, etc. will not be included. A node may be passed in order to resolve nodename:key:=value style args and load parameter files from –params-file.

Parameters

cmd_argslist[str]

The command line arguments to parse, including the call to the executable at [0].

nodeAbstractNode, optional

A node to use when resolving additional details.

Returns

tuple[dict[str, str], dict[str, str], list[str]]

The node’s namespace and name, followed by its ROS2 params, remaps and additional command line args.