Module API

rocon_python_utils

This is the top-level namespace of the rocon_python_utils ROS package. This module provides a toolbox of general purpose python utilities.

rocon_python_utils.exceptions

This module is a library of general purpose exceptions that cover needs not catered for by the python exception hierarchy. These exception names are all included in the main rocon_python_utils namespace. To catch one, import it this way:

from rocon_python_utils import TimeoutExpiredError

exception rocon_python_utils.exceptions.TimeoutExpiredError[source]

General purpose exception usable in any timeout situation.

rocon_python_utils.network.pinger

class rocon_python_utils.network.pinger.Pinger(ip, ping_frequency=0.2)[source]

Bases: threading.Thread

The pinger class can run a threaded pinger at the desired frequency to check if a machine is available or not.

Usage:

from rocon_python_utils.network import Pinger

pinger = Pinger('192.168.1.3', 0.2)
pinger.start()
# after some time
print("Statistics: %s" % pinger.get_latency())  # [min,avg,max,mean deviation]
print("Time since last seen %s" % pinger.get_time_since_last_seen())
__init__(ip, ping_frequency=0.2)[source]

Initialises but doesn’t start the thread yet. Use start() to launch the thread, which then calls run() to begin pinging and collecting statistics.

Parameters:
  • ip (str) – ip address of the machine to ping
  • ping_frequency (float) – periodically ping at this frequency (Hz)
get_latency()[source]

Latency statistics generated from the internal window (buffer). This is a ring buffer, so it will only use the most recent ping data.

Returns:latency statistics (min, avg, max, mean deviation)
Return type:float[]
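
The ring buffer behaviour described for get_latency() can be illustrated with a minimal, standalone sketch. It does not use the Pinger class: latency_stats is a hypothetical helper, and "mean deviation" is assumed here to mean the mean absolute deviation from the average, which may differ from what the real implementation computes.

```python
from collections import deque


def latency_stats(samples):
    """Return [min, avg, max, mean deviation] for a non-empty sequence of latencies."""
    values = list(samples)
    avg = sum(values) / len(values)
    # Mean absolute deviation from the average (an assumption; the real
    # Pinger may compute its deviation differently).
    mdev = sum(abs(v - avg) for v in values) / len(values)
    return [min(values), avg, max(values), mdev]


# A deque with maxlen behaves like a ring buffer: once full, each new
# sample pushes the oldest one out.
window = deque(maxlen=3)
for latency_ms in [10.0, 1.0, 2.0, 3.0]:
    window.append(latency_ms)

stats = latency_stats(window)
print("Statistics: %s" % stats)
```

Because the window holds only three samples, the initial 10.0 ms outlier no longer influences the statistics.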
get_time_since_last_seen()[source]

Time in seconds since this ip was last pingable.

Returns:time (s) since seen
Return type:float
run()[source]

Thread worker function - this does the actual pinging and records ping data in the ring buffers for processing into latency statistics as needed.

rocon_python_utils.ros.catkin

This module makes use of the python-catkin-pkg module and adds a few extra functions for interacting with catkin packages.


rocon_python_utils.ros.catkin.package_index_from_package_path(package_paths)[source]

Find all packages on the given list of paths

Iterates over the given list of paths in reverse order so that packages found in the paths at the beginning of the list get overlaid onto packages with the same name which were found in paths farther back in the list.

The resulting dictionary is keyed by package name (so packages with duplicate names are overlaid) and the values are catkin_pkg.package.Package objects.

Parameters:package_paths (str[]) – list of paths to search
Returns:dictionary of package objects keyed by name of the package
Return type:dict
Todo:overlay duplicates only if the version is newer
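
The reverse-order overlay described above can be sketched with plain dictionaries. This is only an illustration of the ordering rule: overlay_package_index is a hypothetical helper, and strings stand in for the catkin_pkg.package.Package objects the real function returns.

```python
def overlay_package_index(indexes_by_path):
    """Merge per-path {name: package} dicts; earlier paths take precedence."""
    merged = {}
    # Walk from the last path to the first so that entries from earlier
    # paths overwrite entries with the same name from later paths.
    for index in reversed(indexes_by_path):
        merged.update(index)
    return merged


# Strings stand in for package objects in this sketch.
overlay = {"rocon_utils": "overlay/rocon_utils"}
underlay = {"rocon_utils": "underlay/rocon_utils", "std_msgs": "underlay/std_msgs"}

merged = overlay_package_index([overlay, underlay])
print(merged)
```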

rocon_python_utils.ros.icons

This module contains converters and ros resource finders for icons and icon messages.


rocon_python_utils.ros.icons.icon_resource_to_msg(resource)[source]

Loads the icon resource and puts it in rocon_std_msgs.Icon format.

Parameters:resource (str) – resource identifier (package/name)
Returns:the icon in msg format
Return type:rocon_std_msgs.Icon
Raises:rospkg.ResourceNotFound raised if the resource is not found or has an inappropriate extension.
rocon_python_utils.ros.icons.icon_to_msg(filename)[source]

Loads the specified file and puts it in rocon_std_msgs.Icon format.

Parameters:filename (str) – path to the icon resource.
Returns:the icon in msg format
Return type:rocon_std_msgs.Icon

rocon_python_utils.ros.resources

This module contains helpers that look up or collect an index of ros resource names for various purposes.


rocon_python_utils.ros.resources.find_resource(package, filename, rospack=None)[source]

Convenience wrapper around roslib to find a resource (file) inside a package. It checks the output, and provides the appropriate error if there is one.

Parameters:
  • package (str) – ros package
  • filename (str) – some file inside the specified package
  • rospack (rospkg.RosPack) – a caching utility to help accelerate catkin filesystem lookups
Returns:

absolute path to the file

Return type:

str

Raises:

rospkg.ResourceNotFound : raised if there is nothing found or multiple objects found.
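
The "exactly one match" rule above can be sketched without ROS. This is not the library's implementation: find_unique_file and ResourceLookupError are hypothetical names, a temporary directory stands in for a package, and the sketch simply walks the tree instead of going through roslib.

```python
import os
import tempfile


class ResourceLookupError(LookupError):
    """Hypothetical stand-in for rospkg.ResourceNotFound."""


def find_unique_file(package_dir, filename):
    """Return the absolute path of the single match for filename under package_dir."""
    matches = []
    for root, _dirs, _files in os.walk(package_dir):
        # filename may be a bare name or a relative path such as urdf/gocart.xacro.
        candidate = os.path.join(root, filename)
        if os.path.isfile(candidate):
            matches.append(os.path.abspath(candidate))
    if not matches:
        raise ResourceLookupError("no match for %r in %s" % (filename, package_dir))
    if len(matches) > 1:
        raise ResourceLookupError("multiple matches for %r: %s" % (filename, matches))
    return matches[0]


# Build a throwaway "package" containing two files with the same basename.
demo_dir = tempfile.mkdtemp()
for sub in ("urdf", "meshes"):
    os.makedirs(os.path.join(demo_dir, sub))
    open(os.path.join(demo_dir, sub, "gocart.xacro"), "w").close()

# A relative path disambiguates; the bare name "gocart.xacro" would raise.
unique_path = find_unique_file(demo_dir, os.path.join("urdf", "gocart.xacro"))
print(unique_path)
```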

rocon_python_utils.ros.resources.find_resource_from_string(resource, rospack=None, extension=None)[source]

Convenience wrapper around roslib to find a resource (file) inside a package. This function passes off the work to find_resource once the input string is split.

Pass it a shared rospack (rospkg.RosPack) object to accelerate the crawling across the ROS_PACKAGE_PATH when you are calling this function for many resources consecutively.

import rospkg
from rocon_python_utils.ros.resources import find_resource_from_string

rospack = rospkg.RosPack()
for ros_resource_name in ['rocon_interactions/pc.interactions', 'rocon_interactions/demo.interactions']:
    filename = find_resource_from_string(ros_resource_name, rospack)
    # do something
Parameters:
  • resource (str) – ros resource name (in the form package/filename)
  • rospack (rospkg.RosPack) – a caching utility to help accelerate catkin filesystem lookups
  • extension (str) – file name extension to look for/expect
Returns:

full pathname to the resource

Return type:

str

Raises:

rospkg.ResourceNotFound raised if the resource is not found or has an inappropriate extension.

rocon_python_utils.ros.resources.find_resource_pair_from_string(resource, rospack=None, extension=None)[source]

Convenience wrapper around roslib to find a resource (file) inside a package. This function passes off the work to find_resource once the input string is split.

Pass it a shared rospack (rospkg.RosPack) object to accelerate the crawling across the ROS_PACKAGE_PATH when you are calling this function for many resources consecutively.

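import rospkg
from rocon_python_utils.ros.resources import find_resource_pair_from_string

rospack = rospkg.RosPack()
for ros_resource_name in ['rocon_interactions/pc.interactions', 'rocon_interactions/demo.interactions']:
    package, filename = find_resource_pair_from_string(ros_resource_name, rospack)
    # do something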
Parameters:
  • resource (str) – ros resource name (in the form package/filename)
  • rospack (rospkg.RosPack) – a caching utility to help accelerate catkin filesystem lookups
  • extension (str) – file name extension to look for/expect
Returns:

(package, full pathname) to the resource

Return type:

(str, str)

Raises:

rospkg.ResourceNotFound raised if the resource is not found or has an inappropriate extension.

rocon_python_utils.ros.resources.package_resource_name(name)[source]

Split a name into its package and resource name parts, e.g.

  • ‘std_msgs/String’ -> ‘std_msgs’, ‘String’
  • ‘gopher_gazebo/gocart.xacro’ -> ‘gopher_gazebo’, ‘gocart.xacro’
  • ‘gopher_gazebo/urdf/gocart.xacro’ -> ‘gopher_gazebo’, ‘urdf/gocart.xacro’

This emulates what the original roslib.names.package_resource_name() function did, but also caters for the third example above, where the resource is a full relative path inside the package, so as to disambiguate between multiple matches.

Parameters:name (str) – package resource name, e.g. ‘std_msgs/String’
Returns:package name, resource name
Return type:(str, str)
Raises:rospkg.ResourceNotFound if name is invalid (cannot be split into two parts)
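
A minimal sketch of the splitting rule, assuming the split happens at the first slash only so that any remaining path stays inside the resource part. split_package_resource is a hypothetical name, and ValueError stands in for rospkg.ResourceNotFound so the example runs without ROS installed.

```python
def split_package_resource(name):
    """Split 'package/resource' at the first slash only."""
    package, separator, resource = name.partition("/")
    if not separator or not package or not resource:
        # ValueError is a stand-in for rospkg.ResourceNotFound in this sketch.
        raise ValueError("cannot split %r into package and resource" % name)
    return package, resource


for example in ("std_msgs/String",
                "gopher_gazebo/gocart.xacro",
                "gopher_gazebo/urdf/gocart.xacro"):
    print(example, "->", split_package_resource(example))
```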

rocon_python_utils.ros.resources.resource_index_from_package_exports(export_tag, package_paths=None, package_whitelist=None, package_blacklist=[])[source]

Scans the package path looking for exports and grabs the ones we are interested in.

Parameters:
  • export_tag (str) – export tagname
  • package_paths (str[]) – list of package paths to scan over
  • package_whitelist – list of packages to include (and no other)
  • package_blacklist – list of packages to exclude
Returns:

the dictionary of resource and its absolute path

Return type:

dict { resource_name : os.path }
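
How the whitelist and blacklist arguments might combine can be sketched as a simple filter. This assumes that a whitelist of None means "no restriction" and that the blacklist is applied afterwards; filter_packages is a hypothetical helper, not part of the module.

```python
def filter_packages(package_names, whitelist=None, blacklist=None):
    """Keep names allowed by the whitelist (if any) and not excluded by the blacklist."""
    excluded = set(blacklist or [])
    allowed = None if whitelist is None else set(whitelist)
    return [
        name for name in package_names
        if (allowed is None or name in allowed) and name not in excluded
    ]


candidates = ["rocon_apps", "rocon_interactions", "turtlebot_rapps"]
only_rocon = filter_packages(candidates, whitelist=["rocon_apps", "rocon_interactions"])
without_apps = filter_packages(candidates, blacklist=["rocon_apps"])
print(only_rocon, without_apps)
```

The sketch defaults blacklist to None rather than a mutable list, which avoids sharing one list object between calls.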

rocon_python_utils.ros.rosdistro


rocon_python_utils.ros.rosdistro.get_rosdistro()[source]

Abstraction for pulling the rosdistro version (e.g. groovy/hydro). We are using the configured parameter here, but you could equally use one of the non-runtime techniques in rospkg elucidated on ros answers by Jack:

http://answers.ros.org/question/36485/how-do-i-test-the-ros-version-in-python-code/

Returns:the human readable string version for the ros distro
Return type:str

rocon_python_utils.system.command_line_wrappers

This module wraps a few command line executables with a convenient python api.


rocon_python_utils.system.command_line_wrappers.which(program)[source]

Wrapper around the command line ‘which’ program.

Returns:path to the program or None if it doesn’t exist.
Return type:str or None
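
For comparison, a PATH search can be written in pure Python. The sketch below is not how this module works (it wraps the command line program); which_sketch and its search_path parameter are hypothetical. On Python 3.3 and later, the standard library's shutil.which offers similar behaviour.

```python
import os
import stat
import tempfile


def which_sketch(program, search_path=None):
    """Return the first executable named program on search_path, else None."""
    if search_path is None:
        search_path = os.environ.get("PATH", "")
    for directory in search_path.split(os.pathsep):
        candidate = os.path.join(directory, program)
        if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
            return candidate
    return None


# Create a throwaway executable so the example does not depend on the host.
demo_bin = tempfile.mkdtemp()
demo_tool = os.path.join(demo_bin, "demo_tool")
with open(demo_tool, "w") as handle:
    handle.write("#!/bin/sh\n")
os.chmod(demo_tool, stat.S_IRWXU)

found = which_sketch("demo_tool", search_path=demo_bin)
missing = which_sketch("no_such_tool_for_this_sketch", search_path=demo_bin)
print(found, missing)
```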

rocon_python_utils.system.pid

This module includes a few helpers to enable working with system pids in python.


rocon_python_utils.system.pid.pid_exists(pid)[source]

Check whether pid exists in the current process table.

Parameters:pid (int) –
Returns:true or false depending on its existence.
Return type:bool
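
One common POSIX technique for this kind of check is to send signal 0, which performs error checking without delivering anything. The sketch below assumes a POSIX system and Python 3; pid_exists_sketch is a hypothetical name and may not match the module's actual implementation.

```python
import os


def pid_exists_sketch(pid):
    """Return True if a process with this pid is in the process table (POSIX only)."""
    if pid <= 0:
        # os.kill treats 0 and negative values as process groups, so they
        # are rejected as invalid pids in this sketch.
        return False
    try:
        os.kill(pid, 0)  # signal 0: existence and permission check only
    except ProcessLookupError:
        return False
    except PermissionError:
        # The process exists but belongs to another user.
        return True
    return True


alive = pid_exists_sketch(os.getpid())
print("current process exists: %s" % alive)
```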
rocon_python_utils.system.pid.wait_pid(pid, timeout=None)[source]

Wait for process with pid ‘pid’ to terminate and return its exit status code as an integer.

If pid is not a child of os.getpid() (the current process), this just waits until the process disappears and returns None.

If pid does not exist at all, it returns None immediately.

Parameters:
  • pid (int) –
  • timeout (float) – timeout in seconds
Raises:

TimeoutExpiredError on timeout expired (if specified).
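
The child-process case can be sketched by polling os.waitpid with a deadline. This is a simplified illustration for POSIX systems: wait_child and TimeoutExpiredSketch are hypothetical names, and unlike the function documented above, the sketch returns None straight away for a pid that is not a child instead of waiting for it to disappear.

```python
import os
import sys
import time


class TimeoutExpiredSketch(Exception):
    """Local stand-in for TimeoutExpiredError."""


def wait_child(pid, timeout=None, poll_interval=0.01):
    """Wait for a child process and return its exit code (POSIX only)."""
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        try:
            finished_pid, status = os.waitpid(pid, os.WNOHANG)
        except ChildProcessError:
            return None  # not a child of this process, or already reaped
        if finished_pid == pid:
            # Exit code for a normal exit; None if the child was killed by a signal.
            return os.WEXITSTATUS(status) if os.WIFEXITED(status) else None
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutExpiredSketch("pid %d still running after %ss" % (pid, timeout))
        time.sleep(poll_interval)


# Spawn a short-lived child that exits with status 3.
child_pid = os.spawnv(os.P_NOWAIT, sys.executable,
                      [sys.executable, "-c", "import sys; sys.exit(3)"])
exit_code = wait_child(child_pid, timeout=30)
print("child exited with %s" % exit_code)
```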

rocon_python_utils.system.popen

This module helps work around limitations of the official popen with another implementation that solves a few problems (post-exec callbacks, etc.).


class rocon_python_utils.system.popen.Popen(popen_args, shell=False, preexec_fn=None, postexec_fn=None, env=None)[source]

Bases: object

Use this if you want to attach a postexec function to popen (which is not supported by popen at all). It also defaults to setting up and terminating whole process groups (something we do quite often in ros).

Usage:

This is what you’d do starting a rosrunnable with a listener for termination.

def process_listener():
    print("subprocess terminating")

command_args = ['rosrun', package_name, rosrunnable_filename, '__name:=my_node']
process = rocon_python_utils.system.Popen(command_args, postexec_fn=process_listener)
__init__(popen_args, shell=False, preexec_fn=None, postexec_fn=None, env=None)[source]
Parameters:
  • popen_args (str[]) – list/tuple of usual popen args
  • shell (bool) – same as the shell argument passed to subprocess.Popen
  • preexec_fn (method with no args) – usual popen pre-exec function
  • postexec_fn (method with no args) – the callback which we support for postexec.
  • env (dict) – a customised environment to run the process in.
send_signal(sig)[source]

Send the process a posix signal. See man 7 signal for a list and pass them by keyword (e.g. signal.SIGINT) or directly by integer value.

Parameters:sig (int) – one of the posix signals.
terminate()[source]
Raises:OSError if the process has already shut down.
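
The postexec idea can be approximated with the standard library alone: start the process, then let a watcher thread wait for it and fire the callback. This is a sketch under that assumption, not the class's actual implementation; popen_with_postexec is a hypothetical helper, and start_new_session=True is used here to place the child in its own session and process group on POSIX.

```python
import subprocess
import sys
import threading


def popen_with_postexec(args, postexec_fn=None, **popen_kwargs):
    """Start a process and call postexec_fn (no arguments) once it has exited."""
    # On POSIX, start_new_session=True puts the child in a new session and
    # therefore in its own process group.
    process = subprocess.Popen(args, start_new_session=True, **popen_kwargs)

    def watch():
        process.wait()
        if postexec_fn is not None:
            postexec_fn()

    watcher = threading.Thread(target=watch, daemon=True)
    watcher.start()
    return process, watcher


finished = threading.Event()
process, watcher = popen_with_postexec([sys.executable, "-c", "pass"],
                                       postexec_fn=finished.set)
watcher.join(timeout=30)
print("callback fired: %s" % finished.is_set())
```

Running the callback from a separate thread means it must be thread-safe with respect to whatever state it touches.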

rocon_python_utils.yaml.configure

configure – configuration machinery on top of YAML

class rocon_python_utils.yaml.configure.Configuration(struct=None, pwd=None, parent=None)[source]

Bases: _abcoll.MutableMapping

Configuration object

You should never instantiate this object directly; use the from_file, from_string or from_dict classmethods instead. Implements the collections.MutableMapping protocol.

configure(struct=None, _root=True)[source]

Commit configuration

This method performs all actions pending on this Configuration object. You can also override the configuration at this point by providing a mapping object as the struct argument.

classmethod from_dict(cfg, pwd=None, configure=True)[source]

Construct Configuration from dict cfg.

Parameters:cfg – mapping object to use for config
classmethod from_file(filename, ctx=None, pwd=None, constructors=None, multi_constructors=None, configure=True)[source]

Construct Configuration object by reading and parsing file filename.

Parameters:
  • filename – filename to parse config from
  • ctx – mapping object used for value interpolation
  • constructors – mapping of names to constructor for custom objects in YAML. Look at _timedelta_constructor and _re_constructor for examples.
classmethod from_string(string, ctx=None, pwd=None, constructors=None, multi_constructors=None, configure=True)[source]

Construct Configuration from string.

Parameters:
  • string – string to parse config from
  • ctx – mapping object used for value interpolation
  • constructors – mapping of names to constructor for custom objects in YAML. Look at _timedelta_constructor and _re_constructor for examples.
merge(config)[source]

Produce new configuration by merging config object into this one
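
What "merging" could mean for nested mappings is sketched below with plain dicts. The recursive behaviour is an assumption (the source does not state how nested keys are handled), and merge_mappings is a hypothetical helper rather than the Configuration.merge method itself.

```python
def merge_mappings(base, override):
    """Return a new dict: override merged into base, recursing into nested dicts."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_mappings(merged[key], value)
        else:
            merged[key] = value
    return merged


defaults = {"logging": {"level": "INFO", "file": "app.log"}, "port": 8080}
overrides = {"logging": {"level": "DEBUG"}}

combined = merge_mappings(defaults, overrides)
print(combined)
```

Like the documented method, the sketch produces a new object and leaves both inputs unchanged.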

exception rocon_python_utils.yaml.configure.ConfigurationError[source]

Bases: exceptions.ValueError

Configuration error

__weakref__

list of weak references to the object (if defined)

rocon_python_utils.yaml.configure.configure_logging(logcfg=None, disable_existing_loggers=True)[source]

Configure logging in a sane way

Parameters:
  • logcfg – may be (a) a dict suitable for logging.config.dictConfig(), (b) the string “syslog” or (c) None
  • disable_existing_loggers – if we need to disable existing loggers
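
The dict and None cases can be sketched with the standard logging module. This is an assumption about the general shape only: configure_logging_sketch is a hypothetical name, the None branch falling back to logging.basicConfig is a guess, and the “syslog” case is left out.

```python
import logging
import logging.config


def configure_logging_sketch(logcfg=None, disable_existing_loggers=True):
    """Apply a dictConfig-style mapping, or fall back to basicConfig for None."""
    if logcfg is None:
        # Assumed fallback; the real function may behave differently.
        logging.basicConfig(level=logging.INFO)
        return
    config = dict(logcfg)
    config.setdefault("version", 1)  # dictConfig requires a version key
    config.setdefault("disable_existing_loggers", disable_existing_loggers)
    logging.config.dictConfig(config)


configure_logging_sketch(
    {"loggers": {"sketch.demo": {"level": "WARNING"}}},
    disable_existing_loggers=False,
)
demo_level = logging.getLogger("sketch.demo").level
print(logging.getLevelName(demo_level))
```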
rocon_python_utils.yaml.configure.import_string(import_name, silent=False)[source]

Imports an object based on a string. This is useful if you want to use import paths as endpoints or something similar. An import path can be specified either in dotted notation (xml.sax.saxutils.escape) or with a colon as object delimiter (xml.sax.saxutils:escape).

If silent is True the return value will be None if the import fails.

For better debugging we recommend the new import_module() function to be used instead.

Parameters:
  • import_name – the dotted name for the object to import.
  • silent – if set to True import errors are ignored and None is returned instead.
Returns:

imported object

Copyright:(c) 2011 by the Werkzeug Team
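
The two notations accepted by import_string() can be illustrated with a small importlib-based sketch. It is a simplified approximation rather than the Werkzeug-derived code: import_string_sketch is a hypothetical name, and it re-raises the original exception instead of wrapping it in ImportStringError.

```python
import importlib


def import_string_sketch(import_name, silent=False):
    """Import an object given 'pkg.module.attr' or 'pkg.module:attr'."""
    try:
        if ":" in import_name:
            module_name, object_name = import_name.split(":", 1)
        elif "." in import_name:
            module_name, object_name = import_name.rsplit(".", 1)
        else:
            # A bare name is treated as a top-level module.
            return importlib.import_module(import_name)
        module = importlib.import_module(module_name)
        return getattr(module, object_name)
    except (ImportError, AttributeError):
        if silent:
            return None
        raise


escape_dotted = import_string_sketch("xml.sax.saxutils.escape")
escape_colon = import_string_sketch("xml.sax.saxutils:escape")
nothing = import_string_sketch("no_such_module_for_sketch.thing", silent=True)
print(escape_dotted is escape_colon, nothing)
```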
exception rocon_python_utils.yaml.configure.ImportStringError(import_name, exception)[source]

Bases: exceptions.ImportError

Provides information about a failed import_string() attempt.

Copyright:(c) 2011 by the Werkzeug Team
__weakref__

list of weak references to the object (if defined)

exception = None

Wrapped exception.

import_name = None

String in dotted notation that failed to be imported.