synchros2.utilities module

class synchros2.utilities.Tape[source]

Bases: Generic[T]

A thread-safe data tape that can be written and iterated safely.

class Stream[source]

Bases: Generic[U]

A synchronized data stream.

__init__(max_size: int | None = None, label: str | None = None) None[source]

Initializes the stream.

Parameters:
  • max_size – optional maximum stream size. Must be a positive number.

  • label – optional label for the stream (useful in debug and error messages).

property consumed: bool

Check if all stream data has been consumed.

interrupt() None[source]

Interrupt the stream and wake up the reader.

property label: str | None

Get stream label.

read(timeout_sec: float | None = None) U | None[source]

Read data from the stream.

Parameters:

timeout_sec – optional read timeout, in seconds.

Returns:

data if the read is successful, and None if the stream is interrupted.

Raises:

TimeoutError if the read times out.

try_read() U | None[source]

Try to read data from the stream.

Returns:

data if the read is successful, and None if there is nothing to be read or the stream is interrupted.

write(data: U) bool[source]

Write data to the stream.

Returns:

True if the write operation succeeded, False if the stream has grown to its specified maximum size and the write operation cannot be carried out.
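The read, write, and interrupt semantics described above can be illustrated with a small stand-in. This is a minimal sketch, not the synchros2 implementation: `SketchStream` is a hypothetical name, and the choice to drain pending data before reporting an interruption is an assumption.

```python
import threading
from collections import deque
from typing import Deque, Generic, Optional, TypeVar

U = TypeVar("U")


class SketchStream(Generic[U]):
    """Illustrative stand-in for a synchronized stream (hypothetical)."""

    def __init__(self, max_size: Optional[int] = None) -> None:
        if max_size is not None and max_size <= 0:
            raise ValueError("max_size must be a positive number")
        self._max_size = max_size
        self._queue: Deque[U] = deque()
        self._condition = threading.Condition()
        self._interrupted = False

    def write(self, data: U) -> bool:
        with self._condition:
            if self._max_size is not None and len(self._queue) >= self._max_size:
                return False  # stream is full, the write is refused
            self._queue.append(data)
            self._condition.notify()
            return True

    def read(self, timeout_sec: Optional[float] = None) -> Optional[U]:
        with self._condition:
            # Block until there is data, an interruption, or a timeout.
            ready = self._condition.wait_for(
                lambda: bool(self._queue) or self._interrupted,
                timeout=timeout_sec,
            )
            if not ready:
                raise TimeoutError("read timed out")
            if self._queue:
                return self._queue.popleft()
            return None  # interrupted with nothing left to read (assumption)

    def interrupt(self) -> None:
        with self._condition:
            self._interrupted = True
            self._condition.notify_all()  # wake up any blocked reader


stream: SketchStream[int] = SketchStream(max_size=1)
first_write = stream.write(1)   # accepted
second_write = stream.write(2)  # refused, the stream is at max_size
value = stream.read(timeout_sec=0.1)
```

A reader that gets `None` back from `read` should treat the stream as interrupted rather than retry.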

__init__(max_length: int | None = None) None[source]

Initializes the data tape.

Parameters:

max_length – optional maximum tape length.

add_write_callback(callback: Callable[[T], None], forward_only: bool = False) None[source]

Adds a callback to be invoked on each write.

Parameters:
  • callback – a callable taking the written data as its only argument.

  • forward_only – if true, ignore existing content and call back on future writes.

close() None[source]

Close the data tape.

This will interrupt all content iterators that are following the data tape.

content(*, follow: bool = False, forward_only: bool = False, expunge: bool = False, buffer_size: int | None = None, duration_sec: float | None = None, timeout_sec: float | None = None, label: str | None = None) Generator[T, None, None][source]
content(*, greedy: Literal[True], follow: Literal[True], forward_only: bool = False, expunge: bool = False, buffer_size: int | None = None, duration_sec: float | None = None, timeout_sec: float | None = None, label: str | None = None) Generator[List[T], None, None]
content(*, greedy: Literal[True], expunge: bool = False, buffer_size: int | None = None, duration_sec: float | None = None, timeout_sec: float | None = None, label: str | None = None) List[T]

Iterate over the data tape.

When following the data tape, iteration stops when the given timeout expires or when the data tape is closed.

Parameters:
  • greedy – if true, greedily batch content as it becomes available.

  • follow – whether to follow the data tape as it gets written or not.

  • forward_only – if true, ignore existing content and only look ahead when following the data tape.

  • expunge – if true, wipe out existing content in the data tape after reading, if it applies.

  • buffer_size – optional buffer size when following the data tape. If none is provided, the buffer will grow as necessary.

  • duration_sec – optional duration, in seconds, to gather content from the data tape.

  • timeout_sec – optional timeout, in seconds, when following the data tape.

  • label – optional label to qualify logs and warnings.

Returns:

a lazy iterator over the data tape, one item at a time or in batches if greedy.

Raises:

TimeoutError – if iteration times out waiting for new data.

flush() None[source]

Flushes the data tape content, if any.

future_matching_write(matching_predicate: Callable[[T], bool]) FutureLike[T][source]

Gets a future to the next matching data yet to be written.

Parameters:

matching_predicate – a boolean predicate to match written data.

Returns:

a future.
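One way to picture a future to the next matching write is a list of pending predicates that each write resolves. The following is a minimal sketch under assumptions: `SketchTape` is a hypothetical stand-in, and `concurrent.futures.Future` takes the place of the library's `FutureLike`.

```python
import threading
from concurrent.futures import Future
from typing import Any, Callable, List, Tuple


class SketchTape:
    """Illustrative stand-in, not the synchros2 Tape (hypothetical)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._pending: List[Tuple[Callable[[Any], bool], Future]] = []

    def future_matching_write(self, matching_predicate: Callable[[Any], bool]) -> Future:
        future: Future = Future()
        with self._lock:
            self._pending.append((matching_predicate, future))
        return future

    def write(self, data: Any) -> bool:
        matched, remaining = [], []
        with self._lock:
            # Split pending futures into those this write satisfies and the rest.
            for predicate, future in self._pending:
                (matched if predicate(data) else remaining).append((predicate, future))
            self._pending = remaining
        for _, future in matched:
            future.set_result(data)  # resolve outside the lock
        return True


tape = SketchTape()
future = tape.future_matching_write(lambda x: x > 2)
tape.write(1)                      # does not match, the future stays pending
pending_after_first = not future.done()
tape.write(3)                      # matches, the future resolves to 3
```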

property future_write: FutureLike[T]

Gets the future to the next data yet to be written.

property head: T | None

Returns the data tape head, if any.

property latest_write: FutureLike[T]

Gets the future to the latest data written or to be written.

write(data: T) bool[source]

Write data to the data tape.

synchros2.utilities.bind_to_thread(callable_: Callable, thread: Thread) Callable[source]

Binds a callable to a thread, so it can only be invoked from that thread.
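As a rough illustration of thread binding, the wrapper below checks the calling thread before delegating. It is a sketch only: `bind_to_thread_sketch` is a hypothetical name, and raising `RuntimeError` on a foreign thread is an assumption, since the reference does not state how violations are reported.

```python
import functools
import threading
from typing import Any, Callable, List


def bind_to_thread_sketch(callable_: Callable, thread: threading.Thread) -> Callable:
    @functools.wraps(callable_)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        # Refuse invocations coming from any other thread (assumed behavior).
        if threading.current_thread() is not thread:
            raise RuntimeError("callable is bound to another thread")
        return callable_(*args, **kwargs)

    return wrapper


bound = bind_to_thread_sketch(lambda: "ok", threading.current_thread())
same_thread_result = bound()

errors: List[BaseException] = []


def call_from_elsewhere() -> None:
    try:
        bound()
    except RuntimeError as error:
        errors.append(error)


worker = threading.Thread(target=call_from_elsewhere)
worker.start()
worker.join()
```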

synchros2.utilities.cap(func: Callable, num_times: int, fill_value: Any = None) Callable[source]

Decorates a callable to cap invocations to a prescribed number of times.

Parameters:
  • func – callable to be decorated.

  • num_times – maximum number of times the callable may be invoked.

  • fill_value – optional value to return once invocations reach their cap.

Returns:

decorated callable.
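The capping behavior can be sketched with a simple call counter. `cap_sketch` is a hypothetical stand-in written for illustration, not the library's code, and it is not thread-safe.

```python
import functools
from typing import Any, Callable


def cap_sketch(func: Callable, num_times: int, fill_value: Any = None) -> Callable:
    calls = 0

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        nonlocal calls
        if calls >= num_times:
            return fill_value  # cap reached, func is no longer invoked
        calls += 1
        return func(*args, **kwargs)

    return wrapper


greet = cap_sketch(lambda name: f"hello {name}", num_times=2, fill_value="(capped)")
capped_results = [greet("a"), greet("b"), greet("c")]
```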

synchros2.utilities.either_or(obj: Any, name: str, default: Any) Any[source]

Gets either an object attribute’s value or a default value.

Unlike getattr, callable attributes are applied as getters on obj.
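The difference from `getattr` can be sketched as follows. `either_or_sketch` and `Robot` are hypothetical names, and calling the attribute with no arguments is an assumption about what "applied as getters" means.

```python
from typing import Any


def either_or_sketch(obj: Any, name: str, default: Any) -> Any:
    if not hasattr(obj, name):
        return default
    value = getattr(obj, name)
    if callable(value):
        return value()  # treat callable attributes as getters (assumption)
    return value


class Robot:
    name = "spot"

    def frame(self) -> str:
        return "body"


robot = Robot()
plain = either_or_sketch(robot, "name", "unknown")       # plain attribute
getter = either_or_sketch(robot, "frame", "unknown")     # method used as a getter
missing = either_or_sketch(robot, "missing", "unknown")  # falls back to default
```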

synchros2.utilities.ensure(value: T | None) T[source]

Ensures value is not None or fails trying.
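A helper like this is typically used to narrow `T | None` to `T` for type checkers. The sketch below shows the idea; `ensure_sketch` is a hypothetical name and the use of `ValueError` is an assumption, since the reference does not name the exception raised.

```python
from typing import Optional, TypeVar

T = TypeVar("T")


def ensure_sketch(value: Optional[T]) -> T:
    if value is None:
        # Exception type is an assumption made for this sketch.
        raise ValueError("unexpected None value")
    return value


maybe_port: Optional[int] = 8080
port: int = ensure_sketch(maybe_port)  # narrowed from Optional[int] to int
```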

synchros2.utilities.fqn(obj: Any) str | None[source]

Computes the fully qualified name of a given object, if any.
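A fully qualified name is commonly built from an object's `__module__` and `__qualname__`. The sketch below takes that approach as an assumption; `fqn_sketch` is a hypothetical name, and the library may handle more cases.

```python
from collections import OrderedDict
from typing import Any, Optional


def fqn_sketch(obj: Any) -> Optional[str]:
    name = getattr(obj, "__qualname__", None)
    if name is None:
        return None  # e.g. plain instances carry no qualified name
    module = getattr(obj, "__module__", None)
    if module is None:
        return name
    return f"{module}.{name}"


class_name = fqn_sketch(OrderedDict)
instance_name = fqn_sketch(42)
```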

synchros2.utilities.functional_decorator(base_decorator: Callable) Callable[source]

Wraps a decorating callable to be usable as a Python decorator for functions.

As an example, consider the following decorator:

@functional_decorator
def my_decorator(func, some_flag=None):
    ...

This decorator can then be used like this:

@my_decorator
def my_function(*args):
    ...

and also like this:

@my_decorator(some_flag=True)
def my_function(*args):
    ...

synchros2.utilities.localized_error_message(user_message: str | None = None) str[source]

Returns an error message with source location information.

synchros2.utilities.namespace_with(*args: str | None) str[source]

Puts together a ROS 2-like namespace from its constitutive parts.
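As a rough picture of what assembling a namespace might look like, the sketch below joins the non-empty parts with `/`. Both the separator handling and the skipping of `None` parts are assumptions, and `namespace_with_sketch` is a hypothetical name.

```python
from typing import Optional


def namespace_with_sketch(*args: Optional[str]) -> str:
    # Skip None and empty parts, join the rest with "/" (assumed behavior).
    return "/".join(part for part in args if part)


namespace = namespace_with_sketch("robot", None, "arm")
```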

synchros2.utilities.skip(func: Callable, num_times: int, fill_value: Any = None) Callable[source]

Decorates a callable to skip its first invocations, up to a prescribed number of times.

Parameters:
  • func – callable to be decorated.

  • num_times – number of times to skip the invocation.

  • fill_value – optional value to return for skipped invocations.

Returns:

decorated callable.
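Skipping is the mirror image of capping: the first `num_times` calls return the fill value, and later calls go through. `skip_sketch` is a hypothetical stand-in for illustration, not the library's code, and it is not thread-safe.

```python
import functools
from typing import Any, Callable


def skip_sketch(func: Callable, num_times: int, fill_value: Any = None) -> Callable:
    skipped = 0

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        nonlocal skipped
        if skipped < num_times:
            skipped += 1
            return fill_value  # invocation skipped, func is not called
        return func(*args, **kwargs)

    return wrapper


double = skip_sketch(lambda x: 2 * x, num_times=2, fill_value=0)
skipped_results = [double(1), double(2), double(3)]
```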

synchros2.utilities.synchronized(func: Callable | None = None, lock: allocate_lock | None = None) Callable[source]

Wraps func to synchronize invocations, optionally taking a user defined lock.

This function can be used as a decorator, like:

@synchronized
def my_function(*args):
    ...

or:

@synchronized(lock=my_lock)
def my_function(*args):
    ...

synchros2.utilities.take_kwargs(func: Callable, kwargs: Mapping) Tuple[Mapping, Mapping][source]

Take keyword arguments given a callable’s signature.

Parameters:
  • func – callable to take keyword arguments for.

  • kwargs – mapping to take keyword arguments from.

Returns:

a tuple of taken and dropped keyword arguments.
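Splitting keyword arguments by signature can be done with `inspect.signature`. The sketch below is one plausible approach, not necessarily the library's: `take_kwargs_sketch` and `connect` are hypothetical names, and treating a `**kwargs` parameter as accepting everything is an assumption.

```python
import inspect
from typing import Any, Callable, Dict, Mapping, Tuple


def take_kwargs_sketch(
    func: Callable, kwargs: Mapping[str, Any]
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    parameters = inspect.signature(func).parameters
    # A **kwargs parameter accepts any keyword, so nothing is dropped.
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in parameters.values()):
        return dict(kwargs), {}
    accepted = {
        name
        for name, p in parameters.items()
        if p.kind in (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.KEYWORD_ONLY)
    }
    taken = {key: value for key, value in kwargs.items() if key in accepted}
    dropped = {key: value for key, value in kwargs.items() if key not in accepted}
    return taken, dropped


def connect(host: str, *, port: int = 80) -> str:
    return f"{host}:{port}"


taken, dropped = take_kwargs_sketch(connect, {"host": "localhost", "port": 8080, "retries": 3})
endpoint = connect(**taken)
```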

synchros2.utilities.throttle(func: Callable, min_period: rclpy.duration.Duration, time_source: rclpy.clock.Clock | None = None, fill_value: Any = None) Callable[source]

Decorates a callable to throttle invocations.

Parameters:
  • func – callable to be decorated.

  • min_period – minimum time between consecutive invocations.

  • time_source – optional time source to measure time against. If none is provided, the system clock will be used.

  • fill_value – optional value to return for throttled invocations.

Returns:

decorated callable.
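Throttling can be sketched by remembering when the callable last ran. To stay self-contained, this stand-in swaps the rclpy `Duration` and `Clock` for a period in seconds and a plain callable clock; `throttle_sketch` and its parameter names are hypothetical.

```python
import functools
import time
from typing import Any, Callable, Optional


def throttle_sketch(
    func: Callable,
    min_period_sec: float,
    clock: Callable[[], float] = time.monotonic,
    fill_value: Any = None,
) -> Callable:
    last_call_time: Optional[float] = None

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        nonlocal last_call_time
        now = clock()
        if last_call_time is not None and now - last_call_time < min_period_sec:
            return fill_value  # too soon since the last invocation
        last_call_time = now
        return func(*args, **kwargs)

    return wrapper


# A fake clock makes the behavior deterministic for this example.
ticks = iter([0.0, 0.5, 1.2])
report = throttle_sketch(
    lambda: "sent", min_period_sec=1.0, clock=lambda: next(ticks), fill_value="throttled"
)
throttled_results = [report(), report(), report()]
```

Injecting the clock, as the `time_source` parameter allows, keeps the decorator usable with simulated time.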