better_launch.elements.foreign_node module
- class better_launch.elements.foreign_node.ForeignNode(process: psutil.Process, package: str, name: str, namespace: str, *, remaps: dict[str, str] | None = None, params: str | dict[str, Any] | None = None, cmd_args: list[str] | None = None, output: Literal['screen', 'log', 'own_log', 'none'] | set[Literal['screen', 'log', 'own_log', 'none']] = 'screen')
Bases: AbstractNode, LiveParamsMixin
- property cmd_args: list[str]
Additional arguments passed to the node process.
- property is_running: bool
True if the node is currently running.
- join(timeout: float | None = None) -> int
Wait for the underlying process to terminate and return its exit code. Returns immediately if the process is not running.
Parameters
- timeout: float, optional
How long to wait for the process to finish. Wait forever if None.
Returns
- int
The exit code of the process, or None if it is already terminated.
Raises
- TimeoutError
If a timeout was specified and the process is still running by the time the timeout expires.
- property pid: int
The process ID of the node process. Will be -1 if the process is not running.
- shutdown(reason: str, signum: int = Signals.SIGTERM, timeout: float = 0.0) -> None
Shut down this node. Once this succeeds, is_running will return False.
Parameters
- reason: str
A human-readable string describing why this node is being shut down.
- signum: int, optional
The signal that should be sent to the node (if supported).
- timeout: float, optional
How long to wait for the node to shut down before returning. Don't wait if timeout is 0.0. Wait forever if timeout is None.
Raises
- TimeoutError
If a timeout > 0 was set and the node did not shut down before then.
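The signum and timeout semantics described above can be illustrated with a minimal sketch. It uses the standard library's subprocess.Popen as a stand-in for the node process; shutdown_process is a hypothetical helper written for this illustration and is not taken from better_launch.

```python
from __future__ import annotations

import signal
import subprocess


def shutdown_process(
    proc: subprocess.Popen,
    signum: int = signal.SIGTERM,
    timeout: float | None = 0.0,
) -> None:
    """Hypothetical helper mirroring the documented signum/timeout semantics."""
    if proc.poll() is not None:
        return  # process has already terminated, nothing to do

    proc.send_signal(signum)

    if timeout == 0.0:
        return  # documented behavior: do not wait when timeout is 0.0

    try:
        # timeout=None blocks until the process has exited
        proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired as exc:
        raise TimeoutError(
            f"process {proc.pid} is still running after {timeout} s"
        ) from exc
```

Converting the library-specific timeout exception into the built-in TimeoutError keeps callers independent of how the process is managed underneath.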
- start() -> None
Start this node. Once this succeeds, is_running will return True.
- takeover(kill_after: float = 0, **node_args) -> Node
Replaces a foreign node with a node belonging to this better_launch process. This makes it possible to, e.g., capture the node's output and control a few additional runtime parameters. Any interaction with this foreign node instance after this function returns is undefined behavior.
NOTE: Currently the only way to take over a node is to stop the original process, then recreate and restart the node with the same arguments as the original node.
Parameters
- kill_after: float, optional
Kill the node process if it takes longer than this many seconds to shut down. Disabled when <= 0.
- node_args: dict[str, Any], optional
Additional arguments to pass to the node. See Node for additional details.
Returns
- Node
The new node instance that should replace this foreign node.
- classmethod wrap_process(process: psutil.Process) -> ForeignNode
Collect information like namespace, package, executable and so on from a process and wrap it in a ForeignNode instance.
Note that there is no way to verify whether the process actually belongs to a ROS node. Consider using e.g. find_ros2_node_processes() to that effect.
Parameters
- process: psutil.Process
The process to wrap.
Returns
- ForeignNode
A node object carrying the extracted node information.
- better_launch.elements.foreign_node.find_foreign_nodes() -> list[AbstractNode]
Searches the running processes for ROS2 nodes that have not been started by this better_launch process and wraps them in ForeignNode instances.
Returns
- list[AbstractNode]
A list of discovered foreign ROS2 nodes.
- better_launch.elements.foreign_node.find_process_for_node(namespace: str, name: str) -> list[psutil.Process]
Find processes that look like ROS2 nodes which have been passed the specified namespace and name.
Parameters
- namespace: str
The namespace to look for.
- name: str
The node name to look for.
Returns
- list[psutil.Process]
A list of processes that match the above criteria.
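As a rough illustration of such a match, the sketch below checks a single command line for the namespace and name remap arguments. cmdline_matches_node is a hypothetical helper; whether better_launch compares arguments exactly this way is an assumption. Note that a node started without an explicit namespace argument would not be matched by this simple check.

```python
def cmdline_matches_node(cmdline: list[str], namespace: str, name: str) -> bool:
    """Hypothetical check: was this command line given the namespace and name?"""
    has_namespace = f"__ns:={namespace}" in cmdline
    # Assumption: the name may be passed as either __node:= or __name:=
    has_name = any(f"{key}:={name}" in cmdline for key in ("__node", "__name"))
    return has_namespace and has_name
```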
- better_launch.elements.foreign_node.find_ros2_node_processes() -> list[psutil.Process]
Finds processes that seem to be ROS2 nodes.
Unfortunately, ROS2 doesn't provide any means of discovering node internals other than by looking at the process command line. Luckily, there are a few distinct command line arguments that are somewhat unique to ROS:
  - --ros-args for passing arguments
  - __ns:=<namespace>
  - __node:=<name>
  - __name:=<name>
If any of these are present, the process will be added to the returned list.
Returns
- list[psutil.Process]
The processes that appear to be ROS2 nodes.
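The heuristic described above can be sketched in a few lines. The example works on a plain list of command line arguments so that it stays self-contained; ROS_MARKERS and looks_like_ros2_node are hypothetical names chosen for this illustration, not part of better_launch.

```python
# Command line fragments that hint at a ROS2 node (taken from the list above)
ROS_MARKERS = ("--ros-args", "__ns:=", "__node:=", "__name:=")


def looks_like_ros2_node(cmdline: list[str]) -> bool:
    """Return True if any argument starts with one of the ROS markers."""
    return any(
        arg.startswith(marker) for arg in cmdline for marker in ROS_MARKERS
    )
```

Because the check only inspects arguments, an unrelated program that happens to accept one of these arguments would also be reported, which is why the result is described as processes that "appear to be" ROS2 nodes.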
- better_launch.elements.foreign_node.get_package_for_path(path: str) -> tuple[str, str]
Find the ROS2 package associated with the specified path.
This will first check the currently registered packages (which are stored in $AMENT_PREFIX_PATH). If none of these match, it will search through the path starting from the end for a valid package.xml file to get the package name.
Parameters
- path: str
An absolute path to find the package for.
Returns
- tuple[str, str]
The package name and path to the package, or (None, None) if the package could not be determined.
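The fallback step, searching the path from its end for a package.xml, might look roughly like the sketch below. It covers only that second step and skips the $AMENT_PREFIX_PATH lookup; find_package_by_manifest is a hypothetical helper, and reading the name from the manifest's <name> element is an assumption about how the package name is obtained.

```python
from __future__ import annotations

import xml.etree.ElementTree as ET
from pathlib import Path


def find_package_by_manifest(path: str) -> tuple[str | None, str | None]:
    """Walk from the deepest path component upward, looking for package.xml."""
    current = Path(path).resolve()
    for candidate in (current, *current.parents):
        manifest = candidate / "package.xml"
        if not manifest.is_file():
            continue
        try:
            name = ET.parse(str(manifest)).getroot().findtext("name")
        except ET.ParseError:
            continue  # not a valid manifest, keep searching further up
        if name and name.strip():
            return name.strip(), str(candidate)
    return None, None
```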
- better_launch.elements.foreign_node.parse_process_args(process: psutil.Process, node: AbstractNode | None = None) -> tuple[str, str, dict[str, str], dict[str, str], list[str]]
Parse ROS2 command line arguments and return the user-specified parts.
In particular, this will return the passed ROS2 params, remaps and additional command line arguments. Special ROS2 arguments like --ros-args, --remap, etc. will not be included. A node may be passed in order to resolve nodename:key:=value style args and load parameter files from --params-file.
Parameters
- cmd_args: list[str]
The command line arguments to parse, including the call to the executable at [0].
- node: AbstractNode, optional
A node to use when resolving additional details.
Returns
- tuple[str, str, dict[str, str], dict[str, str], list[str]]
The node's namespace and name, followed by its ROS2 params, remaps and additional command line args.
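To make the split between ROS2 arguments and user arguments concrete, here is a simplified sketch that operates on a plain argument list. split_ros_args is a hypothetical function written for this illustration; it does not load parameter files or resolve nodename:key:=value prefixes, and returning None for a missing namespace or name is an assumption rather than documented behavior.

```python
from __future__ import annotations


def split_ros_args(
    cmd_args: list[str],
) -> tuple[str | None, str | None, dict[str, str], dict[str, str], list[str]]:
    """Split a command line into namespace, name, params, remaps and extras."""
    params: dict[str, str] = {}
    remaps: dict[str, str] = {}
    extra: list[str] = []
    in_ros_args = False
    i = 1  # skip the executable at index 0
    while i < len(cmd_args):
        arg = cmd_args[i]
        if arg == "--ros-args":
            in_ros_args = True  # start of a ROS-specific section
        elif in_ros_args and arg == "--":
            in_ros_args = False  # explicit end of the ROS-specific section
        elif (
            in_ros_args
            and arg in ("-r", "--remap", "-p", "--param")
            and i + 1 < len(cmd_args)
        ):
            key, sep, value = cmd_args[i + 1].partition(":=")
            if sep:
                target = remaps if arg in ("-r", "--remap") else params
                target[key] = value
            i += 1  # the rule itself has been consumed
        elif not in_ros_args:
            extra.append(arg)  # plain user argument
        # other ROS options (e.g. --params-file and its value) are dropped
        i += 1

    # Namespace and name are passed as special remap rules
    namespace = remaps.pop("__ns", None)
    name = remaps.pop("__node", None) or remaps.pop("__name", None)
    return namespace, name, params, remaps, extra
```

All values are kept as strings here; interpreting parameter values as numbers, booleans or lists is left out of the sketch.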