Source code for tf2_ros.buffer
# Copyright (c) 2008 Willow Garage, Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# author: Wim Meeussen
import threading
from time import sleep
from typing import Callable
from typing import List
from typing import Optional
from typing import TypeVar
from geometry_msgs.msg import TransformStamped
import rclpy
from rclpy.clock import JumpThreshold, TimeJump
from rclpy.duration import Duration
from rclpy.node import Node
from rclpy.task import Future
from rclpy.time import Time
from tf2_msgs.srv import FrameGraph
import tf2_py as tf2
from .buffer_interface import BufferInterface
FrameGraphSrvRequest = TypeVar('FrameGraphSrvRequest')
FrameGraphSrvResponse = TypeVar('FrameGraphSrvResponse')
[docs]
class Buffer(tf2.BufferCore, BufferInterface):
"""
Standard implementation of the :class:`tf2_ros.BufferInterface` abstract data type.
Inherits from :class:`tf2_ros.buffer_interface.BufferInterface` and :class:`tf2.BufferCore`.
Stores known frames and offers a ROS service, "tf_frames", which responds to client requests
with a response containing a :class:`tf2_msgs.FrameGraph` representing the relationship of
known frames.
"""
[docs]
def __init__(
self,
cache_time: Optional[Duration] = None,
node: Optional[Node] = None
) -> None:
"""
Construct a Buffer.
:param cache_time: Duration describing how long to retain past information in BufferCore.
:param node: Create a tf2_frames service that returns all frames as a yaml document.
"""
if cache_time is not None:
tf2.BufferCore.__init__(self, cache_time)
else:
tf2.BufferCore.__init__(self)
BufferInterface.__init__(self)
self._new_data_callbacks: List[Callable[[], None]] = []
self._callbacks_to_remove: List[Callable[[], None]] = []
self._callbacks_lock = threading.RLock()
if node is not None:
self.srv = node.create_service(FrameGraph, 'tf2_frames', self.__get_frames)
self.clock = node.get_clock()
else:
self.clock = rclpy.clock.Clock()
# create a jump callback to clear the buffer if use_sim_true is true and there is a
# jump in time
threshold = JumpThreshold(min_forward=None,
min_backward=Duration(seconds=-1),
on_clock_change=True)
self.jump_handle = self.clock.create_jump_callback(
threshold, post_callback=self.time_jump_callback)
def __get_frames(
self,
req: FrameGraphSrvRequest,
res: FrameGraphSrvResponse
) -> FrameGraphSrvResponse:
return FrameGraph.Response(frame_yaml=self.all_frames_as_yaml())
[docs]
def time_jump_callback(self, time_jump: TimeJump):
rclpy.logging.get_logger('tf2_buffer').warning(
'Detected jump back in time. Clearing tf buffer.')
self.clear()
[docs]
def set_transform(self, transform: TransformStamped, authority: str) -> None:
super().set_transform(transform, authority)
self._call_new_data_callbacks()
[docs]
def set_transform_static(self, transform: TransformStamped, authority: str) -> None:
super().set_transform_static(transform, authority)
self._call_new_data_callbacks()
def _call_new_data_callbacks(self) -> None:
with self._callbacks_lock:
for callback in self._new_data_callbacks:
callback()
# Remove callbacks after to avoid modifying list being iterated on
for callback in self._callbacks_to_remove:
self._new_data_callbacks.remove(callback)
self._callbacks_to_remove.clear()
def _remove_callback(self, callback: Callable[[], None]) -> None:
with self._callbacks_lock:
# Actually remove the callback later
self._callbacks_to_remove.append(callback)
[docs]
def lookup_transform(
self,
target_frame: str,
source_frame: str,
time: Time,
timeout: Duration = Duration()
) -> TransformStamped:
"""
Get the transform from the source frame to the target frame.
:param target_frame: Name of the frame to transform into.
:param source_frame: Name of the input frame.
:param time: The time at which to get the transform (0 will get the latest).
:param timeout: Time to wait for the target frame to become available.
:return: The transform between the frames.
"""
self.can_transform(target_frame, source_frame, time, timeout)
return self.lookup_transform_core(target_frame, source_frame, time)
[docs]
async def lookup_transform_async(
self,
target_frame: str,
source_frame: str,
time: Time
) -> TransformStamped:
"""
Get the transform from the source frame to the target frame asyncronously.
:param target_frame: Name of the frame to transform into.
:param source_frame: Name of the input frame.
:param time: The time at which to get the transform (0 will get the latest).
:return: The transform between the frames.
"""
await self.wait_for_transform_async(target_frame, source_frame, time)
return self.lookup_transform_core(target_frame, source_frame, time)
[docs]
def lookup_transform_full(
self,
target_frame: str,
target_time: Time,
source_frame: str,
source_time: Time,
fixed_frame: str,
timeout: Duration = Duration()
) -> TransformStamped:
"""
Get the transform from the source frame to the target frame using the advanced API.
:param target_frame: Name of the frame to transform into.
:param target_time: The time to transform to (0 will get the latest).
:param source_frame: Name of the input frame.
:param source_time: The time at which source_frame will be evaluated (0 gets the latest).
:param fixed_frame: Name of the frame to consider constant in time.
:param timeout: Time to wait for the target frame to become available.
:return: The transform between the frames.
"""
self.can_transform_full(
target_frame, target_time, source_frame, source_time, fixed_frame, timeout)
return self.lookup_transform_full_core(
target_frame, target_time, source_frame, source_time, fixed_frame)
[docs]
async def lookup_transform_full_async(
self,
target_frame: str,
target_time: Time,
source_frame: str,
source_time: Time,
fixed_frame: str
) -> TransformStamped:
"""
Get transform from source frame to target frame using the advanced API asyncronously.
:param target_frame: Name of the frame to transform into.
:param target_time: The time to transform to (0 will get the latest).
:param source_frame: Name of the input frame.
:param source_time: The time at which source_frame will be evaluated (0 gets the latest).
:param fixed_frame: Name of the frame to consider constant in time.
:return: The transform between the frames.
"""
await self.wait_for_transform_full_async(
target_frame, target_time, source_frame, source_time, fixed_frame)
return self.lookup_transform_full_core(
target_frame, target_time, source_frame, source_time, fixed_frame)
[docs]
def can_transform(
self,
target_frame: str,
source_frame: str,
time: Time,
timeout: Duration = Duration(),
return_debug_tuple: bool = False
) -> bool:
"""
Check if a transform from the source frame to the target frame is possible.
:param target_frame: Name of the frame to transform into.
:param source_frame: Name of the input frame.
:param time: The time at which to get the transform (0 will get the latest).
:param timeout: Time to wait for the target frame to become available.
:param return_debug_type: If true, return a tuple representing debug information.
:return: The information of the transform being waited on.
"""
clock = rclpy.clock.Clock()
if timeout != Duration():
start_time = clock.now()
while (clock.now() < start_time + timeout and
not self.can_transform_core(target_frame, source_frame, time)[0] and
(clock.now() + Duration(seconds=3.0)) >= start_time):
# big jumps in time are likely bag loops, so break for them
# TODO(Anyone): We can't use Rate here because it would never expire
# with a single-threaded executor.
# See https://github.com/ros2/geometry2/issues/327 for ideas on
# how to timeout waiting for transforms that don't block the executor.
sleep(0.02)
core_result = self.can_transform_core(target_frame, source_frame, time)
if return_debug_tuple:
return core_result
return core_result[0]
[docs]
def can_transform_full(
self,
target_frame: str,
target_time: Time,
source_frame: str,
source_time: Time,
fixed_frame: str,
timeout: Duration = Duration(),
return_debug_tuple: bool = False
) -> bool:
"""
Check if a transform from the source frame to the target frame is possible (advanced API).
Must be implemented by a subclass of BufferInterface.
:param target_frame: Name of the frame to transform into.
:param target_time: The time to transform to (0 will get the latest).
:param source_frame: Name of the input frame.
:param source_time: The time at which source_frame will be evaluated (0 gets the latest).
:param fixed_frame: Name of the frame to consider constant in time.
:param timeout: Time to wait for the target frame to become available.
:param return_debug_type: If true, return a tuple representing debug information.
:return: The information of the transform being waited on.
"""
clock = rclpy.clock.Clock()
if timeout != Duration():
start_time = clock.now()
while (clock.now() < start_time + timeout and
not self.can_transform_full_core(
target_frame, target_time, source_frame, source_time, fixed_frame)[0] and
(clock.now() + Duration(seconds=3.0)) >= start_time):
# big jumps in time are likely bag loops, so break for them
# TODO(Anyone): We can't use Rate here because it would never expire
# with a single-threaded executor.
# See https://github.com/ros2/geometry2/issues/327 for ideas on
# how to timeout waiting for transforms that don't block the executor.
sleep(0.02)
core_result = self.can_transform_full_core(
target_frame, target_time, source_frame, source_time, fixed_frame)
if return_debug_tuple:
return core_result
return core_result[0]
[docs]
def wait_for_transform_async(
self,
target_frame: str,
source_frame: str,
time: Time
) -> Future:
"""
Wait for a transform from the source frame to the target frame to become possible.
:param target_frame: Name of the frame to transform into.
:param source_frame: Name of the input frame.
:param time: The time at which to get the transform (0 will get the latest).
:return: A future that becomes true when the transform is available.
"""
fut = rclpy.task.Future()
if self.can_transform_core(target_frame, source_frame, time)[0]:
# Short cut, the transform is available
fut.set_result(self.lookup_transform(target_frame, source_frame, time))
return fut
def _on_new_data():
try:
if self.can_transform_core(target_frame, source_frame, time)[0]:
fut.set_result(self.lookup_transform(target_frame, source_frame, time))
except BaseException as e:
fut.set_exception(e)
self._new_data_callbacks.append(_on_new_data)
fut.add_done_callback(lambda _: self._remove_callback(_on_new_data))
return fut
[docs]
def wait_for_transform_full_async(
self,
target_frame: str,
target_time: Time,
source_frame: str,
source_time: Time,
fixed_frame: str
) -> Future:
"""
Wait for a transform from the source frame to the target frame to become possible.
:param target_frame: Name of the frame to transform into.
:param target_time: The time to transform to (0 will get the latest).
:param source_frame: Name of the input frame.
:param source_time: The time at which source_frame will be evaluated (0 gets the latest).
:param fixed_frame: Name of the frame to consider constant in time.
:return: A future that becomes true when the transform is available.
"""
fut = rclpy.task.Future()
if self.can_transform_full_core(
target_frame, target_time, source_frame, source_time, fixed_frame)[0]:
# Short cut, the transform is available
fut.set_result(
self.lookup_transform_full_core(
target_frame, target_time, source_frame, source_time, fixed_frame))
return fut
def _on_new_data():
try:
if self.can_transform_full_core(
target_frame, target_time, source_frame, source_time, fixed_frame)[0]:
fut.set_result(
self.lookup_transform_full_core(
target_frame, target_time, source_frame, source_time, fixed_frame))
except BaseException as e:
fut.set_exception(e)
self._new_data_callbacks.append(_on_new_data)
fut.add_done_callback(lambda _: self._remove_callback(_on_new_data))
return fut