00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 PKG = 'launchman'
00037 NAME = 'launchman'
00038
00039 import roslib; roslib.load_manifest(PKG)
00040
00041 import time
00042 import string
00043 from optparse import OptionParser
00044
00045 from launchman.srv import *
00046 from launchman.msg import *
00047 import rospy
00048
00049 import roslib.names
00050 import roslib.network
00051
00052 from roslaunch.config import ROSLaunchConfig
00053
00054 import roslaunch.parent
00055 from roslaunch.pmon import pmon_shutdown as _pmon_shutdown
00056 from roslaunch.xmlloader import *
00057
00058 from launchman import app
00059
00060 class AppGroup:
00061 def __init__(self, manager, apprunner):
00062 self.apprunner = None
00063 self.manager = manager
00064
00065 class AppRunner:
00066 def __init__(self, taskid, manager):
00067 self.taskid = taskid
00068 self.task = AppUpdate(None, None, None, None, None)
00069 self.app = None
00070 self.runner = None
00071 self.childGroups = []
00072 self.manager = manager
00073
00074 def __del__(self):
00075 if self.manager: self.manager = None
00076 if self.childGroups: self.childGroups = None
00077
00078 def launch(self):
00079 package, launch_file = string.split(self.app.launch_file, "/", 1)
00080 path = app.getPackagePath(package)
00081 print "pkgpath", path
00082 print "launchfile", launch_file
00083
00084 fn = os.path.join(path, launch_file)
00085 if not os.path.exists(fn):
00086 return False
00087
00088 self.runner = roslaunch.parent.ROSLaunchParent(rospy.get_param("/run_id"), [fn], is_core=False, process_listeners=(self, ))
00089 self.runner.start()
00090
00091 return True
00092
00093 def stop(self):
00094 if not self.runner:
00095 print "no runner", self
00096 return
00097 self.runner.shutdown()
00098 self.runner = None
00099
00100 def process_died(self, process_name, exit_code):
00101 rospy.logerr( "process_died, but we're ignoring this: %s, %s" % ( process_name, exit_code))
00102 return
00103
00104 rospy.logdebug( "process_died: %s, %s" % ( process_name, exit_code))
00105 if self.runner: rospy.logdebug(self.runner.pm.procs)
00106
00107 if not self.runner: return
00108
00109 if len(self.runner.pm.procs) == 1:
00110
00111 try:
00112 self.runner.shutdown()
00113 except Exception as e:
00114 pass
00115
00116 self.runner = None
00117 print "ALL DONE!"
00118 self.manager._stopTask(self)
00119 else:
00120 if self.runner.pm.procs:
00121 self.task.status = "error"
00122 self.manager._send_status()
00123
00124 print "too many processes left:", len(self.runner.pm.procs)
00125 for proc in self.runner.pm.procs:
00126 proc.stop()
00127
00128
00129 try:
00130 self.runner.shutdown()
00131 except Exception as e:
00132 pass
00133
00134 self.runner = None
00135 self.manager._stopTask(self)
00136
00137
00138
00139 def __repr__(self):
00140 return "<AppRunner %s: %s %s %s>" % (self.taskid, self.app.provides, self.app.taskid, len(self.childGroups))
00141
00142
00143
00144 class TaskManager:
00145 def __init__(self, kill_on_conflict=False):
00146 self.app_update = rospy.Publisher("app_update", AppUpdate, self)
00147 self.app_status = rospy.Publisher("app_status", AppStatus, self)
00148 self._taskGroups = {}
00149 self._apps = {}
00150 self.kill_on_conflict = kill_on_conflict
00151
00152 def _send_status(self):
00153 apps = []
00154 for taskid, runner in self._apps.items():
00155 apps.append(Application(taskid, runner.app.name, runner.task.status, runner.app.icon, str(runner.app.provides), str(runner.app.depends)))
00156 self.app_status.publish(apps)
00157
00158
00159
00160
00161
00162
00163 def _find_providers(self, deps):
00164 if not deps:
00165 return (), []
00166 d = set(deps)
00167 pgroup = []
00168 for k in self._taskGroups:
00169 isect = set(k) & d
00170 if isect:
00171 pgroup.append(self._taskGroups[k])
00172 d = d - isect
00173 return tuple(d), pgroup
00174
00175 def list_tasks(self, req):
00176 s = []
00177 for a in self._taskGroups:
00178 s.append(self._taskGroups[a].app.name)
00179 return ListTasksResponse(s)
00180
00181 def start_task(self, req):
00182 a = app.App(req.taskid)
00183 a.load()
00184
00185
00186
00187 pgroup = None
00188 uncovered, pgroup = self._find_providers(a.provides)
00189 for runner in pgroup:
00190 if runner.task.taskid == req.taskid:
00191 print "already running"
00192 self._send_status()
00193 return StartTaskResponse("This app is already running.")
00194
00195 if self.kill_on_conflict:
00196 self._stopTask(runner)
00197 else:
00198 return StartTaskResponse("The %s app conflicts with the %s app; please stop the %s app"%(a.name, runner.app.name, runner.app.name))
00199
00200
00201 if a.depends:
00202 uncovered, pgroup = self._find_providers(a.depends)
00203 if uncovered:
00204 print "no parent task group %s running" % str(uncovered)
00205 self._send_status()
00206 return StartTaskResponse("No app that provides %s is running." % str(uncovered))
00207
00208 runner = AppRunner(req.taskid, self)
00209 runner.app = a
00210 self._apps[req.taskid] = runner
00211
00212 if a.provides:
00213 self._taskGroups[a.provides] = runner
00214
00215 if pgroup:
00216 for p in pgroup:
00217 p.childGroups.append(runner)
00218
00219
00220 runner.task.taskid = req.taskid
00221 runner.task.username = req.username
00222 runner.task.started = rospy.get_rostime()
00223 runner.task.status = "starting"
00224
00225
00226 self._send_status()
00227
00228 try:
00229 runner.launch()
00230 except Exception as e:
00231 import traceback
00232 traceback.print_exc()
00233 runner.task.status = "error"
00234 self._send_status()
00235
00236 self.runner = None
00237 self._stopTask(runner)
00238 return StartTaskResponse("Exception while trying to launch the %s app: \"%s\""%(req.taskid, e))
00239
00240 runner.task.status = "running"
00241
00242 self._send_status()
00243
00244 return StartTaskResponse("done")
00245
00246 def showstate(self):
00247 print "_" * 40
00248 for provides, runner in self._taskGroups.items():
00249 print provides, runner.childGroups
00250
00251 def _stopTask(self, runner):
00252 runner.task.status = "stopping"
00253
00254 self._send_status()
00255
00256 for cgroup in runner.childGroups[:]:
00257 self._stopTask(cgroup)
00258
00259 runner.stop()
00260
00261 runner.task.status = "stopped"
00262 self._send_status()
00263
00264 if runner.app.depends:
00265
00266
00267 uncovered, pgroup = self._find_providers(runner.app.depends)
00268 for p in pgroup:
00269 if runner in p.childGroups:
00270 p.childGroups.remove(runner)
00271
00272 if runner.app.provides:
00273 print "removing", runner.app.provides
00274 del self._taskGroups[runner.app.provides]
00275
00276 if runner.taskid in self._apps:
00277 del self._apps[runner.taskid]
00278 print "removing", runner.taskid
00279
00280
00281 def stop_task(self, req):
00282 if req.taskid not in self._apps:
00283 return StopTaskResponse("no such task: %s" % req.taskid)
00284
00285 runner = self._apps[req.taskid]
00286
00287 self._stopTask(runner)
00288
00289 self._send_status()
00290
00291 return StopTaskResponse("done")
00292
00293 def status_update(self, req):
00294 self._send_status()
00295
00296
00297 return StatusUpdateResponse("done")
00298
00299
00300 def peer_subscribe(self, topic_name, topic_publish, peer_publish):
00301 pass
00302
00303 def peer_unsubscribe(self, topic_name, numPeers):
00304 pass
00305
00306
00307 def server(kill_on_conflict):
00308 rospy.init_node(NAME)
00309
00310 tm = TaskManager(kill_on_conflict)
00311 s1 = rospy.Service('start_task', StartTask, tm.start_task)
00312 s2 = rospy.Service('stop_task', StopTask, tm.stop_task)
00313 s3 = rospy.Service('status_update', StatusUpdate, tm.status_update)
00314 s4 = rospy.Service('list_tasks', ListTasks, tm.list_tasks)
00315
00316
00317 rospy.spin()
00318
00319 if __name__ == "__main__":
00320 parser = OptionParser(usage="usage: %prog [options]", prog='launchman.py')
00321 parser.add_option("-k", "--kill-on-conflict", dest="kill",
00322 default=False, action="store_true",
00323 help="automatically kill conflicting tasks when starting a new task")
00324 options, args = parser.parse_args()
00325
00326 roslaunch.pmon._init_signal_handlers()
00327 server(options.kill)