synchros2.filters module

class synchros2.filters.Adapter[source]

Bases: Filter

A message filter for data adaptation.

__init__(upstream: Filter, fn: Callable, *, autostart: bool = True) None[source]

Initializes the adapter.

Parameters:
  • upstream – the upstream message filter.

  • fn – a callable that takes messages as arguments and returns some

  • signaled (data to be)

  • returned (If none is)

  • occur. (no message signaling will)

  • autostart – whether to start filtering on instantiation or not.

add(*messages: Any) None[source]

Add messages to the filter.

class synchros2.filters.ApproximateTimeSynchronizer[source]

Bases: TimeSynchronizerBase[ApproximateTimeSynchronizer]

A threadsafe message_filters.ApproximateTimeSynchronizer equivalent.

__init__(upstreams: Sequence[Filter], *args: Any, autostart: bool = True, **kwargs: Any) None[source]

Initializes the ApproximateTimeSynchronizer instance.

Parameters:
  • upstreams – message filters to be synchronized.

  • args – positional arguments to forward to message_filters.ApproximateTimeSynchronizer.

  • autostart – whether to start filtering on instantiation or not.

  • kwargs – keyword arguments to forward to message_filters.ApproximateTimeSynchronizer.

class synchros2.filters.ExactTimeSynchronizer[source]

Bases: TimeSynchronizerBase[TimeSynchronizer]

A thread-safe message_filters.TimeSynchronizer equivalent.

__init__(upstreams: Sequence[Filter], *args: Any, autostart: bool = True, **kwargs: Any) None[source]

Initializes the ExactTimeSynchronizer instance.

Parameters:
  • upstreams – message filters to be synchronized.

  • args – positional arguments to forward to message_filters.TimeSynchronizer.

  • autostart – whether to start filtering on instantiation or not.

  • kwargs – keyword arguments to forward to message_filters.TimeSynchronizer.

class synchros2.filters.Filter[source]

Bases: SimpleFilterProtocol

A threadsafe message_filters.SimpleFilter compliant message filter.

__init__(autostart: bool = True) None[source]

Initialize filter.

Parameters:

autostart – whether to start filtering on instantiation or not.

registerCallback(fn: Callable, *args: Any) int[source]

Register callable to be called on filter output.

Parameters:
  • fn – callback callable.

  • args – optional positional arguments to supply on call.

Returns:

a unique connection identifier.

Raises:

RuntimeError – if filter has been stopped.

signalMessage(*messages: Any) None[source]

Feed one or more messages to the filter.

Parameters:

messages – messages to be forwarded through the filter.

Raises:
  • RuntimeError – if filter is not active

  • (either not started or already stopped).

start() None[source]

Start filtering.

Raises:

RuntimeError – if filtering has been stopped already.

stop() None[source]

Stop filtering.

Raises:

RuntimeError – if filter has not been started.

unregisterCallback(connection: int) None[source]

Unregister a callback.

Parameters:

connection – unique identifier for the callback.

class synchros2.filters.SimpleFilterProtocol[source]

Bases: Protocol

Protocol for message_filters.SimpleFilter subclasses.

__init__(*args, **kwargs)
registerCallback(callback: Callable, *args: Any) int[source]

Register callable to be called on filter output.

signalMessage(*messages: Any) None[source]

Feed one or more messages to the filter.

class synchros2.filters.Subscriber[source]

Bases: Filter

A threadsafe message_filters.Subscriber equivalent.

__init__(node: rclpy.node.Node, *args: Any, autostart: bool = True, **kwargs: Any) None[source]

Initializes the Subscriber instance.

Parameters:
  • node – ROS 2 node to subscribe with.

  • args – positional arguments to forward to rclpy.node.Node.create_subscription.

  • autostart – whether to start filtering on instantiation or not.

  • kwargs – keyword arguments to forward to rclpy.node.Node.create_subscription.

close() None

Stop filtering.

Raises:

RuntimeError – if filter has not been started.

class synchros2.filters.TimeSynchronizerBase[source]

Bases: Filter, Generic[_TimeSynchronizerType]

A base class for time synchronization filters.

__init__(time_synchronizer_type: Type[message_filters.TimeSynchronizer], upstreams: Sequence[Filter], *args: Any, autostart: bool = True, **kwargs: Any) None[source]

Initializes the TimeSynchronizerBase instance.

Parameters:
  • time_synchronizer_type – The type of the internal time synchronizer. Note this is the actual type not an instance. This type must be or inherit from message_filters.TimeSynchronizer

  • upstreams – message filters to be synchronized.

  • args – positional arguments to forward to the internal time synchronizer.

  • autostart – whether to start filtering on instantiation or not.

  • kwargs – keyword arguments to forward to internal time synchronizer.

property upstreams: List[Filter]

Returns a list of the message filters to be synchronized

class synchros2.filters.TransformFilter[source]

Bases: Filter

A tf2_ros driven message filter, ensuring user defined transforms’ availability.

This filter passes a stamped transform along with filtered messages, from message frame ID to the given target frame ID, looked up at the message timestamp. The message is assumed to have a header. When filtering message tuples, only the first message header is observed.

__init__(upstream: Filter, target_frame_id: str, tf_buffer: tf2_ros.Buffer, tolerance_sec: float, logger: rclpy.impl.rcutils_logger.RcutilsLogger | None = None, *, autostart: bool = True) None[source]

Initializes the transform filter.

Parameters:
  • upstream – the upstream message filter.

  • target_frame_id – the target frame ID for transforms.

  • tf_buffer – a buffer of transforms to look up.

  • tolerance_sec – a tolerance, in seconds, to wait for late transforms

  • messages. (before abandoning any waits and filtering out the corresponding)

  • logger – an optional logger to notify the yser about any errors during filtering.

  • autostart – whether to start filtering on instantiation or not.

add(*messages: Any) None[source]

Add messages to the filter.

class synchros2.filters.Tunnel[source]

Bases: Filter

A message filter that simply forwards messages but can be detached.

__init__(upstream: Filter, *, autostart: bool = True) None[source]

Initializes the tunnel.

Parameters:
  • upstream – the upstream message filter.

  • autostart – whether to start filtering on instantiation or not.

close() None[source]

Closes the tunnel, simply disconnecting it from upstream.