# Source code for rocon_launch.terminals

#!/usr/bin/env python
#
# License: BSD
#   https://raw.github.com/robotics-in-concert/rocon_tools/license/LICENSE
#
##############################################################################
# Description
##############################################################################

"""
.. module:: terminals
   :platform: Unix
   :synopsis: Terminal specific support.


This module provides a list of supported terminals and methods to handle them.

----

"""
##############################################################################
# Imports
##############################################################################

import os
import rocon_console.console as console
import rocon_python_comms
import rocon_python_utils
import signal
import sys
import subprocess
import tempfile
import time
from urlparse import urlparse

from .exceptions import UnsupportedTerminal
from . import utils

##############################################################################
# Supported Terminals
##############################################################################

# String identifiers for the terminals handled by this module. Each constant
# below is followed by a bare string literal that documents it.
active = "active"  # the currently open terminal
"""String identifier for the currently open (active) terminal"""
konsole = "konsole"
"""String identifier for KDE's konsole terminal"""
gnome_terminal = "gnome-terminal"
"""String identifier for Gnome's terminal."""
gnome_terminal_wrapper = "gnome-terminal.wrapper"  # some systems use this for gnome-terminal
"""String identifier for an oft used representation of gnome's terminal on desktops like KDE."""

##############################################################################
# Terminal
##############################################################################


class Terminal(object):
    """
    Base class for terminal managers.

    It is not meant to be instantiated directly: children must provide a
    ``prepare_command(roslaunch_configuration, meta_roslauncher_filename)``
    method returning the command (argument list) that starts roslaunch
    inside their kind of terminal.
    """
    __slots__ = ['name']

    def __init__(self, name):
        """
        Creates a manager for the terminal with supporting methods and variables.

        :param str name: name of this terminal.
        """
        self.name = name

    def shutdown_roslaunch_windows(self, processes, hold):
        """
        Shuts down roslaunch windows cleanly, i.e. it first kills the roslaunch
        processes, then kills the terminals themselves.

        :param processes: the terminal subprocesses (only ``.pid`` is read)
        :param bool hold: if true, wait for the user to press <Enter> before
            the terminals are killed
        """
        roslaunch_pids = []
        for process in processes:
            roslaunch_pids.extend(utils.get_roslaunch_pids(process.pid))
        # signal every roslaunch first, then wait on them one by one
        for pid in roslaunch_pids:
            try:
                os.kill(pid, signal.SIGHUP)
            except OSError:
                pass  # the signal could not be delivered, nothing more to do for this pid
        for pid in roslaunch_pids:
            console.pretty_println("Terminating roslaunch [pid: %d]" % pid, console.bold)
            rocon_python_utils.system.wait_pid(pid)
        # NOTE(review): presumably a grace period before the terminals are touched - confirm
        time.sleep(1)
        if hold:
            try:
                raw_input("Press <Enter> to close terminals...")
            except (RuntimeError, KeyboardInterrupt, EOFError):
                # ctrl-c again instead of enter, or stdin is closed - either
                # way fall through so the terminals still get cleaned up
                pass
        # now kill the terminals themselves (signal goes to the whole process group)
        for process in processes:
            try:
                os.killpg(process.pid, signal.SIGTERM)
            except OSError:
                console.warning("Kill signal failed to reach the terminal - typically this means the terminal has already shut down.")
            except TypeError as e:
                console.error("Invalid pid value [%s][%s]" % (str(process.pid), str(e)))

    def _prepare_meta_roslauncher(self, roslaunch_configuration):
        """
        Generate a meta roslauncher which calls our real roslaunch provided in the
        launch configuration. This applies the more esoteric options specified
        in the launch configuration, e.g. screen, args.

        The file is created with ``delete=False`` and is closed before it is
        returned, so removing it (via its ``name``) is left to the caller.

        :param roslaunch_configuration: required roslaunch info
        :type roslaunch_configuration: :class:`.RosLaunchConfiguration`
        :returns: handle to the (closed) temporary meta roslaunch file
        :rtype: :class:`tempfile.NamedTemporaryFile`
        """
        namespace = roslaunch_configuration.namespace
        screen_value = "true" if roslaunch_configuration.screen() else "false"
        # NOTE(review): values are interpolated into the xml as-is (no escaping)
        lines = ['<launch>\n',
                 '  <param name="rocon/screen" value="%s"/>\n' % screen_value]
        if namespace:
            lines.append('  <group ns="%s">\n' % namespace)
        lines.append('  <include file="%s">\n' % roslaunch_configuration.path)
        for (arg_name, arg_value) in roslaunch_configuration.args:
            lines.append('    <arg name="%s" value="%s"/>\n' % (arg_name, arg_value))
        lines.append('  </include>\n')
        if namespace:
            lines.append('  </group>\n')
        lines.append('</launch>\n')
        ros_launch_file = tempfile.NamedTemporaryFile(mode='w+t', delete=False)
        try:
            ros_launch_file.write("".join(lines))
        finally:
            ros_launch_file.close()  # unlink it later
        return ros_launch_file

    def _prepare_environment(self, roslaunch_configuration, env=None):
        """
        Build the environment the spawned roslaunch will run in.

        Starts from a copy of ``os.environ``, overlays ``env``, points the
        port of ``ROS_MASTER_URI`` (when that variable is set and carries an
        explicit port) at ``roslaunch_configuration.port`` and drops
        ``ROS_NAMESPACE``.

        :param roslaunch_configuration: required roslaunch info (``port`` is read)
        :type roslaunch_configuration: :class:`.RosLaunchConfiguration`
        :param dict env: additional environment variables, {key : value}
        :returns: the environment for the subprocess
        :rtype: dict
        """
        roslaunch_env = os.environ.copy()
        if env:
            roslaunch_env.update(env)
        master_uri = roslaunch_env.get('ROS_MASTER_URI')
        if master_uri is not None:
            parsed = urlparse(master_uri)
            if parsed.port is not None:
                # swap only the port part of the network location, a plain
                # string replace would also hit the same digits in a hostname
                host = parsed.netloc.rpartition(':')[0]
                netloc = "%s:%s" % (host, roslaunch_configuration.port)
                roslaunch_env['ROS_MASTER_URI'] = parsed._replace(netloc=netloc).geturl()
        # ROS_NAMESPACE is typically set since we often call this from inside a node
        # itself. Got to get rid of this otherwise it pushes things down. This
        # must happen whether or not ROS_MASTER_URI was set.
        roslaunch_env.pop('ROS_NAMESPACE', None)
        return roslaunch_env

    def spawn_roslaunch_window(self, roslaunch_configuration, postexec_fn=None, env=None):
        """
        Start roslaunch for the given configuration in this kind of terminal.

        Exits the program (``sys.exit(1)``) if called on :class:`Terminal`
        itself or on a child that provides no ``prepare_command`` method.

        :param roslaunch_configuration: required roslaunch info
        :type roslaunch_configuration: :class:`.RosLaunchConfiguration`
        :param func postexec_fn: run this after the subprocess finishes
        :param dict env: an additional customised environment to run ros launcher, {key : value}
        :returns: the subprocess and temp roslaunch file handles
        :rtype: (:class:`subprocess.Popen`, :class:`tempfile.NamedTemporaryFile`)
        """
        if self.__class__ is Terminal:
            console.logerror("Do not use 'Terminal' directly, it is an abstract base class")
            sys.exit(1)
        # look the method up through normal attribute resolution so that
        # children of children (which inherit prepare_command) are accepted too
        if not callable(getattr(self, 'prepare_command', None)):
            console.logerror("The method prepare_command must be implemented in children of rocon_launch.terminals.Terminal")
            sys.exit(1)
        meta_roslauncher = self._prepare_meta_roslauncher(roslaunch_configuration)
        cmd = self.prepare_command(roslaunch_configuration, meta_roslauncher.name)  # must be implemented in children
        roslaunch_env = self._prepare_environment(roslaunch_configuration, env)
        return (rocon_python_utils.system.Popen(cmd, postexec_fn=postexec_fn, env=roslaunch_env), meta_roslauncher)

##############################################################################
# Active
##############################################################################


class Active(Terminal):
    """
    A pseudo representation of the currently open terminal.
    """
    def __init__(self):
        """Set this manager up under the ``active`` terminal identifier."""
        super(Active, self).__init__(active)

    def prepare_command(self, roslaunch_configuration, meta_roslauncher_filename):
        """
        Prepare the custom command for a roslaunch window.

        The command is run directly (no shell in between), so the options
        string is split into separate arguments here; the other terminals
        hand the same string to a shell, which does that splitting for them.

        :param roslaunch_configuration: required roslaunch info
        :type roslaunch_configuration: :class:`.RosLaunchConfiguration`
        :param str meta_roslauncher_filename: temporary roslauncher file
        :returns: the command as an argument list
        :rtype: list of str
        """
        import shlex  # local import, only needed by this method

        cmd = ["roslaunch"]
        if roslaunch_configuration.options:
            cmd.extend(shlex.split(roslaunch_configuration.options))
        cmd.append(meta_roslauncher_filename)
        return cmd

##############################################################################
# Konsole
##############################################################################


class Konsole(Terminal):
    """
    Responsible for handling of kde konsole terminals.
    """
    def __init__(self):
        """Set this manager up under the ``konsole`` terminal identifier."""
        super(Konsole, self).__init__(konsole)

    def prepare_command(self, roslaunch_configuration, meta_roslauncher_filename):
        """
        Prepare the custom command for a roslaunch window.

        :param roslaunch_configuration: required roslaunch info
        :type roslaunch_configuration: :class:`.RosLaunchConfiguration`
        :param str meta_roslauncher_filename: temporary roslauncher file
        :returns: the konsole command line as an argument list
        :rtype: list of str
        """
        tab_title = 'tabtitle=%s' % roslaunch_configuration.title
        # the roslaunch invocation is handed to bash as a single -c string
        roslaunch_invocation = "roslaunch %s --disable-title %s" % (
            roslaunch_configuration.options,
            meta_roslauncher_filename,
        )
        return [
            self.name,
            '-p', tab_title,
            '--nofork',
            '--hold',
            '-e', "/bin/bash", "-c", roslaunch_invocation,
        ]

##############################################################################
# Gnome Terminal
##############################################################################


class GnomeTerminal(Terminal):
    """
    Responsible for handling of gnome-terminal terminals.
    """
    def __init__(self):
        """Set this manager up under the ``gnome-terminal`` identifier."""
        super(GnomeTerminal, self).__init__(gnome_terminal)

    def prepare_command(self, roslaunch_configuration, meta_roslauncher_filename):
        """
        Prepare the custom command for a roslaunch window.

        :param roslaunch_configuration: required roslaunch info
        :type roslaunch_configuration: :class:`.RosLaunchConfiguration`
        :param str meta_roslauncher_filename: temporary roslauncher file
        :returns: the gnome-terminal command line as an argument list
        :rtype: list of str
        """
        title_option = '--title=%s' % roslaunch_configuration.title
        # roslaunch runs inside bash, followed by a second bash once it returns
        shell_line = "/bin/bash -c 'roslaunch %s --disable-title %s';/bin/bash" % (
            roslaunch_configuration.options,
            meta_roslauncher_filename,
        )
        return [self.name, title_option, '--disable-factory', "-e", shell_line]

##############################################################################
# Factory
##############################################################################

# Maps each terminal identifier to the class that manages it; both
# gnome-terminal identifiers share the same manager.
supported_terminals = {
    active: Active,
    konsole: Konsole,
    gnome_terminal: GnomeTerminal,
    gnome_terminal_wrapper: GnomeTerminal,
}


def create_terminal(name=None):
    """
    Creates a manager for the terminal with supporting methods and variables.

    If name is None, it will try to auto-detect the user's terminal. We're currently
    using ubuntu's x-terminal-emulator to choose the shell. If the detected
    terminal is not a supported one, the first of konsole / gnome-terminal
    that is found on the system is used instead.

    :param str name: name of the terminal manager to create (None to auto-detect).
    :returns: one of the supported terminal classes
    :rtype: one of the children of :class:`.Terminal`

    :raises :exc:`.UnsupportedTerminal` if the name is not in the supported terminals list.
    :raises :exc:`rocon_python_comms.NotFoundException` if the specified/auto-detected terminal is not found on the system.
    """
    if name is not None and name not in supported_terminals:
        raise UnsupportedTerminal("%s is not a supported terminal type [%s]" % (name, list(supported_terminals.keys())))
    which = rocon_python_utils.system.which
    if name == konsole:
        if not which('konsole'):
            msg = "cannot find 'konsole' (hint: try --gnome for gnome-terminal instead)"
            raise rocon_python_comms.NotFoundException(msg)
    elif name == gnome_terminal or name == gnome_terminal_wrapper:
        # both identifiers are served by GnomeTerminal, which runs 'gnome-terminal'
        if not which(gnome_terminal):
            msg = "cannot find 'gnome-terminal' (hint: try --konsole for konsole instead)"
            raise rocon_python_comms.NotFoundException(msg)
    # elif name is active:  # nothing to do
    elif name is None:
        # auto-detect
        if not which('x-terminal-emulator'):
            msg = "tried to auto-detect, but cannot find 'x-terminal-emulator' (hint: try --gnome or --konsole instead)"
            raise rocon_python_comms.NotFoundException(msg)
        p = subprocess.Popen([which('update-alternatives'), '--query', 'x-terminal-emulator'],
                             stdout=subprocess.PIPE,
                             universal_newlines=True)  # text output on python 2 and 3
        output, _unused_stderr = p.communicate()  # also reaps the child process
        auto_detected_name = None  # stays None if there is no usable 'Value:' line
        for line in output.splitlines():
            if line.startswith("Value:"):
                fields = line.split()
                if len(fields) > 1:
                    auto_detected_name = os.path.basename(fields[1])
                break
        if auto_detected_name not in supported_terminals:
            shown_name = auto_detected_name if auto_detected_name is not None else "<unknown>"
            msg = "you are %s, an esoteric and unsupported terminal" % (shown_name)
            console.warning(msg.capitalize())
            fallbacks = [konsole, gnome_terminal]  # in order of preference
            for terminal_name in fallbacks:
                if which(terminal_name):
                    name = terminal_name
                    console.warning(" --> falling back to '%s'" % terminal_name)
                    break  # use the first one found, do not announce several
            if name is None:
                raise UnsupportedTerminal(msg + " (hint: try --gnome or --konsole instead)[%s]" % list(supported_terminals.keys()))
        else:
            name = auto_detected_name
    return supported_terminals[name]()