Source code for tf2_ros.buffer_interface
# Copyright (c) 2008, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# author: Wim Meeussen
from typing import Optional
from typing import TypeVar
from typing import Union
from typing import Callable
from typing import Tuple
from typing import Any
import rclpy
import tf2_py as tf2
import tf2_ros
from copy import deepcopy
from std_msgs.msg import Header
from geometry_msgs.msg import TransformStamped
from geometry_msgs.msg import PointStamped
from geometry_msgs.msg import PoseStamped
from geometry_msgs.msg import PoseWithCovarianceStamped
from geometry_msgs.msg import Vector3Stamped
from sensor_msgs.msg import PointCloud2
from rclpy.time import Time
from rclpy.duration import Duration
MsgStamped = Union[
PointStamped,
PoseStamped,
PoseWithCovarianceStamped,
Vector3Stamped,
PointCloud2
]
PyKDLType = TypeVar("PyKDLType")
TransformableObject = Union[MsgStamped, PyKDLType]
TransformableObjectType = TypeVar("TransformableObjectType")
[docs]
class BufferInterface:
"""
Abstract interface for wrapping the Python bindings for the tf2 library in
a ROS-based convenience API.
Implementations include :class:tf2_ros.buffer.Buffer and
:class:tf2_ros.buffer_client.BufferClient.
"""
# transform, simple api
[docs]
def transform(
self,
object_stamped: TransformableObject,
target_frame: str,
timeout: Duration = Duration(),
new_type: Optional[TransformableObjectType] = None
) -> TransformableObject:
"""
Transform an input into the target frame.
The input must be a known transformable type (by way of the tf2 data type conversion interface).
If new_type is not None, the type specified must have a valid conversion from the input type,
else the function will raise an exception.
:param object_stamped: The timestamped object to transform.
:param target_frame: Name of the frame to transform the input into.
:param timeout: Time to wait for the target frame to become available.
:param new_type: Type to convert the object to.
:return: The transformed, timestamped output, possibly converted to a new type.
"""
do_transform = self.registration.get(type(object_stamped))
res = do_transform(object_stamped, self.lookup_transform(target_frame, object_stamped.header.frame_id,
object_stamped.header.stamp, timeout))
if new_type == None:
return res
return convert(res, new_type)
# transform, advanced api
[docs]
def transform_full(
self,
object_stamped: TransformableObject,
target_frame: str,
target_time: Time,
fixed_frame: str,
timeout: Duration = Duration(),
new_type: Optional[TransformableObjectType] = None
) -> TransformableObject:
"""
Transform an input into the target frame (advanced API).
The input must be a known transformable type (by way of the tf2 data type conversion interface).
If new_type is not None, the type specified must have a valid conversion from the input type,
else the function will raise an exception.
This function follows the advanced API, which allows tranforming between different time points,
as well as specifying a frame to be considered fixed in time.
:param object_stamped: The timestamped object to transform.
:param target_frame: Name of the frame to transform the input into.
:param target_time: Time to transform the input into.
:param fixed_frame: Name of the frame to consider constant in time.
:param timeout: Time to wait for the target frame to become available.
:param new_type: Type to convert the object to.
:return: The transformed, timestamped output, possibly converted to a new type.
"""
do_transform = self.registration.get(type(object_stamped))
res = do_transform(object_stamped, self.lookup_transform_full(target_frame, target_time,
object_stamped.header.frame_id, object_stamped.header.stamp,
fixed_frame, timeout))
if new_type == None:
return res
return convert(res, new_type)
[docs]
def lookup_transform(
self,
target_frame: str,
source_frame: str,
time: Time,
timeout: Duration = Duration()
) -> TransformStamped:
"""
Get the transform from the source frame to the target frame.
Must be implemented by a subclass of BufferInterface.
:param target_frame: Name of the frame to transform into.
:param source_frame: Name of the input frame.
:param time: The time at which to get the transform (0 will get the latest).
:param timeout: Time to wait for the target frame to become available.
:return: The transform between the frames.
"""
raise NotImplementedException()
[docs]
def lookup_transform_full(
self,
target_frame: str,
target_time: Time,
source_frame: str,
source_time: Time,
fixed_frame: str,
timeout: Duration = Duration()
) -> TransformStamped:
"""
Get the transform from the source frame to the target frame using the advanced API.
Must be implemented by a subclass of BufferInterface.
:param target_frame: Name of the frame to transform into.
:param target_time: The time to transform to (0 will get the latest).
:param source_frame: Name of the input frame.
:param source_time: The time at which source_frame will be evaluated (0 will get the latest).
:param fixed_frame: Name of the frame to consider constant in time.
:param timeout: Time to wait for the target frame to become available.
:return: The transform between the frames.
"""
raise NotImplementedException()
# can, simple api
[docs]
def can_transform(
self,
target_frame: str,
source_frame: str,
time: Time,
timeout: Duration = Duration()
) -> bool:
"""
Check if a transform from the source frame to the target frame is possible.
Must be implemented by a subclass of BufferInterface.
:param target_frame: Name of the frame to transform into.
:param source_frame: Name of the input frame.
:param time: The time at which to get the transform (0 will get the latest).
:param timeout: Time to wait for the target frame to become available.
:return: True if the transform is possible, false otherwise.
"""
raise NotImplementedException()
# can, advanced api
[docs]
def can_transform_full(
self,
target_frame: str,
target_time: Time,
source_frame: str,
source_time: Time,
fixed_frame: str,
timeout: Duration = Duration()
) -> bool:
"""
Check if a transform from the source frame to the target frame is possible (advanced API).
Must be implemented by a subclass of BufferInterface.
:param target_frame: Name of the frame to transform into.
:param target_time: The time to transform to (0 will get the latest).
:param source_frame: Name of the input frame.
:param source_time: The time at which source_frame will be evaluated (0 will get the latest).
:param fixed_frame: Name of the frame to consider constant in time.
:param timeout: Time to wait for the target frame to become available.
:return: True if the transform is possible, false otherwise.
"""
raise NotImplementedException()
[docs]
def Stamped(
obj: TransformableObject,
stamp: Time,
frame_id: str
) -> TransformableObject:
obj.header = Header(frame_id=frame_id, stamp=stamp)
return obj
[docs]
class TypeException(Exception):
"""
Raised when an unexpected type is received while registering a transform
in :class:`tf2_ros.buffer_interface.BufferInterface`.
"""
[docs]
class NotImplementedException(Exception):
"""
Raised when can_transform or lookup_transform is not implemented in a
subclass of :class:`tf2_ros.buffer_interface.BufferInterface`.
"""
[docs]
class TransformRegistration():
__type_map = {}
[docs]
def add(
self,
key: TransformableObjectType,
callback: Callable[[TransformableObject, TransformStamped], TransformableObject]
) -> None:
TransformRegistration.__type_map[key] = callback
[docs]
def get(
self,
key: TransformableObjectType
) -> Callable[[TransformableObject, TransformStamped], TransformableObject]:
if not key in TransformRegistration.__type_map:
raise TypeException('Type %s is not loaded or supported' % str(key))
else:
return TransformRegistration.__type_map[key]
[docs]
class ConvertRegistration():
__to_msg_map = {}
__from_msg_map = {}
__convert_map = {}
[docs]
def add_from_msg(
self,
key: TransformableObjectType,
callback: Callable[[MsgStamped], TransformableObject]
) -> None:
ConvertRegistration.__from_msg_map[key] = callback
[docs]
def add_to_msg(
self,
key: TransformableObjectType,
callback: Callable[[TransformableObject], MsgStamped]
) -> None:
ConvertRegistration.__to_msg_map[key] = callback
[docs]
def add_convert(
self,
key: Tuple[TransformableObjectType, TransformableObjectType],
callback: Callable[[Any], TransformableObject]
) -> None:
ConvertRegistration.__convert_map[key] = callback
[docs]
def get_from_msg(
self,
key: TransformableObjectType
) -> Callable[[MsgStamped], TransformableObject]:
if not key in ConvertRegistration.__from_msg_map:
raise TypeException('Type %s is not loaded or supported' % str(key))
else:
return ConvertRegistration.__from_msg_map[key]
[docs]
def get_to_msg(
self,
key: TransformableObjectType
) -> Callable[[TransformableObject], MsgStamped]:
if not key in ConvertRegistration.__to_msg_map:
raise TypeException('Type %s is not loaded or supported' % str(key))
else:
return ConvertRegistration.__to_msg_map[key]
[docs]
def get_convert(
self,
key: Tuple[TransformableObjectType, TransformableObjectType]
) -> Callable[[Any], TransformableObject]:
if not key in ConvertRegistration.__convert_map:
raise TypeException("Type %s is not loaded or supported" % str(key))
else:
return ConvertRegistration.__convert_map[key]
[docs]
def convert(a: TransformableObject, b_type: TransformableObjectType) -> TransformableObject:
c = ConvertRegistration()
#check if an efficient conversion function between the types exists
try:
f = c.get_convert((type(a), b_type))
return f(a)
except TypeException:
if type(a) == b_type:
return deepcopy(a)
f_to = c.get_to_msg(type(a))
f_from = c.get_from_msg(b_type)
return f_from(f_to(a))