synchros2.tf_listener_wrapper module

class synchros2.tf_listener_wrapper.StampLike[source]

Bases: Protocol

A protocol similar to the Time msg

__init__(*args, **kwargs)
nanosec: int
sec: int
class synchros2.tf_listener_wrapper.TFListenerWrapper[source]

Bases: object

A tf2_ros lookup device, wrapping both a buffer and a listener.

When using process-wide machinery:

tf_listener = TFListenerWrapper()
tf_listener.wait_for_a_tform_b(target_frame, source_frame)
tf_listener.lookup_a_tform_b(target_frame, source_frame)

When composed by a ROS 2 node:

class MyNode(Node):

    def __init__(self, **kwargs: Any) -> None:
        super().__init__("my_node", **kwargs)
        self.tf_listener = TFListenerWrapper(self)
        # do not wait synchronously here or it will block forever!
        # ...

    def callback(self):
        if tf_listener.wait_for_a_tform_b(target_frame, source_frame, timeout_sec=0.0):
            self.tf_listener.lookup_a_tform_b(target_frame, source_frame)
            # or you can lookup and handle exceptions yourself
__init__(node: rclpy.node.Node | None = None, cache_time_s: float | None = None) None[source]

Initializes the wrapper.

Parameters:
  • node – optional node for transform listening, defaults to the current scope node.

  • cache_time_s – optional transform buffer size, in seconds.

property buffer: tf2_ros.buffer.Buffer

Returns the tf buffer

lookup_a_tform_b(frame_a: str, frame_b: str, transform_time: StampLike | TimeLike | None = None, timeout_sec: float | None = None, wait_for_frames: bool = False) geometry_msgs.msg.TransformStamped[source]

Looks up the transform from frame_a to frame_b at the specified time.

Parameters:
  • frame_a – Base frame for transform. The transform returned will be frame_a_t_frame_b

  • frame_b – Tip frame for transform. The transform returned will be frame_a_t_frame_b

  • transform_time – The time at which to look up the transform. If left at None, the most recent transform available will used.

  • timeout_sec – The time to wait for the transform to become available if the requested time is beyond the most recent transform in the buffer. If set to 0, it will not wait. If left at None, it will wait indefinitely.

  • wait_for_frames – If true, it will wait for a path to exist from frame_a to frame_b in the buffer. If false, lookup will fail immediately if a path between frames does not exist, regardless of what timeout was set. Note that wait_for_a_tform_b can also be used to wait for a transform to become available.

Returns:

The transform frame_a_t_frame_b at the time specified.

Raises:

All the possible TransformExceptions.

lookup_latest_timestamp(frame_a: str, frame_b: str, timeout_sec: float | None = None, wait_for_frames: bool = False) rclpy.time.Time[source]

Looks up the latest time at which a transform from frame_a to frame_b is available.

Parameters:
  • frame_a – Base frame for transform. The transform returned will be frame_a_t_frame_b

  • frame_b – Tip frame for transform. The transform returned will be frame_a_t_frame_b

  • timeout_sec – The time to wait for the transform to become available if the requested time is beyond the most recent transform in the buffer. If set to 0, it will not wait. If left at None, it will wait indefinitely.

  • wait_for_frames – If true, it will wait for a path to exist from frame_a to frame_b in the buffer. If false, lookup will fail immediately if a path between frames does not exist, regardless of what timeout was set. Note that wait_for_a_tform_b can also be used to wait for a transform to become available.

Returns:

The timestamp from the latest recorded transform frame_a_t_frame_b

Raises:

All the possible TransformExceptions.

shutdown() None[source]

Shutdown the tf listener

wait_for_a_tform_b(frame_a: str, frame_b: str, transform_time: StampLike | TimeLike | None = None, timeout_sec: float | None = None) bool[source]

Wait for a transform from frame_a to frame_b to become available.

Note this is a blocking call. If the underlying node is not spinning, an indefinite wait may block forever.

Parameters:
  • frame_a – Base frame for transform. The transform returned will be frame_a_t_frame_b

  • frame_b – Tip frame for transform. The transform returned will be frame_a_t_frame_b

  • transform_time – The time at which to look up the transform. If left at None, the most

  • used. (recent transform available will)

  • timeout_sec – The time to wait for the transform to become available if the requested time is beyond

  • 0 (the most recent transform in the buffer. If set to)

  • None (it will not wait. If left at)

  • will (it)

  • indefinitely. (wait)

Returns:

Whether a transform between frame_a and frame_b is available at the specified time.

wait_for_a_tform_b_async(frame_a: str, frame_b: str, transform_time: StampLike | TimeLike | None = None) rclpy.task.Future[source]

Wait asynchronously for the transform from from_frame to to_frame to become available.

Parameters:
  • frame_a – Base frame for transform. The transform returned will be frame_a_t_frame_b

  • frame_b – Tip frame for transform. The transform returned will be frame_a_t_frame_b

  • transform_time – The time at which to look up the transform. If left at None, the most

  • used. (recent transform available will)

Returns:

A future that will tell if a transform between frame_a and frame_b is available at the time specified.

class synchros2.tf_listener_wrapper.TimeLike[source]

Bases: Protocol

A protocol similar to to rclpy.time.Time

__init__(*args, **kwargs)
nanoseconds: int
synchros2.tf_listener_wrapper.from_time_like(obj: StampLike | TimeLike) rclpy.time.Time[source]

Convert an object that exhibits a Time-like interface to a proper Time instance.