Source code for rocon_launch.utils

#!/usr/bin/env python
#
# License: BSD
#   https://raw.github.com/robotics-in-concert/rocon_tools/license/LICENSE
#
##############################################################################
# Description
##############################################################################

"""
.. module:: utils
   :platform: Unix
   :synopsis: Utilities supporting rocon_launch.


This module provides some supporting utilities for rocon launches.

----

"""
##############################################################################
# Imports
##############################################################################

import rocon_console.console as console
import roslaunch
import subprocess
import sys
import xml.etree.ElementTree as ElementTree

from .roslaunch_configuration import RosLaunchConfiguration

##############################################################################
# Public Methods
##############################################################################


[docs]def parse_rocon_launcher(rocon_launcher, default_roslaunch_options, args_mappings={}): ''' Parses an rocon multi-launcher (xml file). :param str rocon_launcher: xml string in rocon_launch format :param default_roslaunch_options: options to pass to roslaunch (usually "--screen") :param dict args_mappings: command line mapping overrides, { arg_name : arg_value } :returns: list of launch configurations :rtype: :class:`.RosLaunchConfiguration`[] :raises :exc:`.InvalidRoconLauncher` : if any roslaunch configuration failed ''' tree = ElementTree.parse(rocon_launcher) root = tree.getroot() # should check for root concert tag launchers = [] ports = [] # These are intended for re-use in launcher args via $(arg ...) like regular roslaunch vars_dict = {} # We do this the roslaunch way since we use their resolvers, even if we only do it for args. vars_dict['arg'] = {} args_dict = vars_dict['arg'] # convenience ref to the vars_dict['args'] variable for tag in root.findall('arg'): name, value = _process_arg_tag(tag, args_dict) args_dict[name] = value args_dict.update(args_mappings) # bring in command line overrides for launch in root.findall('launch'): port = launch.get('port', RosLaunchConfiguration.default_port) args = [] for tag in launch.findall('arg'): name, value = _process_arg_tag(tag, vars_dict) args.append((name, value)) launch_configuration = RosLaunchConfiguration( name=launch.get('name'), package=launch.get('package'), port=port, title=launch.get('title', 'rocon_launch:%s' % str(port)), args=args, options=default_roslaunch_options ) if port in ports: launch_configuration.append_option("--wait") else: ports.append(port) launchers.append(launch_configuration) return launchers
[docs]def get_roslaunch_pids(parent_pid): ''' Search the pstree of the specified pid for roslaunch processes. We use this to aid in gracefully terminating any roslaunch processes running in terminals before closing down the terminals themselves. :param str parent_pid: the pid of the parent process. :returns: list of pids :rtype: str[] ''' if parent_pid is None: console.warning("aborting call to find child roslaunches of a non-existant parent pid (can happen if cancelling spawned processes while they are still establishing).") return [] ps_command = subprocess.Popen("ps -o pid -o comm --ppid %d --noheaders" % parent_pid, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) ps_output = ps_command.stdout.read() retcode = ps_command.wait() pids = [] if retcode == 0: for pair in ps_output.split("\n")[:-1]: try: [pid, command] = pair.lstrip(' ').split(" ") except ValueError: # when we can't unpack the output into two pieces # ignore, it's not a roslaunch console.warning("Rocon Launch : bad pair while scanning for roslaunch pids [%s]" % pair) continue if command == 'roslaunch': pids.append(int(pid)) else: pids.extend(get_roslaunch_pids(int(pid))) else: # Presume this roslaunch was killed by ctrl-c or terminated already. # Am not worrying about classifying between the above presumption and real errors for now pass return pids
############################################################################## # Internal Methods ############################################################################## def _process_arg_tag(tag, args_dict=None): ''' Process the arg tag. Kind of hate replicating what roslaunch does with arg tags, but there's no easy way to pull roslaunch code. :param args_dict: dictionary of args previously discovered :returns: name, value pairs for the args :rtype: (str, str) :todo: get rid of the sys.exits and replace with exceptions ''' name = tag.get('name') # returns None if not found. if name is None: console.error("<arg> tag must have a name attribute.") sys.exit(1) value = tag.get('value') default = tag.get('default') #print("Arg tag processing: (%s, %s, %s)" % (name, value, default)) if value is not None and default is not None: console.error("<arg> tag must have one and only one of value/default attributes specified.") sys.exit(1) if value is None and default is None: console.error("<arg> tag must have one of value/default attributes specified.") sys.exit(1) if value is None: value = default if value and '$' in value: value = roslaunch.substitution_args.resolve_args(value, args_dict) return (name, value)