Module API

rocon_python_utils

This is the top-level namespace of the rocon_python_utils ROS package. This module provides a toolbox of general purpose python utilities.

rocon_python_utils.exceptions

This module is a library of general purpose exceptions that fill general needs not catered for by the python exception heirarchy. These exception names are all included in the main rocon_python_utils namespace. To catch one, import it this way:

from rocon_python_comms import TimeoutExpiredError

exception rocon_python_utils.exceptions.TimeoutExpiredError[source]

General purpose exception usable in any timeout situation.

rocon_python_utils.network.pinger

class rocon_python_utils.network.pinger.Pinger(ip, ping_frequency=0.2)[source]

Bases: threading.Thread

The pinger class can run a threaded pinger at the desired frequency to check if a machine is available or not.

Usage:

from rocon_python_utils.network import Pinger

pinger = Pinger('192.168.1.3', 0.2)
pinger.start()
# after some time
print("Statistics: %s" % pinger.get_latency())  # [min,avg,max,mean deviation]
print("Time since last seen %s" % pinger.get_time_since_last_seen())
__init__(ip, ping_frequency=0.2)[source]

Initialises but doesn’t start the thread yet. Use run() to start pinging and collecting statistics.

Parameters:
  • ip (str) –
  • ping_frequency (float) – periodically ping at this frequency (Hz)
get_latency()[source]

Latency statistics generated from the internal window (buffer). This is a ring buffer, so it will only use the most recent ping data.

Returns:latency statistics (min, avg, max, mean deviation)
Return type:float[]
get_time_since_last_seen()[source]

Time in seconds since this ip was last pingable.

Returns:time (s) since seen
Return type:float
run()[source]

Thread worker function - this does the actual pinging and records ping data in the ring buffers for processing into latency statistics as needed.

rocon_python_utils.ros.catkin

This module makes use of the python-catkin-pkg module and adds a few extra functions for interacting with catkin packages.


rocon_python_utils.ros.catkin.package_index_from_package_path(package_paths)[source]

Find all packages on the given list of paths

Iterates over the given list of paths in reverse order so that packages found in the paths at the beginning of the list get overlaid onto packages with the same name which were found in paths farther back in the list.

The resulting dictionary is keyed by the package name (so packages with duplicate names are overlaid) and the values are the catkin_pkg.package.Package class.

Parameters:ros_package_path (str[]) – list of paths to search
Returns:dictionary of package objects keyed by name of the package
Return type:dict
Todo:overlay duplicates only if the version is newer

rocon_python_utils.ros.icons

This module contains converters and ros resource finders for icons and icon messages.


rocon_python_utils.ros.icons.icon_resource_to_msg(resource)[source]

Loads the icon resource and puts in rocon_std_msgs.Icon format

Parameters:resource (str) – resource identifier (package/name)
Returns:the icon in msg format
Return type:rocon_std_msgs.Icon
rocon_python_utils.ros.icons.icon_to_msg(filename)[source]

Loads the specified filename and puts in rocon_std_msgs.Icon format

Parameters:filename (str) – to the icon resource.
Returns:the icon in msg format
Return type:rocon_std_msgs.Icon

rocon_python_utils.ros.resources

This module contains helpers that lookup or collect an index of ros resource names for various purposes.


rocon_python_utils.ros.resources.find_resource(package, filename, rospack=None)[source]

Convenience wrapper around roslib to find a resource (file) inside a package. It checks the output, and provides the appropriate error if there is one.

Parameters:
  • package (str) – ros package
  • filename (str) – some file inside the specified package
Returns:

absolute path to the file

Return type:

str

Raises:

rospkg.ResourceNotFound : raised if there is nothing found or multiple objects found.

rocon_python_utils.ros.resources.find_resource_from_string(resource, rospack=None, extension=None)[source]

Convenience wrapper around roslib to find a resource (file) inside a package. This function passes off the work to find_resource once the input string is split.

Pass it a shared rospack (rospkg.RosPack) object to accelerate the crawling across the ROS_PACKAGE_PATH when you are calling this function for many resources consecutively.

rospack = rospkg.RosPack()
for ros_resource_name in ['rocon_interactions/pc.interactions', 'rocon_interactions/demo.interactions']
    filename = find_resource_from_string(ros_resource_name, rospack)
    # do something
Parameters:
  • resource (str) – ros resource name (in the form package/filename)
  • rospack (rospkg.RosPack) – a caching utility to help accelerate catkin filesystem lookups
  • extension (str) – file name extension to look for/expect
Returns:

full pathname to the resource

Return type:

str

Raises:

rospkg.ResourceNotFound raised if the resource is not found or has an inappropriate extension.

rocon_python_utils.ros.resources.resource_index_from_package_exports(export_tag, package_paths=None, package_whitelist=None, package_blacklist=[])[source]

Scans the package path looking for exports and grab the ones we are interested in.

Parameters:
  • export_tag (str) – export tagname
  • package_paths (str[]) – list of package paths to scan over
  • package_whitelist – list of packages to include (and no other)
  • package_blacklist – list of packages to exclude
Returns:

the dictionary of resource and its absolute path

Return type:

dict { resource_name : os.path }

rocon_python_utils.ros.rosdistro


rocon_python_utils.ros.rosdistro.get_rosdistro()[source]

Abstraction for pulling the rosdistro version (e.g. groovy/hydro). We are using the configured parameter here, but you could equally use one of the non-runtime techniques in rospkg elucidated on ros answers by Jack:

http://answers.ros.org/question/36485/how-do-i-test-the-ros-version-in-python-code/

Returns:the human readable string version for the ros distro
Return type:str

rocon_python_utils.system.command_line_wrappers

This module wraps a few command line executables with a convenient python api.


rocon_python_utils.system.command_line_wrappers.which(program)[source]

Wrapper around the command line ‘which’ program.

Returns:path to the program or None if it doesnt exist.
Return type:str or None

rocon_python_utils.system.pid

This module includes a few helpers to enable working with system pids in python.


rocon_python_utils.system.pid.pid_exists(pid)[source]

Check whether pid exists in the current process table.

Parameters:pid (int) –
Returns:true or false depending on its existence.
Return type:bool
rocon_python_utils.system.pid.wait_pid(pid, timeout=None)[source]

Wait for process with pid ‘pid’ to terminate and return its exit status code as an integer.

If pid is not a children of os.getpid() (current process) just waits until the process disappears and return None.

If pid does not exist at all return None immediately.

Parameters:
  • pid (int) –
  • timeout (float) – timeout in seconds
Raises:

TimeoutExpiredError on timeout expired (if specified).

rocon_python_utils.system.popen

This module helps workaround the official popen with another implementation that solves a few problems (post-exec callbacks, etc)


class rocon_python_utils.system.popen.Popen(popen_args, shell=False, preexec_fn=None, postexec_fn=None, env=None)[source]

Bases: object

Use this if you want to attach a postexec function to popen (which is not supported by popen at all). It also defaults setting and terminating whole process groups (something we do quite often in ros).

Usage:

This is what you’d do starting a rosrunnable with a listener for termination.

def process_listener():
    print("subprocess terminating")

command_args = ['rosrun', package_name, rosrunnable_filename, '__name:=my_node']
process = rocon_python_utils.system.Popen(command_args, postexec_fn=process_listener)
__init__(popen_args, shell=False, preexec_fn=None, postexec_fn=None, env=None)[source]
Parameters:
  • popen_args (str[]) – list/tuple of usual popen args
  • shell (bool) – same as the shell argument passed to subprocess.Popen
  • preexec_fn (method with no args) – usual popen pre-exec function
  • postexec_fn (method with no args) – the callback which we support for postexec.
  • env (dict) – a customised environment to run the process in.
send_signal(sig)[source]

Send the process a posix signal. See man 7 signal for a list and pass them by keyword (e.g. signal.SIGINT) or directly by integer value.

Parameters:sig (int) – one of the posix signals.
terminate()[source]
Raises:OSError if the process has already shut down.