$search
00001 #!/usr/bin/env python 00002 # Software License Agreement (BSD License) 00003 # 00004 # Copyright (c) 2008, Willow Garage, Inc. 00005 # All rights reserved. 00006 # 00007 # Redistribution and use in source and binary forms, with or without 00008 # modification, are permitted provided that the following conditions 00009 # are met: 00010 # 00011 # * Redistributions of source code must retain the above copyright 00012 # notice, this list of conditions and the following disclaimer. 00013 # * Redistributions in binary form must reproduce the above 00014 # copyright notice, this list of conditions and the following 00015 # disclaimer in the documentation and/or other materials provided 00016 # with the distribution. 00017 # * Neither the name of Willow Garage, Inc. nor the names of its 00018 # contributors may be used to endorse or promote products derived 00019 # from this software without specific prior written permission. 00020 # 00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 # POSSIBILITY OF SUCH DAMAGE. 00033 # 00034 # Revision $Id$ 00035 00036 PKG = 'launchman' # this package name 00037 NAME = 'launchman' 00038 00039 import roslib; roslib.load_manifest(PKG) 00040 00041 import time 00042 import string 00043 from optparse import OptionParser 00044 00045 from launchman.srv import * 00046 from launchman.msg import * 00047 import rospy 00048 00049 import roslib.names 00050 import roslib.network 00051 00052 from roslaunch.config import ROSLaunchConfig 00053 #from roslaunch.launch import ROSLaunchRunner 00054 import roslaunch.parent 00055 from roslaunch.pmon import pmon_shutdown as _pmon_shutdown 00056 from roslaunch.xmlloader import * 00057 00058 from launchman import app 00059 00060 class AppGroup: 00061 def __init__(self, manager, apprunner): 00062 self.apprunner = None 00063 self.manager = manager 00064 00065 class AppRunner: 00066 def __init__(self, taskid, manager): 00067 self.taskid = taskid 00068 self.task = AppUpdate(None, None, None, None, None) 00069 self.app = None 00070 self.runner = None 00071 self.childGroups = [] 00072 self.manager = manager 00073 00074 def __del__(self): 00075 if self.manager: self.manager = None 00076 if self.childGroups: self.childGroups = None 00077 00078 def launch(self): 00079 package, launch_file = string.split(self.app.launch_file, "/", 1) 00080 path = app.getPackagePath(package) 00081 print "pkgpath", path 00082 print "launchfile", launch_file 00083 00084 fn = os.path.join(path, launch_file) 00085 if not os.path.exists(fn): 00086 return False 00087 00088 self.runner = roslaunch.parent.ROSLaunchParent(rospy.get_param("/run_id"), [fn], is_core=False, process_listeners=(self, )) 00089 self.runner.start() 00090 00091 return True 00092 00093 def stop(self): 00094 if not self.runner: 00095 print "no runner", self 00096 return 00097 self.runner.shutdown() 00098 self.runner = None 00099 00100 def process_died(self, process_name, exit_code): 00101 rospy.logerr( "process_died, but we're ignoring this: %s, %s" % ( process_name, exit_code)) 00102 return 00103 00104 rospy.logdebug( "process_died: %s, %s" % ( process_name, exit_code)) 00105 if self.runner: rospy.logdebug(self.runner.pm.procs) 00106 00107 if not self.runner: return 00108 00109 if len(self.runner.pm.procs) == 1: 00110 #make sure to clean up the runner so that we don't strand roslaunch processes 00111 try: 00112 self.runner.shutdown() 00113 except Exception as e: 00114 pass 00115 00116 self.runner = None 00117 print "ALL DONE!" 00118 self.manager._stopTask(self) 00119 else: 00120 if self.runner.pm.procs: 00121 self.task.status = "error" 00122 self.manager._send_status() 00123 00124 print "too many processes left:", len(self.runner.pm.procs) 00125 for proc in self.runner.pm.procs: 00126 proc.stop() 00127 00128 #make sure to clean up the runner so that we don't strand roslaunch processes 00129 try: 00130 self.runner.shutdown() 00131 except Exception as e: 00132 pass 00133 00134 self.runner = None 00135 self.manager._stopTask(self) 00136 00137 00138 00139 def __repr__(self): 00140 return "<AppRunner %s: %s %s %s>" % (self.taskid, self.app.provides, self.app.taskid, len(self.childGroups)) 00141 00142 00143 00144 class TaskManager: 00145 def __init__(self, kill_on_conflict=False): 00146 self.app_update = rospy.Publisher("app_update", AppUpdate, self) 00147 self.app_status = rospy.Publisher("app_status", AppStatus, self) 00148 self._taskGroups = {} 00149 self._apps = {} 00150 self.kill_on_conflict = kill_on_conflict 00151 00152 def _send_status(self): 00153 apps = [] 00154 for taskid, runner in self._apps.items(): 00155 apps.append(Application(taskid, runner.app.name, runner.task.status, runner.app.icon, str(runner.app.provides), str(runner.app.depends))) 00156 self.app_status.publish(apps) 00157 00158 # Look for running tasks that provide any of the capabilities included 00159 # in deps. Returns a tuple: (uncovered, pgroup). uncovered is the subset 00160 # of deps not provided by any running task. pgroup 00161 # is a list of AppRunner objects associated with all tasks that provide 00162 # any of deps. 00163 def _find_providers(self, deps): 00164 if not deps: 00165 return (), [] 00166 d = set(deps) 00167 pgroup = [] 00168 for k in self._taskGroups: 00169 isect = set(k) & d 00170 if isect: 00171 pgroup.append(self._taskGroups[k]) 00172 d = d - isect 00173 return tuple(d), pgroup 00174 00175 def list_tasks(self, req): 00176 s = [] 00177 for a in self._taskGroups: 00178 s.append(self._taskGroups[a].app.name) 00179 return ListTasksResponse(s) 00180 00181 def start_task(self, req): 00182 a = app.App(req.taskid) 00183 a.load() 00184 00185 # Check for conflicts between this task's provides and those of 00186 # running tasks. 00187 pgroup = None 00188 uncovered, pgroup = self._find_providers(a.provides) 00189 for runner in pgroup: 00190 if runner.task.taskid == req.taskid: 00191 print "already running" 00192 self._send_status() 00193 return StartTaskResponse("This app is already running.") 00194 00195 if self.kill_on_conflict: 00196 self._stopTask(runner) 00197 else: 00198 return StartTaskResponse("The %s app conflicts with the %s app; please stop the %s app"%(a.name, runner.app.name, runner.app.name)) 00199 00200 # Check for satisfaction of this task's dependencies. 00201 if a.depends: 00202 uncovered, pgroup = self._find_providers(a.depends) 00203 if uncovered: 00204 print "no parent task group %s running" % str(uncovered) 00205 self._send_status() 00206 return StartTaskResponse("No app that provides %s is running." % str(uncovered)) 00207 00208 runner = AppRunner(req.taskid, self) 00209 runner.app = a 00210 self._apps[req.taskid] = runner 00211 00212 if a.provides: 00213 self._taskGroups[a.provides] = runner 00214 00215 if pgroup: 00216 for p in pgroup: 00217 p.childGroups.append(runner) 00218 00219 #print "startTask [%s, %s, %s]" % (req.taskid, a.name, req.username) 00220 runner.task.taskid = req.taskid 00221 runner.task.username = req.username 00222 runner.task.started = rospy.get_rostime() 00223 runner.task.status = "starting" 00224 00225 # self.app_update.publish(runner.task) 00226 self._send_status() 00227 00228 try: 00229 runner.launch() 00230 except Exception as e: 00231 import traceback 00232 traceback.print_exc() 00233 runner.task.status = "error" 00234 self._send_status() 00235 00236 self.runner = None 00237 self._stopTask(runner) 00238 return StartTaskResponse("Exception while trying to launch the %s app: \"%s\""%(req.taskid, e)) 00239 00240 runner.task.status = "running" 00241 # self.app_update.publish(runner.task) 00242 self._send_status() 00243 00244 return StartTaskResponse("done") 00245 00246 def showstate(self): 00247 print "_" * 40 00248 for provides, runner in self._taskGroups.items(): 00249 print provides, runner.childGroups 00250 00251 def _stopTask(self, runner): 00252 runner.task.status = "stopping" 00253 # self.app_update.publish(runner.task) 00254 self._send_status() 00255 00256 for cgroup in runner.childGroups[:]: 00257 self._stopTask(cgroup) 00258 00259 runner.stop() 00260 00261 runner.task.status = "stopped" 00262 self._send_status() 00263 00264 if runner.app.depends: 00265 # Check for tasks that we need to shut down as a consequence 00266 # of stopping this task. 00267 uncovered, pgroup = self._find_providers(runner.app.depends) 00268 for p in pgroup: 00269 if runner in p.childGroups: 00270 p.childGroups.remove(runner) 00271 00272 if runner.app.provides: 00273 print "removing", runner.app.provides 00274 del self._taskGroups[runner.app.provides] 00275 00276 if runner.taskid in self._apps: 00277 del self._apps[runner.taskid] 00278 print "removing", runner.taskid 00279 00280 00281 def stop_task(self, req): 00282 if req.taskid not in self._apps: 00283 return StopTaskResponse("no such task: %s" % req.taskid) 00284 00285 runner = self._apps[req.taskid] 00286 00287 self._stopTask(runner) 00288 00289 self._send_status() 00290 00291 return StopTaskResponse("done") 00292 00293 def status_update(self, req): 00294 self._send_status() 00295 # for provides, runner in self._taskGroups.items(): 00296 # self.app_update.publish(runner.task) 00297 return StatusUpdateResponse("done") 00298 00299 00300 def peer_subscribe(self, topic_name, topic_publish, peer_publish): 00301 pass 00302 00303 def peer_unsubscribe(self, topic_name, numPeers): 00304 pass 00305 00306 00307 def server(kill_on_conflict): 00308 rospy.init_node(NAME) 00309 00310 tm = TaskManager(kill_on_conflict) 00311 s1 = rospy.Service('start_task', StartTask, tm.start_task) 00312 s2 = rospy.Service('stop_task', StopTask, tm.stop_task) 00313 s3 = rospy.Service('status_update', StatusUpdate, tm.status_update) 00314 s4 = rospy.Service('list_tasks', ListTasks, tm.list_tasks) 00315 00316 # spin() keeps Python from exiting until node is shutdown 00317 rospy.spin() 00318 00319 if __name__ == "__main__": 00320 parser = OptionParser(usage="usage: %prog [options]", prog='launchman.py') 00321 parser.add_option("-k", "--kill-on-conflict", dest="kill", 00322 default=False, action="store_true", 00323 help="automatically kill conflicting tasks when starting a new task") 00324 options, args = parser.parse_args() 00325 #initialize roslaunch in main thread 00326 roslaunch.pmon._init_signal_handlers() 00327 server(options.kill)