Source code for capabilities.launch_manager

# Software License Agreement (BSD License)
#
# Copyright (c) 2013, Open Source Robotics Foundation, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
#  * Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#  * Redistributions in binary form must reproduce the above
#    copyright notice, this list of conditions and the following
#    disclaimer in the documentation and/or other materials provided
#    with the distribution.
#  * Neither the name of Open Source Robotics Foundation, Inc. nor
#    the names of its contributors may be used to endorse or promote
#    products derived from this software without specific prior
#    written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

# Author: William Woodall <william@osrfoundation.org>

"""This module manages the launching of capabilities"""

from __future__ import print_function

import copy
import os
import subprocess
import sys
import threading

import rospy

from capabilities.msg import CapabilityEvent


[docs]def which(program): """Custom versions of the ``which`` built-in shell command Searches the pathes in the ``PATH`` environment variable for a given executable name. It returns the full path to the first instance of the executable found or None if it was not found. :param program: name of the executable to find :type program: str :returns: Full path to the first instance of the executable, or None :rtype: str or None """ def is_exe(fpath): return os.path.isfile(fpath) and os.access(fpath, os.X_OK) fpath, fname = os.path.split(program) if fpath: if is_exe(program): return program else: for path in os.environ.get('PATH', os.defpath).split(os.pathsep): path = path.strip('"') exe_file = os.path.join(path, program) if is_exe(exe_file): return exe_file return None
_this_dir = os.path.dirname(__file__) _placeholder_script = os.path.join(_this_dir, 'placeholder_script') _nodelet_manager_launch_file = os.path.join(_this_dir, 'capability_server_nodelet_manager.launch') _special_nodelet_manager_capability = '!!nodelet_manager'
[docs]class LaunchManager(object): """Manages multiple launch files which implement capabilities""" __roslaunch_exec = which('roslaunch') __python_exec = which('python') def __init__(self, quiet=False, screen=False, nodelet_manager_name=None): self.__running_launch_files_lock = threading.Lock() with self.__running_launch_files_lock: self.__running_launch_files = {} self.__outputs = [sys.stdout] self.__event_publisher = rospy.Publisher("~events", CapabilityEvent, queue_size=1000) self.__event_subscriber = rospy.Subscriber( "~events", CapabilityEvent, self.handle_capability_events) self.stopping = False self.__quiet = quiet self.__screen = screen name = rospy.get_namespace() name += "" if name.startswith('/') else "/" name += nodelet_manager_name if nodelet_manager_name else rospy.get_name().split('/')[-1] + '_nodelet_manager' self.__nodelet_manager_name = name self.__start_nodelet_manager() @property def nodelet_manager_name(self): return self.__nodelet_manager_name
[docs] def stop(self): """Stops the launch manager, also stopping any running launch files""" if self.stopping: return # pragma: no cover with self.__running_launch_files_lock: # Incase the other thread tried the lock before this thread updated # the self.stopping variable, check it again. if self.stopping: return # pragma: no cover self.stopping = True for pid in self.__running_launch_files: # pragma: no cover try: self.__stop_by_pid(pid) except RuntimeError as exc: if "launch file with PID" not in "{0}".format(exc): raise # Re-raise
def __stop_by_pid(self, pid): if pid not in self.__running_launch_files: raise RuntimeError("No running launch file with PID of '{0}'".format(pid)) proc, thread, _, _ = self.__running_launch_files[pid] if proc.poll() is None: proc.terminate() proc.wait() thread.join()
[docs] def stop_capability_provider(self, pid): """Stops the launch file for a capability provider, by pid :param pid: process ID of the launch file process that be stopped. :type pid: int """ with self.__running_launch_files_lock: self.__stop_by_pid(pid)
[docs] def handle_capability_events(self, msg): """Callback for events recieved on the events topic Only handles TERMINDATED events, all other events are discarded. :param msg: ROS message recieved on the events topic :type msg: :py:class:`capabilities.msgs.CapabilityEvent` """ if msg.type == msg.SERVER_READY: return if msg.type != msg.TERMINATED: return with self.__running_launch_files_lock: if msg.pid in self.__running_launch_files: del self.__running_launch_files[msg.pid]
[docs] def run_capability_provider(self, provider, provider_path): """Runs a given capability provider by launching its launch file :param provider: provider that should be launched :type provider: :py:class:`capabilities.specs.provider.CapabilityProvider` :param provider_path: path which the provider spec file is located at :type provider_path: str """ if os.path.isfile(provider_path): provider_path = os.path.dirname(provider_path) if provider.launch_file is None: launch_file = None rospy.loginfo("Provider '{0}' does not have a launch file, running a placeholder." .format(provider.name)) else: launch_file = os.path.join(provider_path, provider.launch_file) self.run_launch_file(launch_file, provider)
def run_launch_file(self, launch_file, provider, manager=False): with self.__running_launch_files_lock: if launch_file is not None and launch_file in [x[3] for x in self.__running_launch_files.values()]: raise RuntimeError("Launch file at '{0}' is already running." .format(launch_file)) if launch_file is None: cmd = [self.__python_exec, _placeholder_script] else: if self.__screen: cmd = [self.__roslaunch_exec, '--screen', launch_file] else: cmd = [self.__roslaunch_exec, launch_file] if manager: cmd.append("capability_server_nodelet_manager_name:=" + self.__nodelet_manager_name.split('/')[-1]) else: cmd.append("capability_server_nodelet_manager_name:=" + self.__nodelet_manager_name) if self.__quiet: env = copy.deepcopy(os.environ) env['PYTHONUNBUFFERED'] = 'x' proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env) else: proc = subprocess.Popen(cmd) thread = self.__start_communication_thread(proc) msg = CapabilityEvent() msg.header.stamp = rospy.Time.now() msg.capability = provider.implements msg.provider = provider.name msg.pid = proc.pid msg.type = msg.LAUNCHED self.__running_launch_files[proc.pid] = [ proc, thread, provider, launch_file ] self.__event_publisher.publish(msg) thread.start() def __start_nodelet_manager(self): class MockProvider: implements = _special_nodelet_manager_capability name = rospy.get_name().lstrip('/') provider = MockProvider() launch_file = _nodelet_manager_launch_file self.run_launch_file(launch_file, provider, manager=True) def __start_communication_thread(self, proc): return threading.Thread(target=self.__monitor_process, args=(proc,)) def __monitor_process(self, proc): try: with self.__running_launch_files_lock: if proc.pid not in self.__running_launch_files: raise RuntimeError("Unknown process id: " + str(proc.pid)) provider = self.__running_launch_files[proc.pid][2] if proc.stdout is not None: while proc.returncode is None: try: for line in iter(proc.stdout.readline, ''): for output in self.__outputs: output.write(line) except KeyboardInterrupt: # pragma: no cover pass proc.poll() else: proc.wait() msg = CapabilityEvent() msg.header.stamp = rospy.Time.now() msg.capability = provider.implements msg.provider = provider.name msg.type = msg.TERMINATED msg.pid = proc.pid self.__event_publisher.publish(msg) except Exception as exc: rospy.logerr('{0}: {1}'.format(exc.__class__.__name__, str(exc))) raise
assert LaunchManager._LaunchManager__roslaunch_exec is not None, "'roslaunch' executable not found"