synchros2.feeds module

class synchros2.feeds.AdaptedMessageFeed[source]

Bases: MessageFeed[MessageT]

A message feed decorator to simplify adapter patterns.

__init__(feed: MessageFeed, fn: Callable[[...], MessageT], *, autostart: bool = True, **kwargs: Any) None[source]

Initializes the message feed.

Parameters:
  • feed – the upstream (ie. decorated) message feed.

  • fn – message adapting callable.

  • kwargs – all other keyword arguments are forwarded

  • initialization. (for MessageFeed)

  • autostart – whether to start feeding messages immediately or not.

  • kwargs – key word arguments to pass to the message feed

property feed: MessageFeed

Gets the upstream message feed.

stop() None[source]

Stop this message feed and the upstream one as well.

class synchros2.feeds.ExactSynchronizedMessageFeed[source]

Bases: SynchronizedMessageFeedBase

A message feeds’ aggregator using a message_filters.TimeSynchronizer instance.

__init__(*feeds: MessageFeed, queue_size: int = 10, history_length: int | None = None, node: rclpy.node.Node | None = None, autostart: bool = True, **kwargs: Any) None[source]

Initializes the message feed.

Parameters:
  • feeds – upstream message feeds to be synchronized.

  • queue_size – the message queue size for synchronization.

  • history_length – optional historic data size, defaults to 1.

  • node – optional node for the underlying native subscription, defaults to

  • node. (the current process)

  • autostart – whether to start feeding messages immediately or not.

  • kwargs – key word arguments to pass to the message feed

class synchros2.feeds.FramedMessageFeed[source]

Bases: MessageFeed[MessageT]

A message feed decorator, incorporating transforms using a TransformFilter instance.

__init__(feed: MessageFeed, target_frame_id: str, *, tolerance_sec: float = 1.0, tf_buffer: tf2_ros.Buffer | None = None, history_length: int | None = None, node: rclpy.node.Node | None = None, autostart: bool = True, **kwargs: Any) None[source]

Initializes the message feed.

Parameters:
  • feed – the upstream message feed.

  • target_frame_id – the target frame ID for transforms.

  • tf_buffer – optional buffer of transforms to look up. If none is provided

  • instantiated. (a transforms' buffer and a listener will be)

  • tolerance_sec – optional tolerance, in seconds, to wait for late transforms.

  • history_length – optional historic data size, defaults to 1.

  • node – optional node for the underlying native subscription, defaults to

  • node. (the current process)

  • autostart – whether to start feeding messages immediately or not.

  • kwargs – key word arguments to pass to the message feed

property feed: MessageFeed[MessageT]

Gets the upstream message feed.

stop() None[source]

Stop this message feed and the upstream one as well.

class synchros2.feeds.MessageFeed[source]

Bases: Generic[MessageT]

An ergonomic wrapper for generic message filters.

__init__(link: Filter, *, history_length: int | None = None, node: rclpy.node.Node | None = None, trace: bool = False) None[source]

Initializes the message feed.

Parameters:
  • link – Wrapped message filter, connecting this message feed with its source.

  • history_length – optional historic data size, defaults to 1

  • node – optional node for lifetime control, defaults to the current process node.

  • trace – Whether to log when messages are received by the message feed

close() None

Stop the message feed.

flush() None[source]

Flush the message feed.

property history: List[MessageT]

Gets the entire history of messages received so far.

property latest: MessageT | None

Gets the latest message received, if any.

property latest_update: FutureLike[MessageT]

Gets the future to the latest message, which may not have been received yet.

Gets the underlying message connection.

matching_update(matching_predicate: Callable[[MessageT], bool]) FutureLike[MessageT][source]

Gets a future to the next matching message yet to be received.

Parameters:

matching_predicate – a boolean predicate to match incoming messages.

Returns:

a future.

recall(callback: Callable[[MessageT], None]) Tunnel[source]

Adds a callback for message recalling.

Returns:

the underlying connection, which can be closed to stop future callbacks.

start() None[source]

Start the message feed.

stop() None[source]

Stop the message feed.

stream(*, forward_only: bool = False, expunge: bool = False, buffer_size: int | None = None, duration_sec: float | None = None, timeout_sec: float | None = None) Generator[MessageT, None, None][source]
stream(*, greedy: Literal[True], forward_only: bool = False, expunge: bool = False, buffer_size: int | None = None, duration_sec: float | None = None, timeout_sec: float | None = None) Generator[List[MessageT], None, None]

Iterates over messages as they come.

Iteration stops when the given timeout expires or when the associated context is shutdown. Note that iterating over the message stream is a blocking operation.

Parameters:
  • greedy – if true, greedily batch messages as it becomes available.

  • forward_only – whether to ignore previosuly received messages.

  • expunge – if true, wipe out the message history after reading

  • applies (if it)

  • buffer_size – optional maximum size for the incoming messages buffer.

  • provided (If none is)

  • unbounded. (the buffer will be grow)

  • duration_sec – optional duration, in seconds, to collect messages from the feed.

  • timeout_sec – optional timeout, in seconds, for a new message to be received.

Returns:

a lazy iterator over messages, one message at a time or in batches if greedy.

Raises:

TimeoutError – if streaming times out waiting for a new message.

property update: FutureLike[MessageT]

Gets the future to the next message yet to be received.

class synchros2.feeds.SynchronizedMessageFeed[source]

Bases: SynchronizedMessageFeedBase

A message feeds’ aggregator using a message_filters.ApproximateTimeSynchronizer instance.

__init__(*feeds: MessageFeed, queue_size: int = 10, delay: float = 0.2, allow_headerless: bool = False, history_length: int | None = None, node: rclpy.node.Node | None = None, autostart: bool = True, **kwargs: Any) None[source]

Initializes the message feed.

Parameters:
  • feeds – upstream message feeds to be synchronized.

  • queue_size – the message queue size for synchronization.

  • delay – the maximum delay, in seconds, between messages for synchronization to succeed.

  • allow_headerless – whether it’s OK for there to be no header in the messages (in which

  • case

  • used). (the ROS time of arrival will be)

  • history_length – optional historic data size, defaults to 1.

  • node – optional node for the underlying native subscription, defaults to

  • node. (the current process)

  • autostart – whether to start feeding messages immediately or not.

  • kwargs – key word arguments to pass to the message feed

class synchros2.feeds.SynchronizedMessageFeedBase[source]

Bases: MessageFeed

A base class for message feeds’ aggregators.

__init__(time_synchronizer_filter: TimeSynchronizerBase, *feeds: MessageFeed, history_length: int | None = None, node: rclpy.node.Node | None = None, **kwargs: Any) None[source]

Initializes the message feed.

Parameters:
  • time_synchronizer_filter – This should be one of synchros2.filters.ApproximateTimeSynchronizer or synchros2.filters.ExactTimeSynchronizer

  • feeds – upstream message feeds to be synchronized.

  • history_length – optional historic data size, defaults to 1.

  • node – optional node for the underlying native subscription, defaults to

  • node. (the current process)

  • kwargs – key word arguments to pass to the message feed

property feeds: Iterable[MessageFeed]

Gets all aggregated message feeds.

stop() None[source]

Stop this message feed and all upstream ones as well.