synchros2.process module

class synchros2.process.ROSAwareProcess[source]

Bases: object

A main-like function wrapper that binds a ROS 2 aware scope to each function invocation.

Only one process can be invoked at a time, becoming the current process. This enables global access to the process and, in consequence, to its scope.

This wrapper is rarely be used explicitly but through the main decorator.

__init__(func: Callable[[], int | None] | Callable[[Sequence[str]], int | None] | Callable[[Namespace], int | None], *, uses_tf: bool = False, interruptible: bool = False, prebaked: bool = True, autospin: bool | None = None, forward_logging: bool | None = None, name: str | None = None, namespace: Literal[True] | str | None = None, cli: ArgumentParser | None = None, **init_arguments: Any)[source]

Initializes the ROS 2 aware process.

Parameters:
  • func – a main-like function to wrap ie. a callable taking a sequence of strings, an argparse.Namespace (if a CLI is specified), or nothing, and returning a integer exit code or nothing.

  • prebaked – whether to instantiate a prebaked or a bare process. A prebaked process bears an implicit node and spins an executor in the background. A bare process does neither, which brings them the closest to standard ROS 2 idioms.

  • autospin – whether to automatically equip the underlying scope with a background executor or not. Defaults to True for prebaked processes and to False for bare processes.

  • uses_tf – whether to instantiate a tf listener bound to the process main node. Defaults to False.

  • interruptible – whether the process allows graceful interruptions i.e. SIGINT (Ctrl+C) or SIGTERM signaling. An interruptible process will not shutdown any context or trigger any guard condition on either but simply raise KeyboardInterrupt and SystemExit exceptions instead.

  • forward_logging – whether to forward logging logs to the ROS 2 logging system or not. Defaults to True for prebaked processes and to False for bare processes (though it requires a process node to be set to function).

  • name – optional name for the implicit main node, if any. Defaults to the current executable basename without its extension or the CLI program name if one is specified, unless it is already used as a namespace, in which case name defaults to “node”.

  • namespace – an optional namespace for this process. If True, the current executable basename without its extension (or CLI program name if one is specified) will be used.

  • cli – optional command-line interface argument parser.

  • init_arguments – Keyword arguments for the scope.

Raises:

ValueError – if a prebaked process is configured without autospin.

property cli: ArgumentParser | None

Get the associated CLI argument parser, if any.

current: ROSAwareProcess | None = None
lock = <unlocked _thread.lock object>
try_shutdown() None[source]

Atempts to shutdown the underlying scope context.

wait_for_interrupt(*, timeout_sec: float | None = None) None[source]

Wait for process interruption i.e. a KeyboardInterrupt or a SystemExit.

This can only be done from the main thread. Also, note that timeouts are implemented using POSIX timers and alarms.

Parameters:

timeout_sec – optional timeout for wait, wait indefinitely by default.

wait_for_shutdown(*, timeout_sec: float | None = None) bool[source]

Wait for shutdown of the underlying scope context.

Parameters:

timeout_sec – optional timeout for wait, wait indefinitely by default.

Returns:

True if shutdown, False on timeout.

synchros2.process.current() ROSAwareProcess | None[source]

Gets the current ROS 2 aware process, if any.

synchros2.process.ensure_node() rclpy.node.Node[source]

Gets a node from the current ROS 2 aware process or fails trying

synchros2.process.executor() rclpy.executors.Executor | None[source]

Gets the executor of the current ROS 2 aware process, if any.

synchros2.process.load(factory: Callable[[...], NodeT], *args: Any, **kwargs: Any) NodeT[source]
synchros2.process.load(factory: Callable[[...], List[NodeT]], *args: Any, **kwargs: Any) List[NodeT]

Loads a ROS 2 node (or a collection thereof) within the current ROS 2 aware process scope.

See ROSAwareProcess and ROSAwareScope.load documentation for further reference on positional and keyword arguments taken by this function.

Raises:

RuntimeError – if no process is executing.

synchros2.process.main(cli: ArgumentParser | None = None, **kwargs: Any) Callable[[Callable[[], int | None] | Callable[[Sequence[str]], int | None] | Callable[[Namespace], int | None]], ROSAwareProcess][source]

Wraps a main-like function in a ROSAwareProcess instance.

synchros2.process.managed(factory: Callable[[...], NodeT], *args: Any, **kwargs: Any) ContextManager[NodeT][source]
synchros2.process.managed(factory: Callable[[...], List[NodeT]], *args: Any, **kwargs: Any) ContextManager[List[NodeT]]

Manages a ROS 2 node (or a collection thereof) within the current ROS 2 aware process scope.

See ROSAwareProcess and ROSAwareScope.managed documentation for further reference on positional and keyword arguments taken by this function.

Raises:

RuntimeError – if no process is executing.

synchros2.process.node() rclpy.node.Node | None[source]

Gets the node of the current ROS 2 aware process, if any.

synchros2.process.spin(factory: Callable[[...], NodeT] | Callable[[...], List[NodeT]] | None = None, *args: Any, **kwargs: Any) None[source]

Spins current ROS 2 aware process executor (and all ROS 2 nodes in it).

Optionally, manages a ROS 2 node (or a collection thereof) for as long as it spins.

See ROSAwareProcess and ROSAwareScope.spin documentation for further reference on positional and keyword arguments taken by this function.

Raises:

RuntimeError – if no process is executing.

synchros2.process.tf_listener() TFListenerWrapper | None[source]

Gets the tf listener of the current ROS 2 aware scope, if any.

synchros2.process.try_shutdown() None[source]

Attempts to shutdown the context associated with the current ROS 2 aware process.

synchros2.process.unload(loaded: rclpy.node.Node | List[rclpy.node.Node]) None[source]

Unloads a ROS 2 node (or a collection thereof) from the current ROS 2 aware process scope.

See ROSAwareProcess and ROSAwareScope.unload documentation for further reference on positional and keyword arguments taken by this function.

Raises:

RuntimeError – if no process is executing.

synchros2.process.wait_for_interrupt(*, timeout_sec: float | None = None) None[source]

Wait for current ROS 2 aware process interruption.

See ROSAwareProcess.wait_for_interrupt documentation for further reference on positional and keyword arguments taken by this function.

Raises:

RuntimeError – if no process is executing.

synchros2.process.wait_for_shutdown(*, timeout_sec: float | None = None) bool[source]

Wait for current ROS 2 aware process to shutdown.

See ROSAwareProcess.wait_for_shutdown documentation for further reference on positional and keyword arguments taken by this function.

Raises:

RuntimeError – if no process is executing.