synchros2.executors module

class synchros2.executors.AutoScalingMultiThreadedExecutor[source]

Bases: Executor

An rclpy.executors.Executor subclass based on an AutoScalingThreadPool.

Akin to the rclpy.executors.MultiThreadedExecutor class but with autoscaling capabilities. Moreover, a concurrency quota can be defined on a per callback + callback group basis to avoid thread pool starvation and/or exhaustion of system resources (e.g. when using reentrant callback groups).

To support fine grained control over callback dispatch and execution, more thread pools may be added to the executor. Callback groups can then be bound to specific thread pools. If not, the default thread pool will be used.

See rclpy.executors.Executor documentation for further reference.

class Task[source]

Bases: object

A bundle of an executable task and its associated entity.

__init__(task: rclpy.task.Task, entity: rclpy.executors.WaitableEntityType | None, node: rclpy.node.Node | None) None[source]
cancel() None[source]

Cancels the task.

__init__(max_threads: int | None = None, max_thread_idle_time: float | None = None, max_threads_per_callback_group: int | None = None, *, context: rclpy.context.Context | None = None, logger: Logger | None = None) None[source]

Initializes the executor.

Parameters:
  • max_threads – optional maximum number of threads the default thread pool should spin at any given time. See AutoScalingThreadPool documentation for reference on defaults.

  • max_thread_idle_time – optional time in seconds that a thread in the default thread pool should wait for work before shutting itself down. See AutoScalingThreadPool documentation for reference on defaults.

  • max_threads_per_callback_group – optional maximum number of concurrent callbacks the default thread pool should service for a given callback group. Useful to prevent reentrant callback groups from starving the default thread pool.

  • context – An optional instance of the ROS context.

  • logger – An optional logger instance.

add_static_thread_pool(num_threads: int | None = None) AutoScalingThreadPool[source]

Add a thread pool that keeps a steady number of workers.

bind(callback_group: rclpy.callback_groups.CallbackGroup, thread_pool: AutoScalingThreadPool) None[source]

Bind a callback group so that it is dispatched to the given thread pool.

Thread pool must be known to the executor. That is, instantiated through add_*_thread_pool() methods.
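As a usage sketch of the methods above, the function below binds a callback group to its own static thread pool. It relies only on the signatures listed in this page plus standard rclpy calls. The node name, the timer, and the numeric values are illustrative assumptions, and the function needs a working ROS 2 environment to be called.

```python
def spin_with_dedicated_pool():
    """Binds a callback group to a dedicated static thread pool and spins once."""
    # Imported lazily so that this snippet can be loaded without ROS 2 installed.
    import rclpy
    from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
    from synchros2.executors import AutoScalingMultiThreadedExecutor

    rclpy.init()
    node = rclpy.create_node("example_node")  # hypothetical node name
    executor = AutoScalingMultiThreadedExecutor(
        max_threads=8, max_threads_per_callback_group=2  # illustrative values
    )
    try:
        slow_group = MutuallyExclusiveCallbackGroup()
        # The pool must come from the executor itself for bind() to accept it.
        slow_pool = executor.add_static_thread_pool(num_threads=1)
        executor.bind(slow_group, slow_pool)
        node.create_timer(
            1.0, lambda: node.get_logger().info("tick"), callback_group=slow_group
        )
        executor.add_node(node)
        executor.spin_once(timeout_sec=2.0)
    finally:
        executor.shutdown()
        node.destroy_node()
        rclpy.shutdown()
```

Callback groups that are never bound keep using the default thread pool, so only the groups that need isolation have to be listed explicitly.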

property default_thread_pool: AutoScalingThreadPool

Default autoscaling thread pool.

shutdown(timeout_sec: float | None = None) bool[source]

Shut down the executor.

Parameters:

timeout_sec – The timeout for shutting down

spin_once(timeout_sec: float | None = None) None[source]

Complete all immediately available work.

spin_once_until_future_complete(future: rclpy.task.Future, timeout_sec: float | None = None) None[source]

Complete all work until the provided future is done.

Parameters:
  • future – The ROS future instance

  • timeout_sec – The timeout for working

property thread_pools: List[AutoScalingThreadPool]

Autoscaling thread pools in use.

class synchros2.executors.AutoScalingThreadPool[source]

Bases: Executor

A concurrent.futures.Executor subclass based on a thread pool.

Akin to the concurrent.futures.ThreadPoolExecutor class but with autoscaling capabilities. Within a given range, the number of workers increases with demand and decreases with time. This is achieved by tracking the number of available runslots, i.e. the number of idle workers waiting for work, and using timeouts to wait for work. Workers add runslots prior to blocking on the runqueue. On submission, a runslot will be taken. If there is no runslot to take, the pool will be scaled up. If a worker has been waiting long enough and no work has come along, it will self-terminate, effectively downscaling the pool.
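The scaling rule described above can be illustrated with a toy pool built on the standard library only. This is a minimal sketch, not the synchros2 implementation: the class name ToyAutoScalingPool and its internals are hypothetical, and runslots are derived here as idle workers minus queued work rather than kept as a separate counter.

```python
import collections
import threading
import time
from concurrent.futures import Future


class ToyAutoScalingPool:
    """Toy pool that grows on demand and shrinks when workers stay idle."""

    def __init__(self, max_workers=4, max_idle_time=0.2):
        self._cond = threading.Condition()
        self._runqueue = collections.deque()
        self._idle = 0  # workers currently blocked waiting for work
        self._num_workers = 0
        self._max_workers = max_workers
        self._max_idle_time = max_idle_time

    @property
    def num_workers(self):
        with self._cond:
            return self._num_workers

    def submit(self, fn, *args):
        future = Future()
        with self._cond:
            # Runslots: idle workers not already spoken for by queued work.
            runslots = self._idle - len(self._runqueue)
            self._runqueue.append((future, fn, args))
            if runslots > 0:
                self._cond.notify()  # take a runslot: wake an idle worker
            elif self._num_workers < self._max_workers:
                self._num_workers += 1  # no runslot to take: scale up
                threading.Thread(target=self._work, daemon=True).start()
        return future

    def _work(self):
        while True:
            with self._cond:
                self._idle += 1  # advertise availability before blocking
                has_work = self._cond.wait_for(
                    lambda: len(self._runqueue) > 0, timeout=self._max_idle_time
                )
                self._idle -= 1
                if not has_work:
                    self._num_workers -= 1  # idle for too long: scale down
                    return
                future, fn, args = self._runqueue.popleft()
            try:
                future.set_result(fn(*args))
            except Exception as error:
                future.set_exception(error)


gate = threading.Event()
pool = ToyAutoScalingPool(max_workers=4, max_idle_time=0.2)
# Every submission blocks on the gate, so each one forces a new worker.
futures = [pool.submit(gate.wait) for _ in range(3)]
peak_workers = pool.num_workers
gate.set()
results = [future.result(timeout=5) for future in futures]
# Idle workers time out after max_idle_time and leave the pool.
deadline = time.monotonic() + 5.0
while pool.num_workers > 0 and time.monotonic() < deadline:
    time.sleep(0.05)
final_workers = pool.num_workers
```

In this sketch the pool grows to one worker per blocked submission and drains back to zero workers once the work is done and the idle timeout elapses.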

Additionally, individual submissions are tracked and monitored against a configurable quota to prevent any given piece of work from starving the pool. To do this, the implementation takes after CPython’s implementation of the concurrent.futures.ThreadPoolExecutor class, adding runlists and waitqueues per submission “type” (i.e. callable hashes) to the main runqueue. Runlists track work either pending execution in the runqueue or executing. Waitqueues track work that is to be pushed into the runqueue once the configured quota allows it.
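The quota idea can be sketched as a thin layer over a plain concurrent.futures.ThreadPoolExecutor. This is an illustration under stated assumptions, not the synchros2 code: QuotaGate is a hypothetical name, the "runlist" is reduced to a per-callable counter, and the "waitqueue" is a per-callable deque.

```python
import collections
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor


class QuotaGate:
    """Caps concurrent submissions per callable on top of a plain thread pool."""

    def __init__(self, pool, quota):
        self._pool = pool
        self._quota = quota
        self._lock = threading.Lock()
        self._running = collections.Counter()  # runlist sizes, keyed by callable
        self._waiting = collections.defaultdict(collections.deque)  # waitqueues

    def submit(self, fn, *args):
        future = Future()
        with self._lock:
            if self._running[fn] < self._quota:
                self._running[fn] += 1
                self._pool.submit(self._run, future, fn, args)
            else:
                # Over quota: park the work until a slot frees up.
                self._waiting[fn].append((future, args))
        return future

    def _run(self, future, fn, args):
        try:
            future.set_result(fn(*args))
        except Exception as error:
            future.set_exception(error)
        finally:
            with self._lock:
                if self._waiting[fn]:
                    # Hand the freed slot over to the next waiting submission.
                    next_future, next_args = self._waiting[fn].popleft()
                    self._pool.submit(self._run, next_future, fn, next_args)
                else:
                    self._running[fn] -= 1


stats = {"active": 0, "peak": 0}
stats_lock = threading.Lock()


def job(delay):
    """Sleeps for a while and records how many copies run at the same time."""
    with stats_lock:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
    time.sleep(delay)
    with stats_lock:
        stats["active"] -= 1
    return delay


with ThreadPoolExecutor(max_workers=8) as pool:
    gate = QuotaGate(pool, quota=2)
    futures = [gate.submit(job, 0.05) for _ in range(6)]
    results = [future.result(timeout=5) for future in futures]
```

Even though the underlying pool has eight workers, no more than two copies of job run at once, which leaves the remaining workers free for other callables.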

If not shut down explicitly or via context management, the pool will self-terminate when either the executor is garbage collected or the interpreter shuts down.

See concurrent.futures.Executor documentation for further reference.

class Work[source]

Bases: object

A work submission to process and execute.

__init__(future: Future, fn: Callable[[...], Any], args: Tuple[Any, ...], kwargs: Dict[str, Any]) None
args: Tuple[Any, ...]
cancel() bool[source]

Cancels work.

cancelled() bool[source]

Checks if work has been cancelled.

execute() None[source]

Executes work and resolves its future.

fn: Callable[[...], Any]
future: Future
kwargs: Dict[str, Any]
notify_cancelation() None[source]

Notifies those waiting on the future about work being cancelled.

pending() bool[source]

Checks if work is pending.

class Worker[source]

Bases: Thread

A worker in its own daemonized OS thread.

__init__(executor_weakref: ReferenceType, stop_on_timeout: bool = True) None[source]

Initializes the worker.

Parameters:
  • executor_weakref – a weak reference to the parent autoscaling thread pool.

  • stop_on_timeout – whether the worker should auto-terminate if it times out waiting for work.

run() None[source]

Runs work loop.

__init__(*, min_workers: int | None = None, max_workers: int | None = None, submission_quota: int | None = None, submission_patience: float | None = None, max_idle_time: float | None = None, logger: Logger | None = None)[source]

Initializes the thread pool.

Parameters:
  • min_workers – optional minimum number of workers in the pool, 0 by default.

  • max_workers – optional maximum number of workers in the pool, 32 times the number of available CPU threads by default (assuming I/O bound work).

  • submission_quota – optional maximum number of concurrent submissions for a given callable. Up to the maximum number of workers by default. Useful when serving multiple users to prevent anyone from starving the rest.

  • submission_patience – optional time to wait in seconds for a worker to become available before upscaling the pool. 100 ms by default.

  • max_idle_time – optional time in seconds for a worker to wait for work before shutting itself down, effectively downscaling the pool. 60 seconds by default.

  • logger – optional user provided logger for the pool.

Raises:
property capped: bool

Whether submission quotas are in force or not.

property scaling_event: Condition

A waitable condition triggered on pool (re)scaling.

shutdown(wait: bool = True, *, cancel_futures: bool = False) None[source]

Shuts down the pool.

Parameters:
  • wait – whether to wait for all worker threads to shut down.

  • cancel_futures – whether to cancel all ongoing work (and associated futures).

submit(fn: Callable[[...], Any], /, *args: Any, **kwargs: Any) Future[source]

Submits work to the pool.

Parameters:
  • fn – a callable to execute. Must be immutable and hashable for the pool to track concurrent submissions and apply quotas.

  • args – optional positional arguments to forward.

  • kwargs – optional keyword arguments to forward.

Returns:

A future for the result of the work submitted.

Raises:

RuntimeError – if the pool has been shut down.
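Because fn must be hashable, it helps to know which Python callables qualify. The sketch below uses only the standard library and hypothetical example callables. How synchros2 groups submissions beyond "by callable hash" is not shown in this page, so the remark on functools.partial is an inference rather than documented behavior.

```python
import functools


def greet(name):
    return f"hello {name}"


class Greeter:
    def greet(self, name):
        return f"hello {name}"


class UnhashableCallable:
    """Defining __eq__ without __hash__ makes instances unhashable."""

    def __eq__(self, other):
        return isinstance(other, UnhashableCallable)

    def __call__(self):
        return "called"


def is_hashable(fn):
    """Reports whether a callable can be used as a dictionary key."""
    try:
        hash(fn)
    except TypeError:
        return False
    return True


greeter = Greeter()
function_ok = is_hashable(greet)
# Bound methods of the same object compare equal and hash alike.
method_ok = is_hashable(greeter.greet)
methods_match = hash(greeter.greet) == hash(greeter.greet)
# Partials hash by identity: two equivalent partials are distinct keys.
first_partial = functools.partial(greet, "world")
second_partial = functools.partial(greet, "world")
partial_ok = is_hashable(first_partial)
partials_distinct = first_partial != second_partial
unhashable_ok = is_hashable(UnhashableCallable())
```

Plain functions, bound methods, and partials are all hashable. Since each new functools.partial object is its own key, a quota keyed on callables would presumably count every fresh partial separately, so reusing one partial object is the safer choice when the quota matters.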

wait(timeout: float | None = None) bool[source]

Waits for all work in the pool to complete.

Only work ongoing at the time of invocation is watched. Work added during the wait will not be considered.

Parameters:

timeout – optional timeout, in seconds, for the wait.

Returns:

True if all work completed, False if the wait timed out.
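The snapshot semantics and the boolean result can be mimicked with concurrent.futures.wait from the standard library. This is a sketch of the behavior described above, not the synchros2 implementation, and wait_for_snapshot is a hypothetical helper name.

```python
import concurrent.futures
import threading


def wait_for_snapshot(futures, timeout=None):
    """Waits on the futures known right now; returns False on timeout."""
    snapshot = list(futures)  # work added later is not part of this wait
    _, not_done = concurrent.futures.wait(snapshot, timeout=timeout)
    return not not_done


gate = threading.Event()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
    tracked = [pool.submit(gate.wait)]
    # The only tracked job is still blocked, so this wait times out.
    finished_early = wait_for_snapshot(tracked, timeout=0.1)
    gate.set()
    # Once the gate opens the job completes well within the timeout.
    finished_late = wait_for_snapshot(tracked, timeout=5.0)
```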

property workers: List[Thread]

Current set of worker threads.

property working: bool

Whether work is ongoing or not.

synchros2.executors.assign_coroutine(coroutine: Callable[[...], Awaitable], executor: rclpy.executors.Executor) Callable[[...], FutureLike][source]

Assign a coroutine to a given executor.

An assigned coroutine will return a future-like object that will be serviced by the associated executor.
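The same pattern exists in the standard library, which makes for a runnable analogy. The sketch below swaps the rclpy executor for an asyncio event loop running in a background thread, so it illustrates the idea of assignment rather than the synchros2 mechanism. The helper name assign_to_loop is hypothetical.

```python
import asyncio
import functools
import threading


def assign_to_loop(coroutine_fn, loop):
    """Wraps a coroutine function so that calls are scheduled on the given loop."""

    @functools.wraps(coroutine_fn)
    def wrapper(*args, **kwargs):
        # Returns a concurrent.futures.Future serviced by the loop's thread.
        return asyncio.run_coroutine_threadsafe(coroutine_fn(*args, **kwargs), loop)

    return wrapper


async def add(a, b):
    await asyncio.sleep(0.01)
    return a + b


loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()
try:
    assigned_add = assign_to_loop(add, loop)
    future = assigned_add(1, 2)  # a future-like object, not a coroutine
    total = future.result(timeout=5)
finally:
    loop.call_soon_threadsafe(loop.stop)
    thread.join()
    loop.close()
```

The caller never awaits anything: calling the wrapped function hands the coroutine to whatever services it and gives back a handle to the eventual result.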

synchros2.executors.background(executor: rclpy.executors.Executor) Iterator[rclpy.executors.Executor][source]

Pushes an executor to a background thread.

Upon context entry, the executor starts spinning in a background thread. Upon context exit, the executor is shut down and the background thread is joined.

Parameters:

executor – executor to be managed.

Returns:

a context manager.
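The entry and exit behavior described for background() can be sketched with contextlib and a stand-in executor. StubExecutor and background_sketch are hypothetical names: the stub only mimics the spin() and shutdown() methods of an rclpy executor, and the real function may differ in details such as error handling.

```python
import contextlib
import threading


class StubExecutor:
    """Stand-in with the spin() and shutdown() methods the sketch relies on."""

    def __init__(self):
        self.stopped = threading.Event()
        self.spun = False

    def spin(self):
        self.spun = True
        self.stopped.wait()  # keeps "spinning" until shutdown is requested

    def shutdown(self):
        self.stopped.set()


@contextlib.contextmanager
def background_sketch(executor):
    """Spins the executor in a background thread for the duration of the block."""
    thread = threading.Thread(target=executor.spin, daemon=True)
    thread.start()
    try:
        yield executor
    finally:
        executor.shutdown()
        thread.join()


stub = StubExecutor()
with background_sketch(stub) as running:
    same_object = running is stub  # the managed executor is handed back
```

With the real function, the body of the with block is where nodes would be added and blocking calls made while callbacks keep being serviced in the background.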

synchros2.executors.foreground(executor: rclpy.executors.Executor) Iterator[rclpy.executors.Executor][source]

Manages an executor in the current thread.

Upon context exit, the executor is shut down.

Parameters:

executor – executor to be managed.

Returns:

a context manager.