synchros2.utilities module
- class synchros2.utilities.Tape[source]
Bases: Generic[T]
A thread-safe data tape that can be written and iterated safely.
- class Stream[source]
Bases: Generic[U]
A synchronized data stream.
- __init__(max_size: int | None = None, label: str | None = None) None[source]
Initializes the stream.
- Parameters:
max_size – optional maximum stream size. Must be a positive number.
label – optional label for the stream (useful in debug and error messages).
- read(timeout_sec: float | None = None) U | None[source]
Read data from the stream.
- Parameters:
timeout_sec – optional read timeout, in seconds.
- Returns:
data if the read is successful, and None if the stream is interrupted.
- Raises:
TimeoutError – if the read times out.
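The three outcomes of a read (data, None on interruption, TimeoutError on timeout) can be illustrated with a small stand-in built on the standard library. This is only a sketch: `MiniStream`, `write` and `interrupt` are hypothetical names chosen for illustration, and the code is not synchros2's implementation.

```python
import queue
from typing import Generic, Optional, TypeVar

U = TypeVar("U")


class MiniStream(Generic[U]):
    """Hypothetical stand-in for a synchronized stream (not synchros2's code)."""

    _INTERRUPTED = object()  # sentinel that marks an interruption

    def __init__(self, max_size: Optional[int] = None) -> None:
        # queue.Queue treats maxsize <= 0 as unbounded.
        self._queue: queue.Queue = queue.Queue(maxsize=max_size or 0)

    def write(self, data: U) -> None:  # hypothetical method name
        self._queue.put(data)

    def interrupt(self) -> None:  # hypothetical method name
        self._queue.put(self._INTERRUPTED)

    def read(self, timeout_sec: Optional[float] = None) -> Optional[U]:
        try:
            item = self._queue.get(timeout=timeout_sec)
        except queue.Empty:
            # Map the queue-specific exception to the documented TimeoutError.
            raise TimeoutError("stream read timed out") from None
        return None if item is self._INTERRUPTED else item


stream: MiniStream[int] = MiniStream(max_size=4)
stream.write(7)
first = stream.read(timeout_sec=0.1)   # data is available
stream.interrupt()
second = stream.read(timeout_sec=0.1)  # the stream was interrupted
try:
    stream.read(timeout_sec=0.05)      # nothing left to read
    timed_out = False
except TimeoutError:
    timed_out = True
```

Callers therefore need to handle both a None return value and a TimeoutError when reading with a timeout.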
- __init__(max_length: int | None = None) None[source]
Initializes the data tape.
- Parameters:
max_length – optional maximum tape length.
- add_write_callback(callback: Callable[[T], None], forward_only: bool = False) None[source]
Adds a callback to be invoked on each write.
- Parameters:
callback – a callable taking the written data as its only argument.
forward_only – if true, ignore existing content and call back on future writes.
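The effect of forward_only can be pictured with a simplified stand-in. This is a sketch under assumptions: `MiniTape` and its `write` method are hypothetical names, and the replay of existing content when forward_only is false is inferred from the parameter description above rather than taken from the synchros2 source.

```python
import threading
from typing import Callable, Generic, List, TypeVar

T = TypeVar("T")


class MiniTape(Generic[T]):
    """Hypothetical stand-in for a data tape with write callbacks."""

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self._content: List[T] = []
        self._callbacks: List[Callable[[T], None]] = []

    def write(self, data: T) -> None:  # hypothetical method name
        with self._lock:
            self._content.append(data)
            for callback in self._callbacks:
                callback(data)

    def add_write_callback(
        self, callback: Callable[[T], None], forward_only: bool = False
    ) -> None:
        with self._lock:
            if not forward_only:
                # Assumption: existing content is replayed to the new callback.
                for data in self._content:
                    callback(data)
            self._callbacks.append(callback)


tape: MiniTape[int] = MiniTape()
tape.write(1)

seen_all: List[int] = []
seen_new: List[int] = []
tape.add_write_callback(seen_all.append)
tape.add_write_callback(seen_new.append, forward_only=True)
tape.write(2)
```

In this sketch, `seen_all` ends up with both items while `seen_new` only receives the item written after registration.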
- content(*, follow: bool = False, forward_only: bool = False, expunge: bool = False, buffer_size: int | None = None, duration_sec: float | None = None, timeout_sec: float | None = None, label: str | None = None) Generator[T, None, None][source]
- content(*, greedy: Literal[True], follow: Literal[True], forward_only: bool = False, expunge: bool = False, buffer_size: int | None = None, duration_sec: float | None = None, timeout_sec: float | None = None, label: str | None = None) Generator[List[T], None, None]
- content(*, greedy: Literal[True], expunge: bool = False, buffer_size: int | None = None, duration_sec: float | None = None, timeout_sec: float | None = None, label: str | None = None) List[T]
Iterate over the data tape.
When following the data tape, iteration stops when the given timeout expires or when the data tape is closed.
- Parameters:
greedy – if true, greedily batch content as it becomes available.
follow – whether to follow the data tape as it gets written or not.
forward_only – if true, ignore existing content and only look ahead when following the data tape.
expunge – if true, wipe out existing content in the data tape after reading, if it applies.
buffer_size – optional buffer size when following the data tape. If none is provided, the buffer will grow as necessary.
duration_sec – optional duration, in seconds, to gather content from the data tape.
timeout_sec – optional timeout, in seconds, when following the data tape.
label – optional label to qualify logs and warnings.
- Returns:
a lazy iterator over the data tape, one item at a time or in batches if greedy.
- Raises:
TimeoutError – if iteration times out waiting for new data.
- future_matching_write(matching_predicate: Callable[[T], bool]) FutureLike[T][source]
Gets a future to the next matching data yet to be written.
- Parameters:
matching_predicate – a boolean predicate to match written data.
- Returns:
a future.
- property future_write: FutureLike[T]
Gets the future to the next data yet to be written.
- property latest_write: FutureLike[T]
Gets the future to the latest data written or to be written.
- synchros2.utilities.bind_to_thread(callable_: Callable, thread: Thread) Callable[source]
Binds a callable to a thread, so it can only be invoked from that thread.
- synchros2.utilities.cap(func: Callable, num_times: int, fill_value: Any = None) Callable[source]
Decorates a callable to cap invocations to a prescribed number of times.
- Parameters:
func – callable to be decorated.
num_times – maximum number of times the callable may be invoked.
fill_value – optional value to return once invocations reach their cap.
- Returns:
decorated callable.
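The capping behavior described above can be sketched with a plain closure. `cap_sketch` is a hypothetical stand-in written for illustration, not the synchros2 implementation, and it makes no attempt at thread safety.

```python
import functools
from typing import Any, Callable


def cap_sketch(func: Callable, num_times: int, fill_value: Any = None) -> Callable:
    """Hypothetical stand-in: forwards at most `num_times` calls to `func`."""
    calls = 0

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        nonlocal calls
        if calls >= num_times:
            return fill_value  # cap reached, func is not invoked
        calls += 1
        return func(*args, **kwargs)

    return wrapper


def double(x: int) -> int:
    return 2 * x


capped = cap_sketch(double, num_times=2, fill_value=-1)
results = [capped(1), capped(2), capped(3)]  # the third call hits the cap
```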
- synchros2.utilities.either_or(obj: Any, name: str, default: Any) Any[source]
Gets either an object attribute’s value or a default value.
Unlike getattr, callable attributes are applied as getters on obj.
- synchros2.utilities.fqn(obj: Any) str | None[source]
Computes the fully qualified name of a given object, if any.
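One plausible way to compute such a name is to combine an object's `__module__` and `__qualname__` attributes. The sketch below assumes that approach; `fqn_sketch` is a hypothetical stand-in, and the actual function may treat edge cases differently.

```python
import json
from typing import Any, Optional


def fqn_sketch(obj: Any) -> Optional[str]:
    """Hypothetical stand-in: '<module>.<qualname>', or None if unnamed."""
    name = getattr(obj, "__qualname__", None)
    if name is None:
        return None  # e.g. plain instances carry no qualified name
    module = getattr(obj, "__module__", None)
    return name if module is None else f"{module}.{name}"


function_name = fqn_sketch(json.dumps)
method_name = fqn_sketch(json.JSONDecoder.decode)
instance_name = fqn_sketch(42)
```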
- synchros2.utilities.functional_decorator(base_decorator: Callable) Callable[source]
Wraps a decorating callable to be usable as a Python decorator for functions.
As an example, consider the following decorator:
    @functional_decorator
    def my_decorator(func, some_flag=None):
        ...
This decorator can then be used like this:
    @my_decorator
    def my_function(*args):
        ...
and also like this:
    @my_decorator(some_flag=True)
    def my_function(*args):
        ...
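The dual usage (bare or with keyword options) is a common Python pattern, and a minimal version of it can be sketched as follows. `functional_decorator_sketch` and `tagged` are hypothetical names, and the sketch assumes that options are always passed as keywords; it is not the synchros2 implementation.

```python
import functools
from typing import Any, Callable, Optional


def functional_decorator_sketch(base_decorator: Callable) -> Callable:
    """Hypothetical stand-in: lets `base_decorator` be used bare or with options."""

    @functools.wraps(base_decorator)
    def decorator(func: Optional[Callable] = None, **kwargs: Any) -> Callable:
        if func is None:
            # Used as @decorator(option=...): wait for the function to arrive.
            return functools.partial(base_decorator, **kwargs)
        # Used as a bare @decorator: decorate right away.
        return base_decorator(func, **kwargs)

    return decorator


@functional_decorator_sketch
def tagged(func: Callable, tag: str = "default") -> Callable:
    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        return (tag, func(*args, **kwargs))

    return wrapper


@tagged
def one() -> int:
    return 1


@tagged(tag="custom")
def two() -> int:
    return 2
```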
- synchros2.utilities.localized_error_message(user_message: str | None = None) str[source]
Returns an error message with source location information.
- synchros2.utilities.namespace_with(*args: str | None) str[source]
Puts together a ROS 2 like namespace from its constitutive parts.
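Since the parts may be None, one plausible reading is that missing parts are dropped and the rest are joined with "/". The sketch below encodes that assumption only; `namespace_with_sketch` is a hypothetical stand-in, and how the real function handles empty strings or leading and trailing slashes is not covered here.

```python
from typing import Optional


def namespace_with_sketch(*parts: Optional[str]) -> str:
    """Hypothetical stand-in: joins the non-empty parts with '/'."""
    # Assumption: None (and empty) parts are simply ignored.
    return "/".join(part for part in parts if part)


full = namespace_with_sketch("robot", "arm", "camera")
partial = namespace_with_sketch("robot", None, "camera")
```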
- synchros2.utilities.skip(func: Callable, num_times: int, fill_value: Any = None) Callable[source]
Decorates a callable to skip a prescribed number of initial invocations.
- Parameters:
func – callable to be decorated.
num_times – number of times to skip the invocation.
fill_value – optional value to return for skipped invocations.
- Returns:
decorated callable.
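Skipping is the mirror image of capping: the first calls return the fill value and later calls go through. `skip_sketch` below is a hypothetical stand-in written for illustration, not the synchros2 implementation, and it is not thread-safe.

```python
import functools
from typing import Any, Callable


def skip_sketch(func: Callable, num_times: int, fill_value: Any = None) -> Callable:
    """Hypothetical stand-in: ignores the first `num_times` calls to `func`."""
    skipped = 0

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        nonlocal skipped
        if skipped < num_times:
            skipped += 1
            return fill_value  # still skipping, func is not invoked
        return func(*args, **kwargs)

    return wrapper


def square(x: int) -> int:
    return x * x


skipping = skip_sketch(square, num_times=2, fill_value=0)
results = [skipping(3), skipping(4), skipping(5), skipping(6)]
```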
- synchros2.utilities.synchronized(func: Callable | None = None, lock: allocate_lock | None = None) Callable[source]
Wraps func to synchronize invocations, optionally taking a user defined lock.
This function can be used as a decorator, like:
    @synchronized
    def my_function(...):
        ...
or:
    @synchronized(lock=my_lock)
    def my_function(...):
        ...
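To see why a lock matters, the sketch below guards a read-modify-write sequence that several threads run concurrently. `synchronized_sketch` is a hypothetical stand-in, not the synchros2 implementation; it assumes a fresh `threading.Lock` is created when no lock is given, which is non-reentrant, so a decorated function calling itself would deadlock in this sketch.

```python
import functools
import threading
from typing import Any, Callable, Optional


def synchronized_sketch(func: Optional[Callable] = None, lock: Any = None) -> Callable:
    """Hypothetical stand-in: serializes calls to `func` with a lock."""
    if lock is None:
        lock = threading.Lock()  # assumption: one private lock per decorated function

    def decorate(target: Callable) -> Callable:
        @functools.wraps(target)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            with lock:
                return target(*args, **kwargs)

        return wrapper

    # Support both @synchronized_sketch and @synchronized_sketch(lock=...).
    return decorate if func is None else decorate(func)


counter = {"value": 0}
shared_lock = threading.Lock()


@synchronized_sketch(lock=shared_lock)
def increment() -> None:
    current = counter["value"]      # read
    counter["value"] = current + 1  # write; safe because calls are serialized


def worker() -> None:
    for _ in range(1000):
        increment()


threads = [threading.Thread(target=worker) for _ in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```

Passing an explicit lock, as done here, lets several functions share one critical section.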
- synchros2.utilities.take_kwargs(func: Callable, kwargs: Mapping) Tuple[Mapping, Mapping][source]
Take keyword arguments given a callable’s signature.
- Parameters:
func – callable to take keyword arguments for.
kwargs – mapping to take keyword arguments from.
- Returns:
a tuple of taken and dropped keyword arguments.
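Splitting keyword arguments by signature can be sketched with `inspect.signature`. `take_kwargs_sketch` and `connect` are hypothetical names, and the sketch assumes that only parameters passable by keyword count as taken; the real function may handle `**kwargs` parameters or other cases differently.

```python
import inspect
from typing import Any, Callable, Dict, Mapping, Tuple


def take_kwargs_sketch(
    func: Callable, kwargs: Mapping[str, Any]
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Hypothetical stand-in: splits `kwargs` into (taken, dropped) for `func`."""
    keyword_kinds = (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    )
    accepted = {
        name
        for name, parameter in inspect.signature(func).parameters.items()
        if parameter.kind in keyword_kinds
    }
    taken = {name: value for name, value in kwargs.items() if name in accepted}
    dropped = {name: value for name, value in kwargs.items() if name not in accepted}
    return taken, dropped


def connect(host: str, port: int = 80, *, timeout: float = 1.0) -> str:
    return f"{host}:{port} (timeout={timeout})"


options = {"host": "example.org", "timeout": 2.0, "retries": 3}
taken, dropped = take_kwargs_sketch(connect, options)
summary = connect(**taken)  # safe: only accepted keywords are forwarded
```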
- synchros2.utilities.throttle(func: Callable, min_period: rclpy.duration.Duration, time_source: rclpy.clock.Clock | None = None, fill_value: Any = None) Callable[source]
Decorates a callable to throttle invocations.
- Parameters:
func – callable to be decorated.
min_period – minimum time between consecutive invocations.
time_source – optional time source to measure time against. If none is provided, the system clock will be used.
fill_value – optional value to return for throttled invocations.
- Returns:
decorated callable.
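The throttling logic can be sketched without ROS 2 dependencies. In this stand-in, `min_period_sec` (a float) replaces the `rclpy.duration.Duration` argument and a `now` callable replaces the `rclpy.clock.Clock` time source; both substitutions, as well as the name `throttle_sketch`, are assumptions made so the example runs with the standard library alone.

```python
import functools
import time
from typing import Any, Callable, List, Optional


def throttle_sketch(
    func: Callable,
    min_period_sec: float,
    now: Callable[[], float] = time.monotonic,
    fill_value: Any = None,
) -> Callable:
    """Hypothetical stand-in: forwards calls at most once per `min_period_sec`."""
    last_call: Optional[float] = None

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        nonlocal last_call
        current = now()
        if last_call is not None and current - last_call < min_period_sec:
            return fill_value  # too soon since the last forwarded call
        last_call = current
        return func(*args, **kwargs)

    return wrapper


# A fake clock keeps the example deterministic.
fake_time = [0.0]
throttled = throttle_sketch(
    lambda stamp: stamp,
    min_period_sec=1.0,
    now=lambda: fake_time[0],
    fill_value="throttled",
)

outputs: List[Any] = []
for stamp in (0.0, 0.5, 1.0, 1.2, 2.5):
    fake_time[0] = stamp
    outputs.append(throttled(stamp))
```

Injecting the time source, as the real signature also allows, is what makes the behavior testable without sleeping.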