launchman.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2008, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 #
00034 # Revision $Id$
00035 
00036 PKG = 'launchman' # this package name
00037 NAME = 'launchman'
00038 
00039 import roslib; roslib.load_manifest(PKG) 
00040 
00041 import time
00042 import string
00043 from optparse import OptionParser
00044 
00045 from launchman.srv import *
00046 from launchman.msg import *
00047 import rospy 
00048 
00049 import roslib.names
00050 import roslib.network 
00051 
00052 from roslaunch.config import ROSLaunchConfig
00053 #from roslaunch.launch import ROSLaunchRunner
00054 import roslaunch.parent
00055 from roslaunch.pmon import pmon_shutdown as _pmon_shutdown
00056 from roslaunch.xmlloader import *
00057 
00058 from launchman import app
00059 
00060 class AppGroup:
00061   def __init__(self, manager, apprunner):
00062     self.apprunner = None
00063     self.manager = manager
00064 
00065 class AppRunner:
00066   def __init__(self, taskid, manager):
00067     self.taskid = taskid
00068     self.task = AppUpdate(None, None, None, None, None)
00069     self.app = None
00070     self.runner = None
00071     self.childGroups = []
00072     self.manager = manager
00073 
00074   def __del__(self):
00075     if self.manager: self.manager = None
00076     if self.childGroups: self.childGroups = None
00077 
00078   def launch(self):
00079     package, launch_file = string.split(self.app.launch_file, "/", 1)
00080     path = app.getPackagePath(package)
00081     print "pkgpath", path
00082     print "launchfile", launch_file
00083 
00084     fn = os.path.join(path, launch_file)
00085     if not os.path.exists(fn):
00086       return False
00087 
00088     self.runner = roslaunch.parent.ROSLaunchParent(rospy.get_param("/run_id"), [fn], is_core=False, process_listeners=(self, ))
00089     self.runner.start()
00090 
00091     return True
00092 
00093   def stop(self):
00094     if not self.runner: 
00095       print "no runner", self
00096       return
00097     self.runner.shutdown()
00098     self.runner = None
00099 
00100   def process_died(self, process_name, exit_code):
00101     rospy.logerr( "process_died, but we're ignoring this: %s, %s" % ( process_name, exit_code))
00102     return
00103 
00104     rospy.logdebug( "process_died: %s, %s" % ( process_name, exit_code))
00105     if self.runner:  rospy.logdebug(self.runner.pm.procs)
00106 
00107     if not self.runner: return
00108 
00109     if len(self.runner.pm.procs) == 1:
00110       #make sure to clean up the runner so that we don't strand roslaunch processes
00111       try:
00112         self.runner.shutdown()
00113       except Exception as e:
00114         pass
00115 
00116       self.runner = None
00117       print "ALL DONE!"
00118       self.manager._stopTask(self)
00119     else:
00120       if self.runner.pm.procs:
00121         self.task.status = "error"
00122         self.manager._send_status()
00123 
00124         print "too many processes left:", len(self.runner.pm.procs)
00125         for proc in self.runner.pm.procs:
00126           proc.stop()
00127         
00128       #make sure to clean up the runner so that we don't strand roslaunch processes
00129       try:
00130         self.runner.shutdown()
00131       except Exception as e:
00132         pass
00133 
00134       self.runner = None
00135       self.manager._stopTask(self)
00136       
00137       
00138 
00139   def __repr__(self):
00140     return "<AppRunner %s: %s %s %s>" % (self.taskid, self.app.provides, self.app.taskid, len(self.childGroups))
00141 
00142 
00143 
00144 class TaskManager:
00145   def __init__(self, kill_on_conflict=False):
00146     self.app_update = rospy.Publisher("app_update", AppUpdate, self)
00147     self.app_status = rospy.Publisher("app_status", AppStatus, self)
00148     self._taskGroups = {}
00149     self._apps = {}
00150     self.kill_on_conflict = kill_on_conflict
00151 
00152   def _send_status(self):
00153     apps = []
00154     for taskid, runner in self._apps.items():
00155       apps.append(Application(taskid, runner.app.name, runner.task.status, runner.app.icon, str(runner.app.provides), str(runner.app.depends)))
00156     self.app_status.publish(apps)
00157 
00158   # Look for running tasks that provide any of the capabilities included 
00159   # in deps.  Returns a tuple: (uncovered, pgroup).   uncovered is the subset
00160   # of deps not provided by any running task.  pgroup 
00161   # is a list of AppRunner objects associated with all tasks that provide 
00162   # any of deps.
00163   def _find_providers(self, deps):
00164     if not deps:
00165       return (), []
00166     d = set(deps)
00167     pgroup = []
00168     for k in self._taskGroups:
00169       isect = set(k) & d
00170       if isect:
00171         pgroup.append(self._taskGroups[k])
00172         d = d - isect
00173     return tuple(d), pgroup
00174 
00175   def list_tasks(self, req):
00176     s = []
00177     for a in self._taskGroups:
00178       s.append(self._taskGroups[a].app.name)    
00179     return ListTasksResponse(s)
00180 
00181   def start_task(self, req):
00182     a = app.App(req.taskid)
00183     a.load()
00184 
00185     # Check for conflicts between this task's provides and those of 
00186     # running tasks.
00187     pgroup = None
00188     uncovered, pgroup = self._find_providers(a.provides)
00189     for runner in pgroup:
00190       if runner.task.taskid == req.taskid:
00191         print "already running"
00192         self._send_status()
00193         return StartTaskResponse("This app is already running.")
00194 
00195       if self.kill_on_conflict:
00196         self._stopTask(runner)
00197       else:
00198         return StartTaskResponse("The %s app conflicts with the %s app; please stop the %s app"%(a.name, runner.app.name, runner.app.name))
00199 
00200     # Check for satisfaction of this task's dependencies.
00201     if a.depends:
00202       uncovered, pgroup = self._find_providers(a.depends)
00203       if uncovered:
00204         print "no parent task group %s running" % str(uncovered)
00205         self._send_status()
00206         return StartTaskResponse("No app that provides %s is running." % str(uncovered))
00207 
00208     runner = AppRunner(req.taskid, self)
00209     runner.app = a
00210     self._apps[req.taskid] = runner
00211 
00212     if a.provides:
00213       self._taskGroups[a.provides] = runner
00214 
00215     if pgroup:
00216       for p in pgroup:
00217         p.childGroups.append(runner)
00218 
00219     #print "startTask [%s, %s, %s]" % (req.taskid, a.name, req.username)
00220     runner.task.taskid = req.taskid
00221     runner.task.username = req.username
00222     runner.task.started = rospy.get_rostime()
00223     runner.task.status = "starting"
00224 
00225 #    self.app_update.publish(runner.task)
00226     self._send_status()
00227 
00228     try:
00229       runner.launch()
00230     except Exception as e:
00231       import traceback
00232       traceback.print_exc()
00233       runner.task.status = "error"
00234       self._send_status()
00235 
00236       self.runner = None
00237       self._stopTask(runner)
00238       return StartTaskResponse("Exception while trying to launch the %s app: \"%s\""%(req.taskid, e))
00239       
00240     runner.task.status = "running"
00241 #    self.app_update.publish(runner.task)
00242     self._send_status()
00243 
00244     return StartTaskResponse("done")
00245 
00246   def showstate(self):
00247     print "_" * 40
00248     for provides, runner in self._taskGroups.items():
00249       print provides, runner.childGroups
00250 
00251   def _stopTask(self, runner):
00252     runner.task.status = "stopping"
00253 #    self.app_update.publish(runner.task)
00254     self._send_status()
00255 
00256     for cgroup in runner.childGroups[:]:
00257       self._stopTask(cgroup)
00258 
00259     runner.stop()
00260 
00261     runner.task.status = "stopped"
00262     self._send_status()
00263 
00264     if runner.app.depends:
00265       # Check for tasks that we need to shut down as a consequence 
00266       # of stopping this task.
00267       uncovered, pgroup = self._find_providers(runner.app.depends)
00268       for p in pgroup:
00269         if runner in p.childGroups:
00270           p.childGroups.remove(runner)
00271   
00272     if runner.app.provides:
00273       print "removing", runner.app.provides
00274       del self._taskGroups[runner.app.provides]
00275 
00276     if runner.taskid in self._apps:
00277       del self._apps[runner.taskid]
00278       print "removing", runner.taskid
00279 
00280 
00281   def stop_task(self, req):
00282     if req.taskid not in self._apps:
00283       return StopTaskResponse("no such task: %s" % req.taskid)
00284       
00285     runner = self._apps[req.taskid]
00286 
00287     self._stopTask(runner)
00288 
00289     self._send_status()
00290 
00291     return StopTaskResponse("done")
00292 
00293   def status_update(self, req):
00294     self._send_status()
00295 #    for provides, runner in self._taskGroups.items():
00296 #      self.app_update.publish(runner.task)
00297     return StatusUpdateResponse("done")
00298 
00299 
00300   def peer_subscribe(self, topic_name, topic_publish, peer_publish):
00301     pass
00302 
00303   def peer_unsubscribe(self, topic_name, numPeers):
00304     pass
00305 
00306 
00307 def server(kill_on_conflict):
00308     rospy.init_node(NAME)
00309 
00310     tm = TaskManager(kill_on_conflict)
00311     s1 = rospy.Service('start_task', StartTask, tm.start_task)
00312     s2 = rospy.Service('stop_task', StopTask, tm.stop_task)
00313     s3 = rospy.Service('status_update', StatusUpdate, tm.status_update)
00314     s4 = rospy.Service('list_tasks', ListTasks, tm.list_tasks)
00315 
00316     # spin() keeps Python from exiting until node is shutdown
00317     rospy.spin()
00318 
00319 if __name__ == "__main__":
00320     parser = OptionParser(usage="usage: %prog [options]", prog='launchman.py')
00321     parser.add_option("-k", "--kill-on-conflict", dest="kill", 
00322                       default=False, action="store_true", 
00323                       help="automatically kill conflicting tasks when starting a new task")
00324     options, args = parser.parse_args()
00325     #initialize roslaunch in main thread
00326     roslaunch.pmon._init_signal_handlers()
00327     server(options.kill)


launchman
Author(s): Scott Hassan
autogenerated on Wed Apr 23 2014 10:35:46