synchros2.executors module
- class synchros2.executors.AutoScalingMultiThreadedExecutor[source]
Bases: Executor
An rclpy.executors.Executor subclass based on an AutoScalingThreadPool.
Akin to the rclpy.executors.MultiThreadedExecutor class but with autoscaling capabilities. Moreover, a concurrency quota can be defined on a per callback + callback group basis to avoid thread pool starvation and/or exhaustion of system resources (e.g. when using reentrant callback groups).
To support fine grained control over callback dispatch and execution, more thread pools may be added to the executor. Callback groups can then be bound to specific thread pools. If not, the default thread pool will be used.
See rclpy.executors.Executor documentation for further reference.
- class Task[source]
Bases: object
A bundle of an executable task and its associated entity.
- __init__(max_threads: int | None = None, max_thread_idle_time: float | None = None, max_threads_per_callback_group: int | None = None, *, context: rclpy.context.Context | None = None, logger: Logger | None = None) None[source]
Initializes the executor.
- Parameters:
max_threads – optional maximum number of threads the default thread pool should spin at any given time. See AutoScalingThreadPool documentation for reference on defaults.
max_thread_idle_time – optional time in seconds a thread in the default thread pool should wait for work before shutting itself down. See AutoScalingThreadPool documentation for reference on defaults.
max_threads_per_callback_group – optional maximum number of concurrent callbacks the default thread pool should service for a given callback group. Useful to prevent reentrant callback groups from starving the default thread pool.
context – An optional instance of the ros context.
logger – An optional logger instance.
- add_static_thread_pool(num_threads: int | None = None) AutoScalingThreadPool[source]
Add a thread pool that keeps a steady number of workers.
- bind(callback_group: rclpy.callback_groups.CallbackGroup, thread_pool: AutoScalingThreadPool) None[source]
Bind a callback group so that it is dispatched to the given thread pool.
The thread pool must be known to the executor, that is, instantiated through one of the add_*_thread_pool() methods.
- property default_thread_pool: AutoScalingThreadPool
Default autoscaling thread pool.
- shutdown(timeout_sec: float | None = None) bool[source]
Shutdown the executor.
- Parameters:
timeout_sec – The timeout for shutting down
- spin_once_until_future_complete(future: rclpy.task.Future, timeout_sec: float | None = None) None[source]
Complete all work until the provided future is done.
- Parameters:
future – The ros future instance
timeout_sec – The timeout for working
- property thread_pools: List[AutoScalingThreadPool]
Autoscaling thread pools in use.
- class synchros2.executors.AutoScalingThreadPool[source]
Bases: Executor
A concurrent.futures.Executor subclass based on a thread pool.
Akin to the concurrent.futures.ThreadPoolExecutor class but with autoscaling capabilities. Within a given range, the number of workers increases with demand and decreases over time. This is achieved by tracking the number of available runslots, i.e. the number of idle workers waiting for work, and by using timeouts when waiting for work. Workers add runslots prior to blocking on the runqueue. On submission, a runslot is taken. If there is no runslot to take, the pool is scaled up. If a worker has been waiting long enough and no work has come along, it terminates itself, effectively downscaling the pool.
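The scaling cycle described above can be sketched with stdlib primitives. This is a hypothetical, simplified illustration, not the library's code: here the slot bookkeeping is done entirely by the workers for brevity, whereas the actual pool's accounting differs.

```python
import queue
import threading


class MiniAutoScalingPool:
    """Hypothetical sketch of runslot-based autoscaling (not the actual implementation)."""

    def __init__(self, max_idle_time: float = 0.5) -> None:
        self._runqueue: queue.SimpleQueue = queue.SimpleQueue()
        self._lock = threading.Lock()
        self._runslots = 0  # number of idle workers waiting for work
        self._max_idle_time = max_idle_time
        self.num_workers = 0

    def submit(self, fn, *args) -> None:
        with self._lock:
            if self._runslots == 0:
                # no idle worker available: scale the pool up
                self.num_workers += 1
                threading.Thread(target=self._work, daemon=True).start()
        self._runqueue.put((fn, args))

    def _work(self) -> None:
        while True:
            with self._lock:
                self._runslots += 1  # advertise availability before blocking
            try:
                fn, args = self._runqueue.get(timeout=self._max_idle_time)
            except queue.Empty:
                with self._lock:
                    # waited long enough and no work came along:
                    # self terminate, effectively downscaling the pool
                    self._runslots -= 1
                    self.num_workers -= 1
                return
            with self._lock:
                self._runslots -= 1  # the runslot was taken
            fn(*args)
```

Submitting a burst of work spins workers up; once they sit idle past `max_idle_time`, they shut themselves down and the worker count drops back toward zero.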
Additionally, individual submissions are tracked and monitored against a configurable quota to keep any given piece of work from starving the pool. To do this, the implementation takes after CPython's implementation of the concurrent.futures.ThreadPoolExecutor class, adding runlists and waitqueues per submission "type" (i.e. callable hash) on top of the main runqueue. Runlists track work that is either pending execution in the runqueue or executing. Waitqueues track work that is to be pushed into the runqueue once the configured quota allows it.
If not shut down explicitly or via context management, the pool will terminate itself when either the executor is garbage collected or the interpreter shuts down.
See concurrent.futures.Executor documentation for further reference.
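Because AutoScalingThreadPool follows the concurrent.futures.Executor interface, usage mirrors the stdlib class. A sketch using concurrent.futures.ThreadPoolExecutor as a stand-in (swap in AutoScalingThreadPool where synchros2 is available; the commented constructor arguments come from the __init__ documentation below):

```python
from concurrent.futures import ThreadPoolExecutor  # stand-in for AutoScalingThreadPool

# With synchros2 available, this would instead read:
#   from synchros2.executors import AutoScalingThreadPool
#   pool = AutoScalingThreadPool(min_workers=0, max_workers=8, submission_quota=2)

def square(x: int) -> int:
    return x * x

# Context management shuts the pool down on exit, as described above.
with ThreadPoolExecutor(max_workers=8) as pool:
    futures = [pool.submit(square, x) for x in range(4)]
    results = [f.result() for f in futures]

print(results)  # [0, 1, 4, 9]
```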
- class Work[source]
Bases: object
A work submission to process and execute.
- __init__(future: Future, fn: Callable[[...], Any], args: Tuple[Any, ...], kwargs: Dict[str, Any]) None
- future: Future
- class Worker[source]
Bases: Thread
A worker in its own daemonized OS thread.
- __init__(*, min_workers: int | None = None, max_workers: int | None = None, submission_quota: int | None = None, submission_patience: float | None = None, max_idle_time: float | None = None, logger: Logger | None = None)[source]
Initializes the thread pool.
- Parameters:
min_workers – optional minimum number of workers in the pool, 0 by default.
max_workers – optional maximum number of workers in the pool, 32 times the number of available CPU threads by default (assuming I/O bound work).
submission_quota – optional maximum number of concurrent submissions for a given callable. Up to the maximum number of workers by default. Useful when serving multiple users to prevent any one of them from starving the rest.
submission_patience – optional time to wait in seconds for a worker to become available before upscaling the pool. 100 ms by default.
max_idle_time – optional time in seconds for a worker to wait for work before shutting itself down, effectively downscaling the pool. 60 seconds by default.
logger – optional user provided logger for the pool.
- Raises:
ValueError – if any argument is invalid.
RuntimeError – if the interpreter is shutting down.
- shutdown(wait: bool = True, *, cancel_futures: bool = False) None[source]
Shuts down the pool.
- Parameters:
wait – whether to wait for all worker threads to shutdown.
cancel_futures – whether to cancel all ongoing work (and associated futures).
- submit(fn: Callable[[...], Any], /, *args: Any, **kwargs: Any) Future[source]
Submits work to the pool.
- Parameters:
fn – a callable to execute. Must be immutable and hashable for the pool to track concurrent submissions and apply quotas.
args – optional positional arguments to forward.
kwargs – optional keyword arguments to forward.
- Returns:
A future for the result of the work submitted.
- Raises:
RuntimeError – if the pool has been shut down.
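The requirement that fn be hashable exists so the pool can key its per-callable runlists and waitqueues. A minimal, hypothetical illustration of that quota bookkeeping (names and structure invented for this sketch, not taken from the library):

```python
from collections import defaultdict

submission_quota = 2           # maximum concurrent submissions per callable
in_flight = defaultdict(int)   # callable -> number of concurrent submissions
waitqueue = []                 # work deferred until the quota allows it


def try_submit(fn, *args) -> bool:
    """Dispatch immediately if under quota, otherwise park in the waitqueue."""
    if in_flight[fn] < submission_quota:
        in_flight[fn] += 1
        return True   # would be pushed to the runqueue
    waitqueue.append((fn, args))
    return False      # deferred


def work_done(fn) -> None:
    """On completion, release a quota slot and promote waiting work."""
    in_flight[fn] -= 1
    if waitqueue and waitqueue[0][0] is fn:
        waitqueue.pop(0)
        in_flight[fn] += 1


def job(x):  # plain functions are hashable, so they can be tracked
    return x


assert try_submit(job, 1) and try_submit(job, 2)
assert not try_submit(job, 3)  # quota of 2 reached; work is deferred
work_done(job)                 # a slot frees up; deferred work is promoted
```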
- wait(timeout: float | None = None) bool[source]
Waits for all work in the pool to complete.
Only work that is ongoing at the time of invocation is waited on. Work added during the wait will not be considered.
- Parameters:
timeout – optional timeout, in seconds, for the wait.
- Returns:
True if all work completed, False if the wait timed out.
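These semantics resemble concurrent.futures.wait over a snapshot of pending futures. A sketch using the stdlib executor as a stand-in:

```python
import concurrent.futures
import time

with concurrent.futures.ThreadPoolExecutor() as pool:
    ongoing = [pool.submit(time.sleep, 0.05) for _ in range(3)]
    # Snapshot the futures first: work submitted after this point
    # is not part of the wait, matching wait()'s semantics.
    done, pending = concurrent.futures.wait(ongoing, timeout=5.0)
    completed = not pending  # True if all work finished before the timeout

print(completed)  # True
```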
- synchros2.executors.assign_coroutine(coroutine: Callable[[...], Awaitable], executor: rclpy.executors.Executor) Callable[[...], FutureLike][source]
Assign a coroutine to a given executor.
An assigned coroutine will return a future-like object that will be serviced by the associated executor.
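The general pattern is that calling the assigned coroutine returns immediately with a future serviced elsewhere. A hypothetical analogue using an asyncio event loop in a background thread in place of an rclpy executor (this is not the library's implementation, only the shape of the idea):

```python
import asyncio
import concurrent.futures
import threading
from typing import Any, Awaitable, Callable


def assign_coroutine_sketch(
    coroutine: Callable[..., Awaitable],
    loop: asyncio.AbstractEventLoop,
) -> Callable[..., "concurrent.futures.Future"]:
    """Hypothetical analogue: calls return futures serviced by `loop`."""
    def wrapper(*args: Any, **kwargs: Any) -> "concurrent.futures.Future":
        return asyncio.run_coroutine_threadsafe(coroutine(*args, **kwargs), loop)
    return wrapper


# Drive an event loop in a background thread (playing the executor's role here).
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()


async def add(a: int, b: int) -> int:
    return a + b


assigned_add = assign_coroutine_sketch(add, loop)
future = assigned_add(2, 3)          # returns immediately with a future
result = future.result(timeout=5.0)  # serviced by the background loop

loop.call_soon_threadsafe(loop.stop)
thread.join(timeout=5.0)
print(result)  # 5
```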
- synchros2.executors.background(executor: rclpy.executors.Executor) Iterator[rclpy.executors.Executor][source]
Pushes an executor to a background thread.
Upon context entry, the executor starts spinning in a background thread. Upon context exit, the executor is shutdown and the background thread is joined.
- Parameters:
executor – executor to be managed.
- Returns:
a context manager.
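The spin-on-entry, shutdown-and-join-on-exit pattern can be sketched with contextlib. A minimal stand-in executor is used so the example is self-contained; FakeExecutor and its members are invented for this sketch and only mimic the rclpy.executors.Executor spin/shutdown contract:

```python
import contextlib
import threading
from typing import Iterator


class FakeExecutor:
    """Minimal stand-in for rclpy.executors.Executor (hypothetical)."""

    def __init__(self) -> None:
        self._stopped = threading.Event()
        self.spun = threading.Event()

    def spin(self) -> None:  # blocks until shutdown, like rclpy's spin()
        self.spun.set()
        self._stopped.wait()

    def shutdown(self) -> None:
        self._stopped.set()


@contextlib.contextmanager
def background(executor: FakeExecutor) -> Iterator[FakeExecutor]:
    """Spin `executor` in a background thread for the context's duration."""
    thread = threading.Thread(target=executor.spin)
    thread.start()  # on entry: start spinning in the background
    try:
        yield executor
    finally:
        executor.shutdown()  # on exit: shutdown, then join the thread
        thread.join()


executor = FakeExecutor()
with background(executor) as spinning:
    spinning.spun.wait(timeout=5.0)  # executor is spinning in the background

print(executor.spun.is_set())  # True
```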