Source code for synchros2.futures

# Copyright (c) 2023 Robotics and AI Institute LLC dba RAI Institute.  All rights reserved.
from threading import Event
from typing import Any, Awaitable, Callable, Optional, Protocol, TypeVar, Union, runtime_checkable

from rclpy.clock import Clock
from rclpy.context import Context
from rclpy.utilities import get_default_context

from synchros2.clock import wait_for

T = TypeVar("T", covariant=True)


[docs] class FutureLike(Awaitable[T], Protocol[T]): """A future-like awaitable object. Matches `rclpy.task.Future` and `concurrent.futures.Future` protocols. """
[docs] def result(self) -> T: """Get future result (may block).""" ...
[docs] def exception(self) -> Optional[Exception]: """Get future exception, if any.""" ...
[docs] def done(self) -> bool: """Check if future is ready.""" ...
[docs] def add_done_callback(self, func: Callable[["FutureLike[T]"], None]) -> None: """Add a callback to be scheduled as soon as the future is ready.""" ...
[docs] def cancel(self) -> None: """Cancel future.""" ...
[docs] def cancelled(self) -> bool: """Check if future was cancelled.""" ...
[docs] @runtime_checkable class FutureConvertible(Awaitable[T], Protocol[T]): """An awaitable that is convertible to a future-like object."""
[docs] def as_future(self) -> FutureLike[T]: """Get future-like view.""" ...
AnyFuture = Union[FutureLike, FutureConvertible]
[docs] def as_proper_future(instance: AnyFuture) -> FutureLike: """Return `instance` as a proper future-like object.""" if isinstance(instance, FutureConvertible): return instance.as_future() return instance
[docs] def wait_for_future( future: AnyFuture, timeout_sec: Optional[float] = None, *, clock: Optional[Clock] = None, context: Optional[Context] = None, ) -> bool: """Block while waiting for a future to become done Args: future (Future): The future to be waited on timeout_sec (Optional[float]): An optional timeout for how long to wait clock (Optional[Clock]): An optional clock to use for timeout waits, defaults to the clock of the current scope if any, otherwise the system clock context (Optional[Context]): Current context (will use the default if none is given) Returns: bool: True if successful, False if the timeout was triggered """ if context is None: context = get_default_context() if clock is None: import synchros2.scope clock = synchros2.scope.clock() event = Event() context.on_shutdown(event.set) proper_future = as_proper_future(future) proper_future.add_done_callback(lambda _: event.set()) if proper_future.cancelled(): event.set() wait_for(event, clock=clock, timeout_sec=timeout_sec) return proper_future.done()
[docs] def unwrap_future( future: AnyFuture, timeout_sec: Optional[float] = None, *, clock: Optional[Clock] = None, context: Optional[Context] = None, ) -> Any: """Fetch future result when it is done. Note this function may block and may raise if the future does or it times out waiting for it. See wait_for_future() documentation for further reference on arguments taken. """ proper_future = as_proper_future(future) if not wait_for_future(proper_future, timeout_sec, clock=clock, context=context): raise ValueError("cannot unwrap future that is not done") return proper_future.result()
wait_and_return_result = unwrap_future """Fetch future result when it is done. Alias for unwrap_future()."""