cras package

Submodules

cras.ctypes_utils module

Utilities for working with the ctypes library.

class cras.ctypes_utils.Allocator

Bases: object

An allocator that can be passed as a callback to a C function that should return dynamically allocated data.

This way, Python can still manage the lifetime of the buffer, and it is not needed to know the size of the buffer prior to calling the C function.

Variables:
  • allocated (List[c_void_p]) – The allocated buffers.
  • allocated_sizes (List[int]) – Sizes of the allocated buffer in bytes.
ALLOCATOR

alias of ctypes.CFUNCTYPE.<locals>.CFunctionType

get_cfunc()

Get the ctypes object referring to this allocator.

Returns:The ctypes-compatible allocator object (that should be passed as function argument).
Return type:Allocator.ALLOCATOR
value

Get the first allocated value converted to a Python object.

Returns:The allocated value.
values

Get the allocated values converted to Python objects.

Returns:The allocated values.
class cras.ctypes_utils.BytesAllocator

Bases: cras.ctypes_utils.Allocator

ctypes allocator suitable for allocating byte arrays. The returned value is a bytes object.

value

Get the first allocated value converted to a Python object.

Returns:The allocated value.
values

Get the allocated values converted to Python objects.

Returns:The allocated values.
class cras.ctypes_utils.LogMessagesAllocator

Bases: cras.ctypes_utils.RosMessagesAllocator

ctypes allocator suitable for allocating byte buffers for serialized rosgraph_msgs/Log messages and printing them via standard rospy logging mechanisms.

format_log_message(msg)

Convert the log message to a string to be logged.

Parameters:msg (rosgraph_msgs/Log) – The log message.
Returns:The text to be logged.
Return type:str
get_formatted_log_messages()

Get all messages allocated by this allocator converted to strings to be printed.

Returns:Tuples of (loglevel, message string).
Return type:collections.Iterable[Tuple[int, str]]
print_log_messages()

Print all messages allocated by this allocator using the standard rospy logger.

cras.ctypes_utils.RTLD_LAZY = 1

Relocate dynamic symbols only when they are used.

cras.ctypes_utils.RTLD_NOW = 2

Relocate dynamic symbols as soon as the library is loaded.

class cras.ctypes_utils.RosMessagesAllocator(msg_type)

Bases: cras.ctypes_utils.BytesAllocator

ctypes allocator suitable for allocating byte buffers for serialized ROS messages.

Create the message allocator.

Parameters:msg_type (Type) – Type of the messages.
message

Return the first ROS message allocated by this allocator.

Returns:The log message.
Return type:genpy.Message
messages

Return the ROS messages allocated by this allocator.

Returns:The log messages.
Return type:collections.Iterable[genpy.Message]
class cras.ctypes_utils.ScalarAllocator(c_type)

Bases: cras.ctypes_utils.Allocator

ctypes allocator suitable for allocating scalar values (e.g. numbers). The returned value is a scalar value.

This means the size of each allocation needs to exactly match the number of bytes the given c_type has. If other sizes are requested, RuntimeError is thrown.

value

Get the first allocated value converted to a Python object.

Returns:The allocated value.
values

Get the allocated values converted to Python objects.

Returns:The allocated values.
class cras.ctypes_utils.StringAllocator

Bases: cras.ctypes_utils.Allocator

ctypes allocator suitable for allocating strings. The returned value is of str type.

value

Get the first allocated value converted to a Python object.

Returns:The allocated value.
values

Get the allocated values converted to Python objects.

Returns:The allocated values.
cras.ctypes_utils.c_array(data, c_type)

Convert the given Python iterable to a C array with zero element after the last one.

Parameters:
  • data (list) – The data to convert.
  • c_type (type) – The ctypes type of the individual elements.
Returns:

The ctypes array (it is one element longer than the input).

cras.ctypes_utils.get_libc()
cras.ctypes_utils.get_ro_c_buffer(buf, buf_len=None)

Return a read-only ctypes byte buffer object that points to the given buffer’s memory or its copy.

Parameters:
  • buf ([list|tuple|BufferStringIO|bytes]) – The source buffer.
  • buf_len (int) – Length of the buffer (if None, len(buf) is used if it exists, or stream length).
Returns:

The ctypes c_uint8 array.

cras.ctypes_utils.libc_getenv(name, default_value=None)

Call libc function getenv() to get a C program environment variable value.

Parameters:
  • name (str) – Name of the variable.
  • default_value (str, optional) – Default value to return if the variable is not found.
Returns:

The variable (as Python string) or the default value if the variable is not set.

Return type:

str, optional

cras.ctypes_utils.libc_setenv(name, value, overwrite=True)

Call libc function setenv() to set the C program environment.

Parameters:
  • name (str) – Name of the variable.
  • value (str) – Value of the variable.
  • overwrite (bool) – Whether to overwrite an existing variable or return True without overwriting it.
Returns:

True on success.

Return type:

bool

Raises:

OSError – If there is not enough memory or name contains ‘=’.

cras.ctypes_utils.libc_unsetenv(name)

Call libc function unsetenv() to unset the C program environment.

Parameters:name (str) – Name of the variable.
Returns:True on success.
Return type:bool
Raises:OSError – If name contains ‘=’.
cras.ctypes_utils.load_library(library_name, mode=1, use_errno=False)

Load a C library (.so file) located on LD_LIBRARY_PATH.

All dependencies of this library have to be loaded before this one with mode = ctypes.RTLD_GLOBAL.

Parameters:
  • library_name (str) – Name of the library (without the “lib” prefix and “.so” suffix).
  • mode (int) – Symbol loading scope and when they should be loaded. Either ctypes.RTLD_LOCAL, ctypes.RTLD_GLOBAL, RTLD_NOW or RTLD_LAZY.
  • use_errno (bool) – Whether called functions should use a ctypes’ thread-local errno copy.
Returns:

A handle to the loaded library.

Return type:

CDLL

cras.geometry_utils module

Geometry utilities.

Conversions quaternion<->roll/pitch/yaw are consistent with those in cras_cpp_common tf2_utils.

cras.geometry_utils.quat_get_pitch(quaternion, *args)

Get the given quaternion’s pitch in radians.

Parameters:
  • quaternion (Quaternion|List[float]|Tuple[float]) – The quaternion to convert. If a 4-tuple is passed, make sure w is the last component, not the first!
  • args (Tuple[Any]) – You can also call this function with 4 arguments instead of passing a tuple.
Returns:

Pitch in radians.

Return type:

float

Raises:

ValueError – If invalid number of arguments is passed or they have invalid number of elements.

cras.geometry_utils.quat_get_roll(quaternion, *args)

Get the given quaternion’s roll in radians.

Parameters:
  • quaternion (Quaternion|List[float]|Tuple[float]) – The quaternion to convert. If a 4-tuple is passed, make sure w is the last component, not the first!
  • args (Tuple[Any]) – You can also call this function with 4 arguments instead of passing a tuple.
Returns:

Roll in radians.

Return type:

float

Raises:

ValueError – If invalid number of arguments is passed or they have invalid number of elements.

cras.geometry_utils.quat_get_rpy(quaternion, *args)

Convert the given quaternion to roll, pitch and yaw.

Parameters:
  • quaternion (Quaternion|List[float]|Tuple[float]) – The quaternion to convert. If a 4-tuple is passed, make sure w is the last component, not the first!
  • args (Tuple[Any]) – You can also call this function with 4 arguments instead of passing a tuple.
Returns:

Tuple (roll, pitch, yaw) in radians.

Return type:

Tuple[float]

Raises:

ValueError – If invalid number of arguments is passed or they have invalid number of elements.

cras.geometry_utils.quat_get_yaw(quaternion, *args)

Get the given quaternion’s yaw in radians.

Parameters:
  • quaternion (Quaternion|List[float]|Tuple[float]) – The quaternion to convert. If a 4-tuple is passed, make sure w is the last component, not the first!
  • args (Tuple[Any]) – You can also call this function with 4 arguments instead of passing a tuple.
Returns:

Yaw in radians.

Return type:

float

Raises:

ValueError – If invalid number of arguments is passed or they have invalid number of elements.

cras.geometry_utils.quat_msg_from_rpy(roll, pitch=None, yaw=None)

Convert the given roll, pitch and yaw to geometry_msgs/Quaternion.

Parameters:
  • roll (float|Tuple[float]|List[float]) – Roll in radians (or a 3-tuple (roll, pitch, yaw)).
  • pitch (float|None) – Pitch in radians (do not use if you pass 3-tuple in the first argument).
  • yaw (float|None) – Yaw in radians (do not use if you pass 3-tuple in the first argument).
Returns:

The corresponding quaternion.

Return type:

Quaternion

cras.geometry_utils.quat_tuple_from_rpy(roll, pitch=None, yaw=None)

Convert the given roll, pitch and yaw to quaternion represented as 4-tuple of floats (w last).

Parameters:
  • roll (float|Tuple[float]|List[float]) – Roll in radians (or a 3-tuple (roll, pitch, yaw)).
  • pitch (float|None) – Pitch in radians (do not use if you pass 3-tuple in the first argument).
  • yaw (float|None) – Yaw in radians (do not use if you pass 3-tuple in the first argument).
Returns:

The corresponding quaternion.

Return type:

Tuple[float]

Raises:

ValueError – If invalid number of arguments is passed or they have invalid number of elements.

cras.log_utils module

Utilities for working with rospy logging.

cras.log_utils.log(level, message, *args, **kwargs)

Log a ROS message with a logger that has the given severity level.

Parameters:
  • level (int) – Severity of the message (one of rosgraph_msgs/Log constants).
  • message (str) – The logged message template (same rules as in the logging module apply to args/kwargs).
  • args – Args are passed directly to rospy.loginfo() or the other functions.
  • kwargs – Keyword args are passed directly to rospy.loginfo() or the other functions.
Raises:

KeyError – if level is not one of the rosgraph_msgs/Log constants.

cras.log_utils.log_functions = {1: <function logdebug>, 2: <function loginfo>, 4: <function logwarn>, 8: <function logerr>, 16: <function logfatal>}

The rospy logging functions accessible by their ROS logging level index.

cras.log_utils.log_level_names = {1: 'DEBUG', 2: 'INFO', 4: 'WARN', 8: 'ERROR', 16: 'FATAL'}

Stringified names of the ROS logging levels.

cras.log_utils.log_level_py_name_to_ros = {'CRITICAL': 16, 'DEBUG': 1, 'ERROR': 8, 'INFO': 2, 'WARNING': 4}

Convert python logging module level string to ROS log level int.

cras.log_utils.log_level_py_to_ros = {10: 1, 20: 2, 30: 4, 40: 8, 50: 16}

Convert python logging module level int to ROS log level int.

cras.log_utils.log_level_ros_to_py = {1: 10, 2: 20, 4: 30, 8: 40, 16: 50}

Convert ROS log level int to python logging module level int.

cras.log_utils.log_level_ros_to_py_name = {1: 'DEBUG', 2: 'INFO', 4: 'WARNING', 8: 'ERROR', 16: 'CRITICAL'}

Convert ROS log level int to python logging module level string.

cras.log_utils.log_levels = {1, 2, 4, 8, 16}

All available log levels.

cras.log_utils.log_once(level, message, *args, **kwargs)

Log a ROS message with a once-only logger that has the given severity level.

Parameters:
  • level (int) – Severity of the message (one of rosgraph_msgs/Log constants).
  • message (str) – The logged message template (same rules as in the logging module apply to args/kwargs).
  • args – Args are passed directly to rospy.loginfo_once() or the other functions.
  • kwargs – Keyword args are passed directly to rospy.loginfo_once() or the other functions.
Raises:

KeyError – if level is not one of the rosgraph_msgs/Log constants.

cras.log_utils.log_once_functions = {1: <function logdebug_once>, 2: <function loginfo_once>, 4: <function logwarn_once>, 8: <function logerr_once>, 16: <function logfatal_once>}

The rospy once-only logging functions accessible by their ROS logging level index.

cras.log_utils.log_once_identical(level, message, *args, **kwargs)

Log a ROS message with a once-only-identical logger that has the given severity level.

Parameters:
  • level (int) – Severity of the message (one of rosgraph_msgs/Log constants).
  • message (str) – The logged message template (same rules as in the logging module apply to args/kwargs).
  • args – Args are passed directly to rospy.loginfo() or the other functions.
  • kwargs – Keyword args are passed directly to rospy.loginfo() or the other functions.
Raises:

KeyError – if level is not one of the rosgraph_msgs/Log constants.

cras.log_utils.log_once_identical_functions = {1: <function logdebug_once_identical>, 2: <function loginfo_once_identical>, 4: <function logwarn_once_identical>, 8: <function logerr_once_identical>, 16: <function logfatal_once_identical>}

The once-only-identical logging functions accessible by their ROS logging level index.

cras.log_utils.log_throttle(level, period, message, *args, **kwargs)

Log a ROS message with a identical-throttled logger that has the given severity level.

Parameters:
  • level (int) – Severity of the message (one of rosgraph_msgs/Log constants).
  • period (float) – Logging period (in seconds).
  • message (str) – The logged message template (same rules as in the logging module apply to args/kwargs).
  • args – Args are passed directly to rospy.loginfo_throttle() or the other functions.
  • kwargs – Keyword args are passed directly to rospy.loginfo_throttle() or the other functions.
Raises:

KeyError – if level is not one of the rosgraph_msgs/Log constants.

cras.log_utils.log_throttle_functions = {1: <function logdebug_throttle>, 2: <function loginfo_throttle>, 4: <function logwarn_throttle>, 8: <function logerr_throttle>, 16: <function logfatal_throttle>}

The rospy throttled logging functions accessible by their ROS logging level index.

cras.log_utils.log_throttle_identical(level, period, message, *args, **kwargs)

Log a ROS message with a identical-throttled logger that has the given severity level.

Parameters:
  • level (int) – Severity of the message (one of rosgraph_msgs/Log constants).
  • period (float) – Logging period (in seconds).
  • message (str) – The logged message template (same rules as in the logging module apply to args/kwargs).
  • args – Args are passed directly to rospy.loginfo_throttle_identical() or the other functions.
  • kwargs – Keyword args are passed directly to rospy.loginfo_throttle_identical() or other functions.
Raises:

KeyError – if level is not one of the rosgraph_msgs/Log constants.

cras.log_utils.log_throttle_identical_functions = {1: <function logdebug_throttle_identical>, 2: <function loginfo_throttle_identical>, 4: <function logwarn_throttle_identical>, 8: <function logerr_throttle_identical>, 16: <function logfatal_throttle_identical>}

The rospy identical-throttled logging functions accessible by their ROS logging level index.

cras.log_utils.logdebug_once_identical(message, *args, **kwargs)

Log a debug ROS message with a once-only-identical logger.

Parameters:
  • message (str) – The logged message template (same rules as in the logging module apply to args/kwargs).
  • args – Args are passed directly to rospy.logdebug().
  • kwargs – Keyword args are passed directly to rospy.logdebug().
cras.log_utils.logerr_once_identical(message, *args, **kwargs)

Log an error ROS message with a once-only-identical logger.

Parameters:
  • message (str) – The logged message template (same rules as in the logging module apply to args/kwargs).
  • args – Args are passed directly to rospy.logerr().
  • kwargs – Keyword args are passed directly to rospy.logerr().
cras.log_utils.logfatal_once_identical(message, *args, **kwargs)

Log a fatal ROS message with a once-only-identical logger.

Parameters:
  • message (str) – The logged message template (same rules as in the logging module apply to args/kwargs).
  • args – Args are passed directly to rospy.logfatal().
  • kwargs – Keyword args are passed directly to rospy.logfatal().
cras.log_utils.loginfo_once_identical(message, *args, **kwargs)

Log an info ROS message with a once-only-identical logger.

Parameters:
  • message (str) – The logged message template (same rules as in the logging module apply to args/kwargs).
  • args – Args are passed directly to rospy.loginfo().
  • kwargs – Keyword args are passed directly to rospy.loginfo().
cras.log_utils.logwarn_once_identical(message, *args, **kwargs)

Log a warning ROS message with a once-only-identical logger.

Parameters:
  • message (str) – The logged message template (same rules as in the logging module apply to args/kwargs).
  • args – Args are passed directly to rospy.logwarn().
  • kwargs – Keyword args are passed directly to rospy.logwarn().

cras.message_utils module

Utilities helpful when working with ROS messages.

cras.message_utils.dict_to_dynamic_config_msg(d)

Convert configuration dict to dynamic_reconfigure/Config.

Parameters:d (dict or dynamic_reconfigure/Config or None) – Configuration dict (or already the message, in which case it is just returned).
Returns:The config message.
Return type:dynamic_reconfigure/Config
cras.message_utils.get_cfg_module(cfg_type_str)

Load and return a Python module corresponding to the given ROS dynamic reconfigure type.

Parameters:cfg_type_str (str) – Type of the config in textual form (e.g. compressed_image_transport/CompressedPublisher).
Returns:The Python module.
cras.message_utils.get_msg_field(msg, field, sep='/')

Parse a data field from a ROS message using a textual field address description.

An example could be calling get_msg_field(odom, 'pose/pose/position/x') on an Odometry message.

Parameters:
  • msg (genpy.Message) – Message.
  • field (str) – The field address, levels separated with sep or using array access [].
  • sep (str) – Separator used in the field address. Usually / or ..
Returns:

Value found in the given field in the message.

Return type:

Any

cras.message_utils.get_msg_type(msg_type_str)

Load and return a Python type corresponding to the given ROS message type.

Parameters:msg_type_str (str) – Type of the message in textual form (e.g. sensor_msgs/Imu).
Returns:The Python type object.
Return type:Type
cras.message_utils.get_srv_types(srv_type_str)

Load and return a Python types corresponding to the given ROS service type.

Parameters:srv_type_str (str) – Type of the service in textual form (e.g. std_srvs/SetBool).
Returns:A tuple of the main service Python type, the request type and the response type.
Return type:tuple

cras.node_utils module

Utilities helpful when writing nodes.

class cras.node_utils.Node

Bases: object

Base class for all nodes.

This class reads the following ROS parameters:

  • /jump_back_tolerance (float, default 3.0 in wall time and 0.0 in sim time):
    Threshold for ROS time jump back detection.
  • ~jump_back_tolerance (float, default from /jump_back_tolerance): Threshold for ROS time jump back detection.
  • ~reset_on_time_jump_back (bool, default True): Whether to call reset() when ROS time jumps back.
  • /jump_forward_tolerance (float, default 10.0 in sim time and max duration in wall time):
    Threshold for ROS time jump forward detection.
  • ~jump_forward_tolerance (float, default from /jump_forward_tolerance):
    Threshold for ROS time jump forward detection.
  • ~reset_on_time_jump_forward (bool, default True in sim time and False in wall time):
    Whether to call reset() when ROS time jumps forward.

This class subscribes the following topics: - /reset (any type): When a message is received on this topic, the node is reset. - ~reset (any type): When a message is received on this topic, the node is reset.

Variables:
  • _jump_back_tolerance (rospy.Duration) – Threshold for ROS time jump back detection.
  • _jump_forward_tolerance (rospy.Duration) – Threshold for ROS time jump forward detection.
  • _last_time_stamp (rospy.Time) – Last recorded time stamp. Used for time jump back detection.
  • _reset_on_time_jump_back (bool) – Whether to call reset() when ROS time jumps back.
  • _reset_on_time_jump_forward (bool) – Whether to call reset() when ROS time jumps forward.
  • _reset_sub (rospy.Subscriber) – Subscriber of the /reset topic.
  • _reset_priv_sub (rospy.Subscriber) – Subscriber of the ~reset topic.
check_time_jump(now=None)

Check if ROS time has not jumped back. If it did, call reset().

Note:This function should be called periodically. Call func:`start_auto_check_time_jump to spin a thread that will do these periodic calls automatically. Or you can call this function from your callbacks.
Parameters:now (rospy.Time|None) – The timestamp that should be interpreted as current time (if None, rospy.Time.now() is used).
reset()

Reset state of the class to the state where it was just after initialization.

Note:Do not forget to call super().reset() in subclasses!
start_auto_check_time_jump(hz=1.0)

Periodically check for time jumps (the checking period is in wall/steady time).

Parameters:hz (float) – The rate at which the checking should be done.
stop_auto_check_time_jump()

Stop periodic checking for time jumps.

cras.param_utils module

Utilities for working with ROS parameters.

Package cras.impl contains some predefined advanced parameter parsers. Other advanced parsers can be registered using register_param_conversion().

The predefined conversions can parse:

  • rospy.Duration from a float or from dict {‘sec’, ‘nsec’}
  • rospy.Time from a float or from dict {‘sec’, ‘nsec’}
  • geometry_msgs/Vector3 from a 3-tuple or from dict {‘x’, ‘y’, ‘z’}
  • geometry_msgs/Point from a 3-tuple or from dict {‘x’, ‘y’, ‘z’}
  • geometry_msgs/Point32 from a 3-tuple or from dict {‘x’, ‘y’, ‘z’}
  • geometry_msgs/Pose2D from a 3-tuple or from dict {‘x’, ‘y’, ‘theta’}
  • geometry_msgs/Quaternion from a 4-tuple (w last!) or from dict {‘x’, ‘y’, ‘z’, ‘w’}, or from
    {‘r’, ‘p’, ‘y’} or from {‘roll’, ‘pitch’, ‘yaw’}. Any missing dict key will be replaced by 0. Beware that if you specify only ‘y’ key, it is ambiguous. The quaternion is not explicitly normalized (when specified with 4 elements).
  • geometry_msgs/Pose from a 6-tuple (xyzrpy), 7-tuple (xyzxyzw), 16-tuple (row-major homogeneous transform
    matrix) or from dict {‘position’, ‘orientation’}, where ‘position’ is parsed as Point and ‘orientation’ as Quaternion (by any of their supported specifications).
exception cras.param_utils.GetParamException(info)

Bases: RuntimeError

Exception thrown when conversion of a parameter fails during get_param() if option throw_if_convert_fails is True or when a missing parameter is required.

Variables:info (GetParamResultInfo) – Details about get_param() execution until the failure.
class cras.param_utils.GetParamResult(value=None, info=None)

Bases: object

Wrapper for the result of a get_param_verbose() call. It contains the parameter value and additional information.

Variables:
class cras.param_utils.GetParamResultInfo(default_used=False, convert_failed=False, required_missing=False, message='', message_level=2)

Bases: object

Detailed information about the executed get_param_verbose() call.

Variables:
  • default_used (bool) – Whether the default value has been used.
  • convert_failed (bool) – Whether a value conversion failed.
  • required_missing (bool) – Whether a required parameter was found missing or could not be read.
  • message (str) – The log message (returned even if option print_messages is False).
  • message_level (uint) – Severity of the log message (one of rosgraph_msgs/Log constants).
cras.param_utils.get_param(param_name, default_value=None, unit='', print_default_as_warn=False, print_messages=True, throw_if_convert_fails=False, result_type=None)

Load a ROS parameter from param server, logging what we found to console.

Parameters:
  • param_name (str) – Name of the parameter.
  • default_value (Any) – Default value to be used it the parameter is not set. In None, the parameter is required.
  • unit (str) – Optional string serving as a [physical/SI] unit of the parameter, just to make the messages more informative. If empty, a default is looked up which can be registered using register_default_unit().
  • print_default_as_warn (bool) – Whether defaulted parameters are reported as warning or info level messages.
  • print_messages (bool) – Whether to print error messages to log.
  • throw_if_convert_fails (bool) – Throw GetParamException if any conversion fails. If False, the default value is used instead of a value that failed to convert. In such case, the log message is of error level.
  • result_type (Any) – Desired type of the result. If None, the same type as default_value is used. If default_value is also None, no type conversion is done.
Returns:

The parameter value.

Return type:

Any

Raises:

GetParamException – If the parameter is required and not set on param server, or if type conversion fails.

cras.param_utils.get_param_verbose(param_name, default_value=None, unit='', print_default_as_warn=False, print_messages=True, throw_if_convert_fails=False, result_type=None)

Load a ROS parameter from param server, possibly logging what we found to console.

Parameters:
  • param_name (str) – Name of the parameter.
  • default_value (Any) – Default value to be used it the parameter is not set. In None, the parameter is required.
  • unit (str) – Optional string serving as a [physical/SI] unit of the parameter, just to make the messages more informative. If empty, a default is looked up which can be registered using register_default_unit().
  • print_default_as_warn (bool) – Whether defaulted parameters are reported as warning or info level messages.
  • print_messages (bool) – Whether to print error messages to log.
  • throw_if_convert_fails (bool) – Throw GetParamException if any conversion fails. If False, the default value is used instead of a value that failed to convert. In such case, the log message is of error level.
  • result_type (Any) – Desired type of the result. If None, the same type as default_value is used. If default_value is also None, no type conversion is done.
Returns:

The parameter value wrapped in GetParamResult struct with additional details.

Return type:

GetParamResult

Raises:

GetParamException – If the parameter is required and not set on param server, or if type conversion fails.

cras.param_utils.register_default_unit(result_type, unit)

Register a default unit used when printing log messages.

Parameters:
  • result_type (Type) – Type of the parameter.
  • unit (str) – Unit to use for the given parameter type if none other is specified.
cras.param_utils.register_enum_conversion(enum_type)

Register a function converting ROS parameters of type str to the given enum (by taking names of the enum values and reading them from the string).

Parameters:enum_type (Type) – Type of the enum.
cras.param_utils.register_param_conversion(result_type, param_type, convert_fn)

Register a function converting ROS parameters of type param_type to result_type.

Parameters:
  • result_type (Type) – Type of the result value.
  • param_type (Type) – Type of the parameter value read from ROS parameter server.
  • convert_fn (Callable[[param_type],result_type]) – Function converting the ROS parameter to the result type. It should raise ValueError if the value is not suitable.

cras.python_utils module

General Python utilities.

cras.python_utils.temp_environ(unset=None, **update)

Context manager that temporarily updates the os.environ dictionary in-place.

The implementation is based on https://stackoverflow.com/a/34333710/1076564 (MIT license).

Parameters:
  • unset (list of str or None) – Environment variables to temporarily unset.
  • update (dict) – Dictionary of environment variables and values to temporarily set/update.
cras.python_utils.temp_locale(category, localename)

Context manager that temporarily changes locale.

Parameters:
  • category (int) – Locale category identifier, one of the LC_xxx constants.
  • localename (str) – System-specific locale identifier. Can be “” for the user-preferred locale or “C” for the minimal locale.

cras.static_transform_broadcaster module

class cras.static_transform_broadcaster.StaticTransformBroadcaster

Bases: tf2_ros.static_transform_broadcaster.StaticTransformBroadcaster

StaticTransformBroadcaster Allows broadcasting more than one static TF (accumulates all that have been sent to this class and publishes the aggregate message).

pub_tf = <rospy.topics.Publisher object>
sendTransform(transform)
tfs = {}

cras.string_utils module

Utilities helpful when working with strings in Python and rospy.

cras.string_utils.STRING_TYPE

alias of builtins.str

cras.string_utils.iconv_convert_bytes(to_encoding, from_encoding, in_bytes, translit=False, ignore=False, initial_outbuf_size_scale=1.0, outbuf_enlarge_coef=2.0, localename=None)

Convert in_bytes from from_encoding to to_encoding using iconv.

Parameters:
  • to_encoding (str) – The target encoding. It may contain the //TRANSLIT and //IGNORE suffixes.
  • from_encoding (str) – The source encoding.
  • in_bytes (bytes) – The bytes to convert (get them e.g. by calling "my_string".encode('utf-8')).
  • translit (bool) – If True, the conversion will try to transliterate letters not present in target encoding.
  • ignore (bool) – If True, letters that can’t be converted and transliterated will be left out.
  • initial_outbuf_size_scale (float) – The initial scale of the size of the output buffer. Setting this to the correct value may speed up the conversion in case the output is much larger than the input.
  • outbuf_enlarge_coef – The step size to use for enlarging the output buffer if it shows that its initial size is insufficient. Must be stritctly larger than 1.0.
  • localename (str, optional) – If set, specifies the locale used for the iconv call. It may influence the transliteration results. If not set, a default english locale is used that usually works quite well.
Returns:

The converted bytes. Call e.g. result.decode('utf-8') to get a string from it.

Return type:

bytes

Raises:

ValueError – If some characters cannot be converted and ignore is False, or if the encodings are unknown.

Validates that name is a legal base name for a graph resource. A base name has no namespace context, e.g. “node_name”.

This function works correctly, which can’t be said about the rosgraph.names version.

Parameters:name (str) – Name
Return type:bool

Check if name is a legal ROS name for graph resources.

This function works correctly, which can’t be said about the rosgraph.names version.

Parameters:name (str) – Name
Return type:bool
cras.string_utils.register_to_str(data_type, fn)

Register new conversion for to_str().

Parameters:
  • data_type (Type) – The type to register.
  • fn (Callable[[Any],str]) – The conversion function. It should return the string representation.
cras.string_utils.to_str(obj)

Convert the given object to a human-friendly string representation.

Parameters:obj (Any) – The object to convert.
Returns:The object’s string representation.
Return type:str
cras.string_utils.to_valid_ros_name(text, base_name=True, fallback_name=None)

Make sure the given string can be used as ROS name.

Parameters:
  • text (str) – The text to convert.
  • base_name (bool) – If True, the text represents only one “level” of names. If False, it can be the absolute or relative name with ~ and /.
  • fallback_name (str) – If specified, this name will be used if the automated conversion fails. This name is not checked to be valid.
Returns:

The valid ROS graph resource name.

Return type:

str

Raises:

ValueError – If the given text cannot be converted to a valid ROS name, and no fallback_name is specified.

cras.string_utils.transliterate_to_ascii(text)

Transliterate the given string from UTF-8 to ASCII (replace non-ASCII chars by closest ASCII chars).

Parameters:text (str) – The string to transliterate.
Returns:The transliterated string.
Return type:str

cras.test_utils module

Utilities helpful when writing ROS-related unit tests in the unittest framework.

class cras.test_utils.RosconsoleCapture(test, expected_level=None, expected_message=None)

Bases: logging.Handler

Helper class for peeking into rosout in a test and checking that the output corresponds to the expected values.

Usage from a test:

class MyTest(unittest.TestCase):
    def test_foo(self):
        with RosconsoleCapture(self, Log.INFO, "Test message"):
            call_foo()

This example tests that call_foo() calls rospy.loginfo("Test message"). If not, the test is marked as failed.

Parameters:
  • test (unittest.TestCase) – The test that examines the rosout.
  • expected_level (int|Tuple[int]) – Expected log level of the captured message. One of rosgraph_msgs/Log constants. Multiple logged messages may be captured by passing a tuple.
  • expected_message (str|Tuple[str]) – The expected captured message. Multiple logged messages may be captured by passing a tuple.
emit(record)

Do whatever it takes to actually log the specified logging record.

This version is intended to be implemented by subclasses and so raises a NotImplementedError.

cras.time_utils module

Utilities helpful when working with rospy Time, Duration and Rate objects.

Parts of this library are adapted from BSD-licensed https://github.com/ros/ros_comm/blob/noetic-devel/clients/rospy/src/rospy/timer.py Copyright (c) 2010, Willow Garage, Inc.

class cras.time_utils.CustomRate(hz, now_fn, sleep_fn)

Bases: rospy.timer.Rate

Rate object that sleeps in a custom time.

Constructor.

Parameters:
  • hz (float) – hz rate to determine sleeping
  • now_fn (Callable[[],rospy.Time]) – Function that returns current time.
  • sleep_fn (Callable[[rospy.Duration],None]) – Function that sleeps for the given duration.
now()

Return the current time.

Returns:Current time.
Return type:rospy.Time
remaining()

Return the time remaining for rate to sleep. @return: time remaining @rtype: L{Time}

sleep()

Attempt sleep at the specified rate. sleep() takes into account the time elapsed since the last successful sleep().

@raise ROSInterruptException: if ROS shutdown occurs before sleep completes @raise ROSTimeMovedBackwardsException: if ROS time is set backwards

cras.time_utils.DURATION_MAX = rospy.Duration[2147483647999999999]

Maximum value of rospy.Duration that can be read without problems by other ROS client libraries.

cras.time_utils.DURATION_MIN = rospy.Duration[-2147483648000000000]

Minimum value of rospy.Duration that can be read without problems by other ROS client libraries.

class cras.time_utils.SteadyRate(hz)

Bases: cras.time_utils.CustomRate

Rate object that sleeps in monotonic time, i.e. is not affected by system time jumps.

Constructor.

Parameters:hz (float) – hz rate to determine sleeping
class cras.time_utils.SteadyTime(secs=0, nsecs=0)

Bases: rospy.rostime.Time

Time object that always uses monotonic time.

static now()

Create new L{Time} instance representing current time. This can either be wall-clock time or a simulated clock. It is strongly recommended that you use the now() factory to create current time representations instead of reading wall-clock time and create Time instances from it.

@return: L{Time} instance for current time @rtype: L{Time}

cras.time_utils.TIME_MAX = rospy.Time[4294967295999999999]

Maximum value of rospy.Time that can be read without problems by other ROS client libraries.

cras.time_utils.TIME_MIN = rospy.Time[1]

Minimum value of rospy.Time that can be read without problems by other ROS client libraries. Zero is not included.

class cras.time_utils.Timer(rate, callback, oneshot=False)

Bases: rospy.timer.Timer

Timer that can use custom implementations of rate.

Make sure to keep a reference to this timer for the whole time you want it working.

Constructor.

Parameters:
  • rate (CustomRate) – The rate to use for sleeping.
  • callback (Callable[[rospy.timer.TimerEvent],None]) – Callback to call when the rate is up.
  • oneshot (bool) – Whether the timer should only fire once and stop.
run()

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

class cras.time_utils.WallRate(hz)

Bases: cras.time_utils.CustomRate

Rate object that sleeps in wall time.

Note:This is almost never a good idea as the timer is affected by system time jumps. Consider using SteadyRate instead.

Constructor.

Parameters:hz (float) – hz rate to determine sleeping
class cras.time_utils.WallTime(secs=0, nsecs=0)

Bases: rospy.rostime.Time

Time object that always uses wall time.

static now()

Create new L{Time} instance representing current time. This can either be wall-clock time or a simulated clock. It is strongly recommended that you use the now() factory to create current time representations instead of reading wall-clock time and create Time instances from it.

@return: L{Time} instance for current time @rtype: L{Time}

cras.time_utils.frequency(rate, max_cycle_time_means_zero=False)

Return the frequency represented by the given rate.

Parameters:
  • rate (Rate) – The rate to convert.
  • max_cycle_time_means_zero – If True, return 0 frequency in case the rate’s cycle time is the maximum duration.
Returns:

The frequency.

Return type:

float

cras.time_utils.rate_equals(rate1, rate2)

Compare two rates for equality.

Parameters:
  • rate1 (Rate) – Rate.
  • rate2 (Rate) – Rate.
Returns:

Whether the durations of the rates are equal.

Return type:

bool

cras.time_utils.safe_rate(freq)

Return a rate representing the given frequency. If the frequency is zero or too small, return min/max representable rate.

Parameters:freq (float) – The frequency to convert.
Returns:The corresponding Rate object.
Return type:Rate
cras.time_utils.slowest_negative_rate()

Slowest representable negative rate.

Returns:Slowest representable negative rate.
Return type:Rate
cras.time_utils.slowest_rate()

Slowest representable rate.

Returns:Slowest representable rate.
Return type:Rate
cras.time_utils.wallsleep(duration)

Sleep for the given duration in wall clock.

Note:The duration of the sleep is not affected by system time jumps (as if sleeping in monotonic clock).
Parameters:duration (Duration|float) – The duration to sleep for.

cras.topic_utils module

Utilities helpful when subscribing or publishing topics.

class cras.topic_utils.GenericMessageSubscriber(topic_name, callback, *args, **kwargs)

Bases: object

Drop-in replacement for rospy.Subscriber that subscribes to any type message, automatically deserializes it and calls the callback with the deserialized message instance.

Create the subscriber.

Parameters:
  • topic_name (str) – Name of the topic to subscribe.
  • callback (Callable[[Any],None]) – The callback expecting the deserialized message.
  • args (List[Any]) – rospy.Subscriber further arguments.
  • Any] kwargs (Dict[str,) – rospy.Subscriber further arguments.

Module contents

Utilities for more convenient usage of rospy.