synchros2.feeds module
- class synchros2.feeds.AdaptedMessageFeed[source]
Bases:
MessageFeed[MessageT]A message feed decorator to simplify adapter patterns.
- __init__(feed: MessageFeed, fn: Callable[[...], MessageT], *, autostart: bool = True, **kwargs: Any) None[source]
Initializes the message feed.
- Parameters:
feed – the upstream (ie. decorated) message feed.
fn – message adapting callable.
kwargs – all other keyword arguments are forwarded
initialization. (for MessageFeed)
autostart – whether to start feeding messages immediately or not.
kwargs – key word arguments to pass to the message feed
- property feed: MessageFeed
Gets the upstream message feed.
- class synchros2.feeds.ExactSynchronizedMessageFeed[source]
Bases:
SynchronizedMessageFeedBaseA message feeds’ aggregator using a message_filters.TimeSynchronizer instance.
- __init__(*feeds: MessageFeed, queue_size: int = 10, history_length: int | None = None, node: rclpy.node.Node | None = None, autostart: bool = True, **kwargs: Any) None[source]
Initializes the message feed.
- Parameters:
feeds – upstream message feeds to be synchronized.
queue_size – the message queue size for synchronization.
history_length – optional historic data size, defaults to 1.
node – optional node for the underlying native subscription, defaults to
node. (the current process)
autostart – whether to start feeding messages immediately or not.
kwargs – key word arguments to pass to the message feed
- class synchros2.feeds.FramedMessageFeed[source]
Bases:
MessageFeed[MessageT]A message feed decorator, incorporating transforms using a TransformFilter instance.
- __init__(feed: MessageFeed, target_frame_id: str, *, tolerance_sec: float = 1.0, tf_buffer: tf2_ros.Buffer | None = None, history_length: int | None = None, node: rclpy.node.Node | None = None, autostart: bool = True, **kwargs: Any) None[source]
Initializes the message feed.
- Parameters:
feed – the upstream message feed.
target_frame_id – the target frame ID for transforms.
tf_buffer – optional buffer of transforms to look up. If none is provided
instantiated. (a transforms' buffer and a listener will be)
tolerance_sec – optional tolerance, in seconds, to wait for late transforms.
history_length – optional historic data size, defaults to 1.
node – optional node for the underlying native subscription, defaults to
node. (the current process)
autostart – whether to start feeding messages immediately or not.
kwargs – key word arguments to pass to the message feed
- property feed: MessageFeed[MessageT]
Gets the upstream message feed.
- class synchros2.feeds.MessageFeed[source]
Bases:
Generic[MessageT]An ergonomic wrapper for generic message filters.
- __init__(link: Filter, *, history_length: int | None = None, node: rclpy.node.Node | None = None, trace: bool = False) None[source]
Initializes the message feed.
- Parameters:
link – Wrapped message filter, connecting this message feed with its source.
history_length – optional historic data size, defaults to 1
node – optional node for lifetime control, defaults to the current process node.
trace – Whether to log when messages are received by the message feed
- property latest_update: FutureLike[MessageT]
Gets the future to the latest message, which may not have been received yet.
- matching_update(matching_predicate: Callable[[MessageT], bool]) FutureLike[MessageT][source]
Gets a future to the next matching message yet to be received.
- Parameters:
matching_predicate – a boolean predicate to match incoming messages.
- Returns:
a future.
- recall(callback: Callable[[MessageT], None]) Tunnel[source]
Adds a callback for message recalling.
- Returns:
the underlying connection, which can be closed to stop future callbacks.
- stream(*, forward_only: bool = False, expunge: bool = False, buffer_size: int | None = None, duration_sec: float | None = None, timeout_sec: float | None = None) Generator[MessageT, None, None][source]
- stream(*, greedy: Literal[True], forward_only: bool = False, expunge: bool = False, buffer_size: int | None = None, duration_sec: float | None = None, timeout_sec: float | None = None) Generator[List[MessageT], None, None]
Iterates over messages as they come.
Iteration stops when the given timeout expires or when the associated context is shutdown. Note that iterating over the message stream is a blocking operation.
- Parameters:
greedy – if true, greedily batch messages as it becomes available.
forward_only – whether to ignore previosuly received messages.
expunge – if true, wipe out the message history after reading
applies (if it)
buffer_size – optional maximum size for the incoming messages buffer.
provided (If none is)
unbounded. (the buffer will be grow)
duration_sec – optional duration, in seconds, to collect messages from the feed.
timeout_sec – optional timeout, in seconds, for a new message to be received.
- Returns:
a lazy iterator over messages, one message at a time or in batches if greedy.
- Raises:
TimeoutError – if streaming times out waiting for a new message.
- property update: FutureLike[MessageT]
Gets the future to the next message yet to be received.
- class synchros2.feeds.SynchronizedMessageFeed[source]
Bases:
SynchronizedMessageFeedBaseA message feeds’ aggregator using a message_filters.ApproximateTimeSynchronizer instance.
- __init__(*feeds: MessageFeed, queue_size: int = 10, delay: float = 0.2, allow_headerless: bool = False, history_length: int | None = None, node: rclpy.node.Node | None = None, autostart: bool = True, **kwargs: Any) None[source]
Initializes the message feed.
- Parameters:
feeds – upstream message feeds to be synchronized.
queue_size – the message queue size for synchronization.
delay – the maximum delay, in seconds, between messages for synchronization to succeed.
allow_headerless – whether it’s OK for there to be no header in the messages (in which
case
used). (the ROS time of arrival will be)
history_length – optional historic data size, defaults to 1.
node – optional node for the underlying native subscription, defaults to
node. (the current process)
autostart – whether to start feeding messages immediately or not.
kwargs – key word arguments to pass to the message feed
- class synchros2.feeds.SynchronizedMessageFeedBase[source]
Bases:
MessageFeedA base class for message feeds’ aggregators.
- __init__(time_synchronizer_filter: TimeSynchronizerBase, *feeds: MessageFeed, history_length: int | None = None, node: rclpy.node.Node | None = None, **kwargs: Any) None[source]
Initializes the message feed.
- Parameters:
time_synchronizer_filter – This should be one of synchros2.filters.ApproximateTimeSynchronizer or synchros2.filters.ExactTimeSynchronizer
feeds – upstream message feeds to be synchronized.
history_length – optional historic data size, defaults to 1.
node – optional node for the underlying native subscription, defaults to
node. (the current process)
kwargs – key word arguments to pass to the message feed
- property feeds: Iterable[MessageFeed]
Gets all aggregated message feeds.