Module API


This is the top-level namespace of the rocon_python_utils ROS package. This module provides a toolbox of general-purpose Python utilities.


This module is a library of general-purpose exceptions that fill needs not catered for by the Python exception hierarchy. These exception names are all included in the main rocon_python_utils namespace. To catch one, import it this way:

from rocon_python_utils import TimeoutExpiredError

exception rocon_python_utils.exceptions.TimeoutExpiredError

General purpose exception usable in any timeout situation.

class Pinger(ip, ping_frequency=0.2)

Bases: threading.Thread

The pinger class can run a threaded pinger at the desired frequency to check if a machine is available or not.


from import Pinger

pinger = Pinger('', 0.2)
# after some time
print("Statistics: %s" % pinger.get_latency())  # [min,avg,max,mean deviation]
print("Time since last seen %s" % pinger.get_time_since_last_seen())

__init__(ip, ping_frequency=0.2)

Initialises but doesn’t start the thread yet. Use run() to start pinging and collecting statistics.

Parameters:
  • ip (str) –
  • ping_frequency (float) – periodically ping at this frequency (Hz)

get_latency()

Latency statistics generated from the internal window (buffer). This is a ring buffer, so only the most recent ping data is used.

Returns: latency statistics (min, avg, max, mean deviation)
Return type: float[]
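
The ring-buffer behaviour described above can be sketched with a bounded deque. This is a minimal illustration, not the Pinger implementation: the LatencyWindow class, its method names and the window size are hypothetical, and the mean deviation is assumed here to be the mean absolute deviation from the average.

```python
from collections import deque


class LatencyWindow:
    """Fixed-size window of recent ping latencies (hypothetical helper)."""

    def __init__(self, size=5):
        # A deque with maxlen silently drops the oldest sample once full.
        self._samples = deque(maxlen=size)

    def record(self, latency_ms):
        self._samples.append(float(latency_ms))

    def statistics(self):
        """Return [min, avg, max, mean deviation] over the current window."""
        if not self._samples:
            return [0.0, 0.0, 0.0, 0.0]
        count = len(self._samples)
        avg = sum(self._samples) / count
        # Assumption: mean deviation = mean absolute deviation from avg.
        mean_deviation = sum(abs(s - avg) for s in self._samples) / count
        return [min(self._samples), avg, max(self._samples), mean_deviation]


window = LatencyWindow(size=3)
for sample in [10.0, 20.0, 30.0, 40.0]:
    window.record(sample)
# The oldest sample (10.0) has been pushed out of the three-slot window.
stats = window.statistics()
```

With a window of three, only the last three samples contribute to the statistics, which is why old outages stop influencing the reported latency after a while.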

get_time_since_last_seen()

Time in seconds since this ip was last pingable.

Returns: time (s) since last seen
Return type: float

run()

Thread worker function - this does the actual pinging and records ping data in the ring buffers for processing into latency statistics as needed.


This module makes use of the python-catkin-pkg module and adds a few extra functions for interacting with catkin packages.


Find all packages on the given list of paths.

Iterates over the given list of paths in reverse order so that packages found in the paths at the beginning of the list get overlaid onto packages with the same name which were found in paths farther back in the list.

The resulting dictionary is keyed by the package name (so packages with duplicate names are overlaid) and the values are catkin_pkg.package.Package objects.

Parameters: ros_package_path (str[]) – list of paths to search
Returns: dictionary of package objects keyed by name of the package
Return type: dict
Todo: overlay duplicates only if the version is newer
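
The reverse-order overlay can be sketched without touching the filesystem. This is only an illustration of the ordering rule: overlay_packages and packages_by_path are hypothetical stand-ins, and plain strings take the place of catkin_pkg.package.Package objects.

```python
def overlay_packages(paths, packages_by_path):
    """Merge per-path package listings so that earlier paths win.

    packages_by_path is a hypothetical stand-in for crawling each path:
    it maps a path to a {package_name: package_object} dictionary.
    """
    index = {}
    # Walk from the back of the list so entries from the front are
    # written last and therefore overlay same-named packages.
    for path in reversed(paths):
        index.update(packages_by_path.get(path, {}))
    return index


paths = ["/overlay/src", "/underlay/src"]
packages_by_path = {
    "/overlay/src": {"foo": "foo from overlay"},
    "/underlay/src": {"foo": "foo from underlay", "bar": "bar from underlay"},
}
index = overlay_packages(paths, packages_by_path)
```

Here foo resolves to the copy in the first path, while bar, which only exists farther back, is still found.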


This module contains converters and ros resource finders for icons and icon messages.


Loads the icon resource and converts it to rocon_std_msgs.Icon format.

Parameters: resource (str) – resource identifier (package/name)
Returns: the icon in msg format
Return type: rocon_std_msgs.Icon

Loads the specified filename and converts it to rocon_std_msgs.Icon format.

Parameters: filename (str) – path to the icon resource
Returns: the icon in msg format
Return type: rocon_std_msgs.Icon


This module contains helpers that look up or collect an index of ROS resource names for various purposes.

rocon_python_utils.ros.resources.find_resource(package, filename, rospack=None)

Convenience wrapper around roslib to find a resource (file) inside a package. It checks the output, and provides the appropriate error if there is one.

Parameters:
  • package (str) – ros package
  • filename (str) – some file inside the specified package

Returns: absolute path to the file
Raises: rospkg.ResourceNotFound if nothing is found or multiple objects are found.

rocon_python_utils.ros.resources.find_resource_from_string(resource, rospack=None, extension=None)

Convenience wrapper around roslib to find a resource (file) inside a package. This function passes off the work to find_resource once the input string is split.

Pass it a shared rospack (rospkg.RosPack) object to accelerate the crawling across the ROS_PACKAGE_PATH when you are calling this function for many resources consecutively.

import rospkg
from rocon_python_utils.ros.resources import find_resource_from_string

# share one RosPack instance so the ROS_PACKAGE_PATH is only crawled once
rospack = rospkg.RosPack()
for ros_resource_name in ['rocon_interactions/pc.interactions', 'rocon_interactions/demo.interactions']:
    filename = find_resource_from_string(ros_resource_name, rospack)
    # do something

Parameters:
  • resource (str) – ros resource name (in the form package/filename)
  • rospack (rospkg.RosPack) – a caching utility to help accelerate catkin filesystem lookups
  • extension (str) – file name extension to look for/expect

Returns: full pathname to the resource
Raises: rospkg.ResourceNotFound if the resource is not found or has an inappropriate extension.
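
The splitting step that happens before the work is handed to find_resource might look roughly like the sketch below. It is an assumption-laden illustration: split_resource_name is a hypothetical helper, ValueError stands in for rospkg.ResourceNotFound so the snippet runs without ROS, and the exact extension check performed by the real function is not documented here.

```python
import os


def split_resource_name(resource, extension=None):
    """Split 'package/filename' into its two parts (hypothetical helper)."""
    package, separator, filename = resource.partition("/")
    if not separator or not package or not filename:
        raise ValueError("expected 'package/filename', got %r" % resource)
    if extension is not None:
        # Assumption: the extension is compared without its leading dot.
        actual = os.path.splitext(filename)[1].lstrip(".")
        if actual != extension.lstrip("."):
            raise ValueError("%r does not end in %r" % (resource, extension))
    return package, filename


package, filename = split_resource_name(
    "rocon_interactions/pc.interactions", extension="interactions"
)

try:
    split_resource_name("rocon_interactions/pc.interactions", extension="yaml")
    rejected = False
except ValueError:
    rejected = True
```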

rocon_python_utils.ros.resources.resource_index_from_package_exports(export_tag, package_paths=None, package_whitelist=None, package_blacklist=[])

Scans the package path looking for exports and grabs the ones we are interested in.

Parameters:
  • export_tag (str) – export tagname
  • package_paths (str[]) – list of package paths to scan over
  • package_whitelist – list of packages to include (and no others)
  • package_blacklist – list of packages to exclude

Returns: a dictionary mapping each resource name to its absolute path
Return type: dict { resource_name : os.path }
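
How the whitelist and blacklist interact can be sketched as a simple filter over package names. The filter_packages helper is hypothetical, and it assumes that a whitelist of None means "no restriction"; the real function may treat an empty list differently.

```python
def filter_packages(package_names, whitelist=None, blacklist=None):
    """Apply include/exclude lists to package names (hypothetical helper)."""
    blacklist = blacklist or []
    selected = []
    for name in package_names:
        # Assumption: None means every package is a candidate.
        if whitelist is not None and name not in whitelist:
            continue
        if name in blacklist:
            continue
        selected.append(name)
    return selected


names = ["rocon_apps", "rocon_interactions", "turtlebot_apps"]
only_rocon = filter_packages(names, whitelist=["rocon_apps", "rocon_interactions"])
without_apps = filter_packages(names, blacklist=["rocon_apps"])
```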



Abstraction for pulling the rosdistro version (e.g. groovy/hydro). We are using the configured parameter here, but you could equally use one of the non-runtime techniques in rospkg described on ROS Answers by Jack.

Returns: the human readable string version for the ros distro
Return type: str


This module wraps a few command line executables with a convenient Python API.


Wrapper around the command line ‘which’ program.

Returns: path to the program, or None if it doesn’t exist.
Return type: str or None
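
For comparison, the Python 3 standard library offers the same path-or-None contract through shutil.which. The sketch below uses that function rather than this package's wrapper; find_program is a hypothetical name, and the program names are placeholders.

```python
import shutil


def find_program(name):
    """Return the path to an executable on PATH, or None if absent."""
    return shutil.which(name)


# A name that is very unlikely to exist on any system.
missing = find_program("surely-not-a-real-program-name-12345")
# May be a path or None, depending on the platform.
found = find_program("sh")
```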

This module includes a few helpers to enable working with system pids in Python.

Check whether pid exists in the current process table.

Parameters: pid (int) –
Returns: True or False depending on its existence.
Return type: bool
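
A common way to perform this check on POSIX systems is to send signal 0, which does the error checking without delivering anything. The sketch below assumes a POSIX platform and Python 3; is_pid_alive is a hypothetical name, not this module's function.

```python
import os


def is_pid_alive(pid):
    """Return True if pid is in the process table (POSIX-only sketch)."""
    if pid <= 0:
        # Zero and negative values address process groups, not one pid.
        return False
    try:
        os.kill(pid, 0)  # signal 0: existence and permission check only
    except ProcessLookupError:
        return False  # no such process
    except PermissionError:
        return True  # the process exists but belongs to another user
    return True


own_process_alive = is_pid_alive(os.getpid())
```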

Wait for process with pid ‘pid’ to terminate and return its exit status code as an integer.

If pid is not a child of os.getpid() (the current process), just wait until the process disappears and return None.

If pid does not exist at all, return None immediately.

Parameters:
  • pid (int) –
  • timeout (float) – timeout in seconds (default: None)

Raises: TimeoutExpiredError if the timeout (when specified) expires.
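
The non-child branch (poll until the process disappears, or give up after the timeout) can be sketched as follows. This is a POSIX-only illustration under stated assumptions: wait_until_gone and its poll_interval parameter are hypothetical, the TimeoutExpiredError class is a local stand-in for the package's exception, and the exit-status handling for child processes is left out.

```python
import os
import time


class TimeoutExpiredError(Exception):
    """Local stand-in for the package's exception of the same name."""


def _alive(pid):
    try:
        os.kill(pid, 0)  # signal 0 only checks for existence
    except ProcessLookupError:
        return False
    except PermissionError:
        return True
    return True


def wait_until_gone(pid, timeout=None, poll_interval=0.01):
    """Block until pid disappears, then return None; raise on timeout."""
    deadline = None if timeout is None else time.monotonic() + timeout
    while _alive(pid):
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutExpiredError("pid %d is still running" % pid)
        time.sleep(poll_interval)
    return None


# Waiting on our own pid can never succeed, so the timeout must fire.
try:
    wait_until_gone(os.getpid(), timeout=0.05)
    timed_out = False
except TimeoutExpiredError:
    timed_out = True
```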


This module works around limitations of the official popen with an alternative implementation that solves a few problems (post-exec callbacks, etc.).

class rocon_python_utils.system.popen.Popen(popen_args, shell=False, preexec_fn=None, postexec_fn=None, env=None)

Bases: object

Use this if you want to attach a postexec function to popen (which is not supported by popen at all). By default it also sets up and terminates whole process groups (something we do quite often in ros).


This is what you’d do when starting a rosrunnable with a listener for termination.

import rocon_python_utils.system

def process_listener():
    print("subprocess terminating")

# package_name and rosrunnable_filename are placeholders defined elsewhere
command_args = ['rosrun', package_name, rosrunnable_filename, '__name:=my_node']
process = rocon_python_utils.system.Popen(command_args, postexec_fn=process_listener)

__init__(popen_args, shell=False, preexec_fn=None, postexec_fn=None, env=None)

Parameters:
  • popen_args (str[]) – list/tuple of usual popen args
  • shell (bool) – same as the shell argument passed to subprocess.Popen
  • preexec_fn (method with no args) – usual popen pre-exec function
  • postexec_fn (method with no args) – the callback which we support for postexec.
  • env (dict) – a customised environment to run the process in.
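
One way a post-exec callback can be layered on top of subprocess.Popen is with a watcher thread that waits for the process and then calls the callback. The sketch below illustrates that idea only; it is not this class's implementation. The popen_with_postexec helper is hypothetical, and start_new_session (POSIX only) is used here as one possible way to give the child its own process group.

```python
import subprocess
import sys
import threading


def popen_with_postexec(args, postexec_fn=None):
    """Start a process and call postexec_fn from a thread once it exits."""
    # start_new_session puts the child in its own session and process group.
    process = subprocess.Popen(args, start_new_session=True)

    def watch():
        process.wait()
        if postexec_fn is not None:
            postexec_fn()

    watcher = threading.Thread(target=watch, daemon=True)
    watcher.start()
    return process, watcher


finished = threading.Event()
process, watcher = popen_with_postexec(
    [sys.executable, "-c", "pass"], postexec_fn=finished.set
)
watcher.join(timeout=30)
```

Note that the callback runs in the watcher thread, so anything it touches needs to be thread-safe; an Event is used here for that reason.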

Send the process a POSIX signal. See man 7 signal for a list and pass them by name (e.g. signal.SIGINT) or directly by integer value.

Parameters: sig (int) – one of the POSIX signals.
Raises: OSError if the process has already shut down.
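
Signal delivery can be tried out with the standard library's subprocess.Popen.send_signal, which is used below in place of this class's own method. The sketch assumes a POSIX platform; the sleeping child is only a placeholder workload.

```python
import signal
import subprocess
import sys

# Placeholder workload: a child that would otherwise sleep for a minute.
child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])

# Signals can be passed by name from the signal module or as an integer.
child.send_signal(signal.SIGTERM)
exit_code = child.wait(timeout=30)
```

On POSIX, a process terminated by a signal reports a negative return code equal to the signal number.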