synchros2.scope module
- class synchros2.scope.ROSAwareScope[source]
Bases: ContextManager[ROSAwareScope]
A context manager to enable ROS 2 code in a given scope.
This is accomplished by providing an rclpy node and an autoscaling multi-threaded executor spinning in a background thread.
- class LocalStack[source]
Bases: _local
- top: ROSAwareScope | None = None
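The `top` field on a `_local` subclass suggests a per-thread record of the innermost active scope. The following is a minimal sketch of that pattern, assuming `_local` refers to Python's `threading.local`. The `Scope` class and the `current_name` helper are hypothetical stand-ins for illustration, not synchros2 code.

```python
import threading
from typing import Optional


class LocalStack(threading.local):
    """Per-thread slot holding the innermost active scope (illustrative model)."""

    # Class-level default: a thread that never entered a scope sees None.
    top: Optional["Scope"] = None


class Scope:
    """Toy stand-in for a scope; hypothetical, not the synchros2 implementation."""

    local = LocalStack()

    def __init__(self, name: str) -> None:
        self.name = name
        self._parent: Optional["Scope"] = None

    def __enter__(self) -> "Scope":
        # Remember whichever scope was active on this thread, then become the top.
        self._parent = Scope.local.top
        Scope.local.top = self
        return self

    def __exit__(self, *exc_info: object) -> None:
        # Restore the previously active scope for this thread.
        Scope.local.top = self._parent
        self._parent = None


def current_name() -> Optional[str]:
    """Returns the name of the scope active on the calling thread, if any."""
    top = Scope.local.top
    return None if top is None else top.name
```

In this model, nested `with Scope(...)` blocks behave like a stack on the thread that entered them, while any other thread still observes no active scope. That is presumably why a separate `global_` option exists for scopes that must be visible from all threads.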
- __init__(*, global_: bool = False, uses_tf: bool = False, forward_logging: bool = False, prebaked: bool = True, autospin: bool | None = None, name: str | None = None, namespace: Literal[True] | str | None = None, context: rclpy.context.Context | None = None) None[source]
Initializes the ROS 2 aware scope.
- Parameters:
prebaked – whether to include an implicit main node in the scope graph or not, for convenience.
global_ – whether to make this scope global (i.e. accessible from all threads). Only one outermost global scope can be entered at any given time. Global scopes can only be entered from the main thread.
name – optional name for the implicit main node, if any. Defaults to “node” if none is provided and the scope is prebaked.
namespace – optional namespace for all underlying nodes. Defaults to a unique hidden namespace.
autospin – whether to automatically instantiate and push an executor to a background thread on scope entry or not.
uses_tf – whether to instantiate a tf listener bound to the scope main node. Defaults to False.
forward_logging – whether to forward logs from Python's logging module to the ROS 2 logging system or not. Defaults to False.
context – optional context for the underlying executor and nodes.
- property context: rclpy.context.Context
Gets scope context.
- global_: ROSAwareScope | None = None
- load(factory: Callable[[...], NodeT], *args: Any, **kwargs: Any) NodeT[source]
- load(factory: Callable[[...], List[NodeT]], *args: Any, **kwargs: Any) List[NodeT]
Overloaded method. See above for documentation.
- local = <synchros2.scope.ROSAwareScope.LocalStack object>
- managed(factory: NodeFactoryCallable[NodeT], *args: Any, **kwargs: Any) ContextManager[NodeT][source]
- managed(factory: GraphFactoryCallable[NodeT], *args: Any, **kwargs: Any) ContextManager[List[NodeT]]
Overloaded method. See above for documentation.
- spin() None[source]
- spin(factory: Callable[[...], NodeT], *args: Any, **kwargs: Any) None
- spin(factory: Callable[[...], List[NodeT]], *args: Any, **kwargs: Any) None
Overloaded method. See above for documentation.
- property tf_listener: TFListenerWrapper | None
Gets scope tf listener, if any.
- unload(loaded: rclpy.node.Node | List[rclpy.node.Node]) None[source]
Unloads and destroys ROS 2 nodes.
- Parameters:
loaded – ROS 2 node or a collection thereof to unload.
- Raises:
RuntimeError – if the scope has not been entered (or has already been exited) or a scope executor has not been set yet.
ValueError – if any given ROS 2 node is not found in the scope graph.
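The relationship between load, managed, and unload can be pictured with a toy model. The sketch below reproduces only the error contract documented for unload (RuntimeError outside an entered scope, ValueError for unknown nodes). It also assumes, without confirmation from this page, that load applies the same entry check and that managed amounts to a load on entry paired with an unload on exit. `ToyNode` and `ToyScope` are hypothetical names, and no rclpy objects are involved.

```python
import contextlib
from typing import Any, Callable, Iterator, List


class ToyNode:
    """Hypothetical stand-in for a ROS 2 node."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.destroyed = False

    def destroy(self) -> None:
        self.destroyed = True


class ToyScope:
    """Toy model of the load/unload/managed contract; not the synchros2 code."""

    def __init__(self) -> None:
        self._entered = False
        self._graph: List[ToyNode] = []

    def __enter__(self) -> "ToyScope":
        self._entered = True
        return self

    def __exit__(self, *exc_info: object) -> None:
        # Leaving the scope tears down whatever is still loaded.
        for node in self._graph:
            node.destroy()
        self._graph.clear()
        self._entered = False

    def load(self, factory: Callable[..., ToyNode], *args: Any, **kwargs: Any) -> ToyNode:
        # Assumption: loading is only valid while the scope is entered.
        if not self._entered:
            raise RuntimeError("scope has not been entered (or already exited)")
        node = factory(*args, **kwargs)
        self._graph.append(node)
        return node

    def unload(self, loaded: ToyNode) -> None:
        if not self._entered:
            raise RuntimeError("scope has not been entered (or already exited)")
        if loaded not in self._graph:
            raise ValueError("node not found in the scope graph")
        self._graph.remove(loaded)
        loaded.destroy()

    @contextlib.contextmanager
    def managed(self, factory: Callable[..., ToyNode], *args: Any, **kwargs: Any) -> Iterator[ToyNode]:
        # Assumption: managed is load on entry and unload on exit.
        node = self.load(factory, *args, **kwargs)
        try:
            yield node
        finally:
            self.unload(node)
```

Under these assumptions, a node obtained through `managed` lives exactly as long as its `with` block, whereas a node obtained through `load` stays in the graph until it is unloaded explicitly or the scope exits.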
- synchros2.scope.clock() rclpy.clock.Clock | None[source]
Gets the clock of the current ROS 2 aware scope node, if any.
- synchros2.scope.current() ROSAwareScope | None[source]
Gets the current ROS 2 aware scope, if any.
- synchros2.scope.ensure_node() rclpy.node.Node[source]
Gets a node from the current ROS 2 aware scope, or fails trying.
- synchros2.scope.executor() rclpy.executors.Executor | None[source]
Gets the executor of the current ROS 2 aware scope, if any.
- synchros2.scope.load(factory: Callable[[...], NodeT], *args: Any, **kwargs: Any) NodeT[source]
- synchros2.scope.load(factory: Callable[[...], List[NodeT]], *args: Any, **kwargs: Any) List[NodeT]
Loads a ROS 2 node (or a collection thereof) within the current ROS 2 aware scope.
See ROSAwareScope.load documentation for further reference on positional and keyword arguments taken by this function.
- Raises:
RuntimeError – if called outside scope.
- synchros2.scope.managed(factory: Callable[[...], NodeT], *args: Any, **kwargs: Any) ContextManager[NodeT][source]
- synchros2.scope.managed(factory: Callable[[...], List[NodeT]], *args: Any, **kwargs: Any) ContextManager[List[NodeT]]
Manages a ROS 2 node (or a collection thereof) within the current ROS 2 aware scope.
See ROSAwareScope.managed documentation for further reference on positional and keyword arguments taken by this function.
- Raises:
RuntimeError – if called outside scope.
- synchros2.scope.node() rclpy.node.Node | None[source]
Gets the node of the current ROS 2 aware scope, if any.
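Because accessors such as current() and node() are typed to return None when there is nothing to return, callers need to check the result before using it. The sketch below shows one way to do that. The `describe_active_scope` helper is hypothetical, and it takes the `synchros2.scope` module as an argument so that it can also be tried with a stand-in object when ROS 2 is not installed.

```python
from typing import Any


def describe_active_scope(ros_scope: Any) -> str:
    """Summarizes the active scope using accessors that may return None.

    `ros_scope` is expected to be the `synchros2.scope` module (or an object
    exposing the same `current()` and `node()` functions).
    """
    if ros_scope.current() is None:
        return "no active scope"
    node = ros_scope.node()
    if node is None:
        # A scope may be active without having a node to offer.
        return "active scope without a main node"
    return f"active scope with main node '{node.get_name()}'"
```

With synchros2 available, the call would look like `describe_active_scope(synchros2.scope)`. Code that cannot proceed without a node may prefer ensure_node(), which is typed to return a node rather than None.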
- synchros2.scope.spin(factory: Callable[[...], NodeT] | Callable[[...], List[NodeT]] | None = None, *args: Any, **kwargs: Any) None[source]
Spins current ROS 2 aware scope executor (and all the ROS 2 nodes in it).
Optionally, manages a ROS 2 node (or a collection thereof) for as long as it spins.
See ROSAwareScope.spin documentation for further reference on positional and keyword arguments taken by this function.
- Raises:
RuntimeError – if called outside scope.
- synchros2.scope.tf_listener() TFListenerWrapper | None[source]
Gets the tf listener of the current ROS 2 aware scope, if any.
- synchros2.scope.top(args: Sequence[str] | None = None, *, context: rclpy.context.Context | None = None, global_: bool = False, interruptible: bool = False, disable_stdout_logs: bool = False, domain_id: int | None = None, **kwargs: Any) Iterator[ROSAwareScope][source]
Manages a ROS 2 aware scope, handling ROS 2 context lifecycle as well.
- Parameters:
args – optional command-line arguments for context initialization.
context – optional context to manage. If none is provided, one will be created. For global scopes, the default context will be used.
global_ – whether to use the global context or a locally constructed one.
interruptible – global interruptible scopes will skip installing ROS 2 signal handlers and let the user deal with SIGINT and SIGTERM interruptions instead.
disable_stdout_logs – whether to disable ROS 2 console logging.
domain_id – a domain id used for initializing rclpy.
kwargs – keyword arguments to pass to ROSAwareScope.
See ROSAwareScope documentation for further reference on positional and keyword arguments taken by this function.
- Returns:
a context manager.
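A short usage sketch of top() follows, relying only on the signatures listed on this page. The `greet_from_scope` helper and the "demo" namespace are hypothetical. The module is passed in as an argument so the function can be defined and exercised without a ROS 2 installation, and the assumption that ensure_node() hands back the scope's implicit main node is not confirmed here.

```python
from typing import Any


def greet_from_scope(ros_scope: Any, namespace: str = "demo") -> str:
    """Enters a top-level scope, logs once from its node, and returns the node name.

    `ros_scope` is expected to be the `synchros2.scope` module.
    """
    # Extra keyword arguments such as namespace are passed on to ROSAwareScope.
    with ros_scope.top(namespace=namespace):
        # ensure_node() is typed to return a node, so no None check is needed.
        node = ros_scope.ensure_node()
        node.get_logger().info("scope is up")
        return node.get_name()
```

With ROS 2 and synchros2 installed, this would be called as `greet_from_scope(synchros2.scope)`. Since top() handles the ROS 2 context lifecycle, the sketch does not initialize or shut down rclpy itself.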
- synchros2.scope.unload(loaded: rclpy.node.Node | List[rclpy.node.Node]) None[source]
Unloads a ROS 2 node (or a collection thereof) from the current ROS 2 aware scope.
See ROSAwareScope.unload documentation for further reference on positional and keyword arguments taken by this function.
- Raises:
RuntimeError – if called outside scope.