synchros2.filters module
- class synchros2.filters.Adapter
Bases: Filter
A message filter for data adaptation.
- __init__(upstream: Filter, fn: Callable, *, autostart: bool = True) → None
Initializes the adapter.
- Parameters:
upstream – the upstream message filter.
fn – a callable that takes messages as arguments and returns some data to be signaled. If None is returned, no message signaling will occur.
autostart – whether to start filtering on instantiation or not.
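The adaptation rule described above (forward whatever fn returns, signal nothing when it returns None) can be sketched without ROS. This is a minimal illustration, not the synchros2 implementation: ToySource, ToyAdapter and even_doubled are hypothetical names, and thread safety and start/stop handling are omitted.

```python
from typing import Any, Callable, List, Optional


class ToySource:
    """Hypothetical upstream: stores callbacks and fans messages out to them."""

    def __init__(self) -> None:
        self._callbacks: List[Callable[..., None]] = []

    def registerCallback(self, fn: Callable[..., None]) -> None:
        self._callbacks.append(fn)

    def signalMessage(self, *messages: Any) -> None:
        for fn in self._callbacks:
            fn(*messages)


class ToyAdapter(ToySource):
    """Hypothetical adapter: applies fn and forwards only non-None results."""

    def __init__(self, upstream: ToySource, fn: Callable[..., Optional[Any]]) -> None:
        super().__init__()
        self._fn = fn
        upstream.registerCallback(self._on_messages)

    def _on_messages(self, *messages: Any) -> None:
        result = self._fn(*messages)
        if result is not None:  # None means "signal nothing downstream"
            self.signalMessage(result)


def even_doubled(value: int) -> Optional[int]:
    """Doubles even values and drops odd ones by returning None."""
    return value * 2 if value % 2 == 0 else None


source = ToySource()
adapter = ToyAdapter(source, even_doubled)
adapted: List[int] = []
adapter.registerCallback(adapted.append)
for value in range(5):
    source.signalMessage(value)
```

Note that the check is against None specifically, so falsy results such as 0 are still forwarded in this sketch.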
- class synchros2.filters.ApproximateTimeSynchronizer
Bases: TimeSynchronizerBase[ApproximateTimeSynchronizer]
A thread-safe message_filters.ApproximateTimeSynchronizer equivalent.
- __init__(upstreams: Sequence[Filter], *args: Any, autostart: bool = True, **kwargs: Any) → None
Initializes the ApproximateTimeSynchronizer instance.
- Parameters:
upstreams – message filters to be synchronized.
args – positional arguments to forward to message_filters.ApproximateTimeSynchronizer.
autostart – whether to start filtering on instantiation or not.
kwargs – keyword arguments to forward to message_filters.ApproximateTimeSynchronizer.
- class synchros2.filters.ExactTimeSynchronizer
Bases: TimeSynchronizerBase[TimeSynchronizer]
A thread-safe message_filters.TimeSynchronizer equivalent.
- __init__(upstreams: Sequence[Filter], *args: Any, autostart: bool = True, **kwargs: Any) → None
Initializes the ExactTimeSynchronizer instance.
- Parameters:
upstreams – message filters to be synchronized.
args – positional arguments to forward to message_filters.TimeSynchronizer.
autostart – whether to start filtering on instantiation or not.
kwargs – keyword arguments to forward to message_filters.TimeSynchronizer.
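To make "exact" synchronization concrete, the sketch below groups messages by timestamp and fires a callback only once every upstream has delivered a message with the same stamp. It is a stdlib-only illustration of the idea, not the message_filters or synchros2 code: ToyExactSynchronizer and its add method are hypothetical, stamps are plain integers (nanoseconds), and the bounded queue a real synchronizer keeps is left out.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List, Tuple


class ToyExactSynchronizer:
    """Hypothetical exact-time matcher over a fixed number of upstreams."""

    def __init__(self, num_upstreams: int, callback: Callable[..., None]) -> None:
        self._num_upstreams = num_upstreams
        self._callback = callback
        # stamp -> {upstream index -> message}; unbounded in this sketch.
        self._pending: Dict[int, Dict[int, Any]] = defaultdict(dict)

    def add(self, index: int, stamp_ns: int, message: Any) -> None:
        """Records a message from upstream `index` and emits on a full match."""
        slot = self._pending[stamp_ns]
        slot[index] = message
        if len(slot) == self._num_upstreams:
            del self._pending[stamp_ns]
            # Deliver messages in upstream order.
            self._callback(*(slot[i] for i in range(self._num_upstreams)))


matched: List[Tuple[str, str]] = []
sync = ToyExactSynchronizer(2, lambda image, info: matched.append((image, info)))
sync.add(0, 100, "image@100")
sync.add(0, 200, "image@200")
sync.add(1, 200, "info@200")  # completes the set for stamp 200
sync.add(1, 150, "info@150")  # never matched: no image carries stamp 150
```

Only the pair stamped 200 reaches the callback; the messages stamped 100 and 150 stay pending because their stamps never line up.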
- class synchros2.filters.Filter
Bases: SimpleFilterProtocol
A thread-safe, message_filters.SimpleFilter-compliant message filter.
- __init__(autostart: bool = True) → None
Initialize filter.
- Parameters:
autostart – whether to start filtering on instantiation or not.
- registerCallback(fn: Callable, *args: Any) → int
Register callable to be called on filter output.
- Parameters:
fn – callback callable.
args – optional positional arguments to supply on call.
- Returns:
a unique connection identifier.
- Raises:
RuntimeError – if filter has been stopped.
- signalMessage(*messages: Any) → None
Feed one or more messages to the filter.
- Parameters:
messages – messages to be forwarded through the filter.
- Raises:
RuntimeError – if filter is not active (either not started or already stopped).
- start() → None
Start filtering.
- Raises:
RuntimeError – if filtering has been stopped already.
- stop() → None
Stop filtering.
- Raises:
RuntimeError – if filter has not been started.
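The lifecycle rules documented for Filter (callbacks may be registered until the filter is stopped, messages may be signaled only while it is active, and a stopped filter cannot be restarted) can be mirrored in a small stand-in. ToyFilter below is a hypothetical, stdlib-only sketch that follows the documented error conditions; it is not the synchros2 class, and its internals are assumptions.

```python
import threading
from typing import Any, Callable, Dict, List, Optional, Tuple


class ToyFilter:
    """Hypothetical stand-in mirroring the documented Filter lifecycle."""

    def __init__(self, autostart: bool = True) -> None:
        self._lock = threading.Lock()
        self._callbacks: Dict[int, Tuple[Callable, Tuple[Any, ...]]] = {}
        self._next_id = 0
        self._started = False
        self._stopped = False
        if autostart:
            self.start()

    def registerCallback(self, fn: Callable, *args: Any) -> int:
        with self._lock:
            if self._stopped:
                raise RuntimeError("filter has been stopped")
            connection = self._next_id
            self._next_id += 1
            self._callbacks[connection] = (fn, args)
            return connection

    def signalMessage(self, *messages: Any) -> None:
        with self._lock:
            if not self._started or self._stopped:
                raise RuntimeError("filter is not active")
            callbacks = list(self._callbacks.values())
        # Invoke outside the lock so callbacks can safely touch the filter.
        for fn, args in callbacks:
            fn(*messages, *args)

    def start(self) -> None:
        with self._lock:
            if self._stopped:
                raise RuntimeError("filtering has been stopped already")
            self._started = True

    def stop(self) -> None:
        with self._lock:
            if not self._started:
                raise RuntimeError("filter has not been started")
            self._stopped = True


received: List[Tuple[str, str]] = []
early_error: Optional[str] = None
toy = ToyFilter(autostart=False)
connection_id = toy.registerCallback(lambda msg, tag: received.append((tag, msg)), "cam")
try:
    toy.signalMessage("too early")  # not started yet, so this raises
except RuntimeError as error:
    early_error = str(error)
toy.start()
toy.signalMessage("frame-1")
toy.stop()
```

With autostart=False the filter accepts callbacks immediately but rejects messages until start() is called, which is the window the autostart flag exists to control.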
- class synchros2.filters.SimpleFilterProtocol
Bases: Protocol
Protocol for message_filters.SimpleFilter subclasses.
- __init__(*args, **kwargs)
- class synchros2.filters.Subscriber
Bases: Filter
A thread-safe message_filters.Subscriber equivalent.
- __init__(node: rclpy.node.Node, *args: Any, autostart: bool = True, **kwargs: Any) → None
Initializes the Subscriber instance.
- Parameters:
node – ROS 2 node to subscribe with.
args – positional arguments to forward to rclpy.node.Node.create_subscription.
autostart – whether to start filtering on instantiation or not.
kwargs – keyword arguments to forward to rclpy.node.Node.create_subscription.
- close() → None
Stop filtering.
- Raises:
RuntimeError – if filter has not been started.
- class synchros2.filters.TimeSynchronizerBase
Bases: Filter, Generic[_TimeSynchronizerType]
A base class for time synchronization filters.
- __init__(time_synchronizer_type: Type[message_filters.TimeSynchronizer], upstreams: Sequence[Filter], *args: Any, autostart: bool = True, **kwargs: Any) → None
Initializes the TimeSynchronizerBase instance.
- Parameters:
time_synchronizer_type – the type of the internal time synchronizer. Note that this is the type itself, not an instance. It must be message_filters.TimeSynchronizer or a subclass of it.
upstreams – message filters to be synchronized.
args – positional arguments to forward to the internal time synchronizer.
autostart – whether to start filtering on instantiation or not.
kwargs – keyword arguments to forward to internal time synchronizer.
- class synchros2.filters.TransformFilter
Bases: Filter
A tf2_ros-driven message filter that ensures the availability of user-defined transforms.
This filter passes a stamped transform along with each filtered message. The transform goes from the message frame ID to the given target frame ID and is looked up at the message timestamp. The message is assumed to have a header. When filtering message tuples, only the first message header is observed.
- __init__(upstream: Filter, target_frame_id: str, tf_buffer: tf2_ros.Buffer, tolerance_sec: float, logger: rclpy.impl.rcutils_logger.RcutilsLogger | None = None, *, autostart: bool = True) → None
Initializes the transform filter.
- Parameters:
upstream – the upstream message filter.
target_frame_id – the target frame ID for transforms.
tf_buffer – a buffer of transforms to look up.
tolerance_sec – a tolerance, in seconds, to wait for late transforms before abandoning any waits and filtering out the corresponding messages.
logger – an optional logger to notify the user about any errors during filtering.
autostart – whether to start filtering on instantiation or not.
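The filtering rule described for this class (look up a transform from the first message's frame to the target frame at that message's timestamp, and pass messages on only when it is available) can be illustrated with a dictionary standing in for the tf2 buffer. Everything below is a hypothetical sketch: ToyTransformFilter, Header, Message and the lookup table are invented for illustration, the transform is assumed to be appended after the messages in the callback arguments, and the tolerance_sec wait for late transforms is not modeled.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Tuple


@dataclass(frozen=True)
class Header:
    frame_id: str
    stamp_ns: int


@dataclass(frozen=True)
class Message:
    header: Header
    payload: str


# (target frame, source frame, stamp) -> transform; stand-in for a tf2 buffer.
TransformTable = Dict[Tuple[str, str, int], str]


class ToyTransformFilter:
    """Hypothetical filter that drops messages lacking a known transform."""

    def __init__(self, target_frame_id: str, transforms: TransformTable) -> None:
        self._target_frame_id = target_frame_id
        self._transforms = transforms
        self._callbacks: List[Callable[..., None]] = []

    def registerCallback(self, fn: Callable[..., None]) -> None:
        self._callbacks.append(fn)

    def signalMessage(self, *messages: Message) -> None:
        # Only the first message's header is consulted, as documented.
        header = messages[0].header
        key = (self._target_frame_id, header.frame_id, header.stamp_ns)
        transform = self._transforms.get(key)
        if transform is None:
            return  # no transform available: the messages are filtered out
        for fn in self._callbacks:
            # Assumption: the transform is passed after the messages.
            fn(*messages, transform)


table: TransformTable = {("map", "camera", 10): "map<-camera@10"}
tf_filter = ToyTransformFilter("map", table)
passed: List[Tuple[Any, ...]] = []
tf_filter.registerCallback(lambda *items: passed.append(items))
tf_filter.signalMessage(Message(Header("camera", 10), "a"))  # transform known
tf_filter.signalMessage(Message(Header("camera", 20), "b"))  # transform missing
```

In the real filter a missing transform is not dropped immediately; per the parameter description, it is awaited for up to tolerance_sec before the corresponding message is filtered out.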
- class synchros2.filters.Tunnel
Bases: Filter
A message filter that simply forwards messages but can be detached.