synchros2.scope module

class synchros2.scope.ROSAwareScope[source]

Bases: ContextManager[ROSAwareScope]

A context manager to enable ROS 2 code in a given scope.

This is accomplished by providing an rclpy node and an autoscaling multi-threaded executor spinning in a background thread.

class LocalStack[source]

Bases: _local

top: ROSAwareScope | None = None
__init__(*, global_: bool = False, uses_tf: bool = False, forward_logging: bool = False, prebaked: bool = True, autospin: bool | None = None, name: str | None = None, namespace: Literal[True] | str | None = None, context: rclpy.context.Context | None = None) None[source]

Initializes the ROS 2 aware scope.

Parameters:
  • prebaked – whether to include an implicit main node in the scope graph or not,

  • convenience. (for)

  • global_ – whether to make this scope global (ie. accessible from all threads).

  • one (Only)

  • Global (outermost global scope can be entered at any given time.)

  • thread. (scopes can only be entered from the main)

  • name – optional name for the implicit main node, if any. Defaults to “node”

  • prebaked. (if none is provided and the scope is)

  • namespace – optional namespace for all underlying nodes. Defaults to

  • namespace. (a unique hidden)

  • autospin – whether to automatically instantiate and push an executor

  • not. (to a background thread on scope entry or)

  • uses_tf – whether to instantiate a tf listener bound to the scope main node.

  • False. (system or not. Defaults to)

  • forward_logging – whether to forward logging logs to the ROS 2 logging

  • False.

  • context – optional context for the underlying executor and nodes.

property context: rclpy.context.Context

Gets scope context.

property executor: rclpy.executors.Executor | None

Gets scope executor, if any.

global_: ROSAwareScope | None = None
property graph: List[rclpy.node.Node]

Gets scope nodes, if any.

load(factory: Callable[[...], NodeT], *args: Any, **kwargs: Any) NodeT[source]
load(factory: Callable[[...], List[NodeT]], *args: Any, **kwargs: Any) List[NodeT]

Overloaded method. See above for documentation.

local = <synchros2.scope.ROSAwareScope.LocalStack object>
managed(factory: NodeFactoryCallable[NodeT], *args: Any, **kwargs: Any) ContextManager[NodeT][source]
managed(factory: GraphFactoryCallable[NodeT], *args: Any, **kwargs: Any) ContextManager[List[NodeT]]

Overloaded method. See above for documentation.

property node: rclpy.node.Node | None

Gets scope main node, if any.

spin() None[source]
spin(factory: Callable[[...], NodeT], *args: Any, **kwargs: Any) None
spin(factory: Callable[[...], List[NodeT]], *args: Any, **kwargs: Any) None

Overloaded method. See above for documentation.

property tf_listener: TFListenerWrapper | None

Gets scope tf listener, if any.

unload(loaded: rclpy.node.Node | List[rclpy.node.Node]) None[source]

Unloads and destroys ROS 2 nodes.

Parameters:

loaded – ROS 2 node or a collection thereof to unload.

Raises:
  • RuntimeError – if scope has not been entered (or already exited) or

  • an scope executor has not been set yet.

  • ValueError – if any given ROS 2 node is not found in the scope graph.

synchros2.scope.clock() rclpy.clock.Clock | None[source]

Gets the clock of the current ROS 2 aware scope node, if any.

synchros2.scope.current() ROSAwareScope | None[source]

Gets the current ROS 2 aware scope, if any.

synchros2.scope.ensure_node() rclpy.node.Node[source]

Gets a node from the current ROS 2 aware scope or fails trying

synchros2.scope.executor() rclpy.executors.Executor | None[source]

Gets the executor of the current ROS 2 aware scope, if any.

synchros2.scope.load(factory: Callable[[...], NodeT], *args: Any, **kwargs: Any) NodeT[source]
synchros2.scope.load(factory: Callable[[...], List[NodeT]], *args: Any, **kwargs: Any) List[NodeT]

Loads a ROS 2 node (or a collection thereof) within the current ROS 2 aware scope.

See ROSAwareScope.load documentation for further reference on positional and keyword arguments taken by this function.

Raises:

RuntimeError – if called outside scope.

synchros2.scope.managed(factory: Callable[[...], NodeT], *args: Any, **kwargs: Any) ContextManager[NodeT][source]
synchros2.scope.managed(factory: Callable[[...], List[NodeT]], *args: Any, **kwargs: Any) ContextManager[List[NodeT]]

Manages a ROS 2 node (or a collection thereof) within the current ROS 2 aware scope.

See ROSAwareScope.managed documentation for further reference on positional and keyword arguments taken by this function.

Raises:

RuntimeError – if called outside scope.

synchros2.scope.node() rclpy.node.Node | None[source]

Gets the node of the current ROS 2 aware scope, if any.

synchros2.scope.spin(factory: Callable[[...], NodeT] | Callable[[...], List[NodeT]] | None = None, *args: Any, **kwargs: Any) None[source]

Spins current ROS 2 aware scope executor (and all the ROS 2 nodes in it).

Optionally, manages a ROS 2 node (or a collection thereof) for as long as it spins.

See ROSAwareScope.spin documentation for further reference on positional and keyword arguments taken by this function.

Raises:

RuntimeError – if called outside scope.

synchros2.scope.tf_listener() TFListenerWrapper | None[source]

Gets the tf listener of the current ROS 2 aware scope, if any.

synchros2.scope.top(args: Sequence[str] | None = None, *, context: rclpy.context.Context | None = None, global_: bool = False, interruptible: bool = False, disable_stdout_logs: bool = False, domain_id: int | None = None, **kwargs: Any) Iterator[ROSAwareScope][source]

Manages a ROS 2 aware scope, handling ROS 2 context lifecycle as well.

Parameters:
  • args – optional command-line arguments for context initialization.

  • context – optional context to manage. If none is provided, one will

  • scopes (be created. For global)

  • used. (the default context will be)

  • global_ – whether to use the global context or a locally constructed one.

  • interruptible – global interruptible scopes will skip installing ROS 2

  • SIGTERM (signal handlers and let the user deal with SIGINT and)

  • instead. (interruptions)

  • disable_stdout_logs – whether to disable ROS 2 console logging.

  • domain_id – a domain id used for initializing rclpy.

  • kwargs – keyword arguments to pass to ROSAwareScope.

See ROSAwareScope documentation for further reference on positional and keyword arguments taken by this function.

Returns:

a context manager.

synchros2.scope.unload(loaded: rclpy.node.Node | List[rclpy.node.Node]) None[source]

Unloads a ROS 2 node (or a collection thereof) from the current ROS 2 aware scope.

See ROSAwareScope.unload documentation for further reference on positional and keyword arguments taken by this function.

Raises:

RuntimeError – if called outside scope.