Source code for rocon_launch.launch

#!/usr/bin/env python
#
# License: BSD
#   https://raw.github.com/robotics-in-concert/rocon_tools/license/LICENSE
#
##############################################################################
# Description
##############################################################################

"""
.. module:: launch
   :platform: Unix
   :synopsis: Machinery for spawning multiple roslaunchers.


This module contains the machinery for spawning and managing multiple terminals
that execute a pre-configured roslaunch inside each.

----

"""
##############################################################################
# Imports
##############################################################################

import os
import argparse
import signal
import sys
import roslaunch
import rocon_python_comms
import rosgraph
import rocon_console.console as console

from .exceptions import UnsupportedTerminal
from . import terminals
from . import utils

##############################################################################
# Methods
##############################################################################


[docs]def parse_arguments(): """ Argument parser for the rocon_launch script. """ parser = argparse.ArgumentParser(description="Rocon's multi-roslauncher.") terminal_group = parser.add_mutually_exclusive_group() terminal_group.add_argument('-k', '--konsole', default=False, action='store_true', help='spawn individual ros systems via multiple konsole terminals') terminal_group.add_argument('-g', '--gnome', default=False, action='store_true', help='spawn individual ros systems via multiple gnome terminals') parser.add_argument('--screen', action='store_true', help='run each roslaunch with the --screen option') parser.add_argument('--no-terminals', action='store_true', help='do not spawn terminals for each roslaunch') parser.add_argument('--hold', action='store_true', help='hold terminals open after upon completion (incompatible with --no-terminals)') # Force package, launcher pairs, I like this better than roslaunch style which is a bit vague parser.add_argument('package', nargs='?', default='', help='name of the package in which to find the concert launcher') parser.add_argument('launcher', nargs=1, help='name of the concert launch configuration (xml) file') #parser.add_argument('launchers', nargs='+', help='package and concert launch configuration (xml) file configurations, roslaunch style') mappings = rosgraph.names.load_mappings(sys.argv) # gets the arg mappings, e.g. scheduler_type:=simple argv = rosgraph.myargv(sys.argv[1:]) # strips the mappings args = parser.parse_args(argv) if args.no_terminals: args.terminal_name = terminals.active elif args.konsole: args.terminal_name = terminals.konsole elif args.gnome: args.terminal_name = terminals.gnome_terminal else: args.terminal_name = None return (args, mappings)
[docs]class RoconLaunch(object): """ The rocon launcher. Responsible for all the pieces of the puzzle that go into spawning roslaunches inside terminals (startup, shutdown and signal handling). """ __slots__ = [ 'terminal', 'processes', 'temporary_files', # store temporary files that need to unlinked on shutdown 'hold' # keep terminals open when sighandling them ]
[docs] def __init__(self, terminal_name, hold=False): """ Initialise empty of processes, but configure a terminal so we're ready to go when we start spawning. :param str terminal_name: string name (or None) for the terminal to use. :param bool hold: whether or not to hold windows open or not. """ self.processes = [] """List of spawned :class:`subprocess.Popen` terminal processes.""" self.temporary_files = [] """List of temporary files used to construct launches that we must unlink before finishing.""" self.hold = hold try: self.terminal = terminals.create_terminal(terminal_name) except (UnsupportedTerminal, rocon_python_comms.NotFoundException) as e: console.error("Cannot find a suitable terminal [%s]" % str(e)) sys.exit(1)
[docs] def signal_handler(self, sig, frame): ''' Special handler that gets triggered if someone hits CTRL-C in the original terminal that executed 'rocon_launch'. We catch the interrupt here, search and eliminate all child roslaunch processes first (give them time to gracefully quit) and then finally close the terminals themselves. closing down the terminals themselves. :param str sig: signal id (usually looking for SIGINT - 2 here) :param frame: frame ''' self.terminal.shutdown_roslaunch_windows(self.processes, self.hold) # Have to unlink them here rather than in the actual spawning stage, # because terminal subprocesses take a while to kick in (in the background) # and the unlinking may occur before it actually runs the roslaunch that # needs the file. for f in self.temporary_files: os.unlink(f.name)
[docs] def spawn_roslaunch_window(self, launch_configuration): """ :param launch_configuration: :type launch_configuration :class:`.RosLaunchConfiguration` """ process, meta_roslauncher = self.terminal.spawn_roslaunch_window(launch_configuration) self.processes.append(process) self.temporary_files.append(meta_roslauncher)
[docs]def main(): """ Main function for the rocon_launch script. """ (args, mappings) = parse_arguments() rocon_launch = RoconLaunch(args.terminal_name, args.hold) signal.signal(signal.SIGINT, rocon_launch.signal_handler) if args.package == '': rocon_launcher = roslaunch.rlutil.resolve_launch_arguments(args.launcher)[0] else: rocon_launcher = roslaunch.rlutil.resolve_launch_arguments([args.package] + args.launcher)[0] if args.screen: roslaunch_options = "--screen" else: roslaunch_options = "" launchers = utils.parse_rocon_launcher(rocon_launcher, roslaunch_options, mappings) for launcher in launchers: console.pretty_println("Launching %s on port %s" % (launcher.path, launcher.port), console.bold) rocon_launch.spawn_roslaunch_window(launcher) signal.pause()