00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038 from __future__ import with_statement
00039
00040 import roslib; roslib.load_manifest('application_manager')
00041 import rospy
00042 import os
00043 import threading
00044 import actionlib
00045 from actionlib_msgs.msg import GoalStatus
00046 import yaml
00047 from application_manager import roslauncher
00048 from application_msgs.msg import *
00049
00050
00051
00052
00053 class Application():
00054 def __init__(self):
00055 self.state = 'Stopped'
00056 self.client_goals = []
00057
00058
00059 def configure(self, conf):
00060 self.lock = threading.Lock()
00061
00062
00063 required_keys = ['package', 'name', 'description', 'launch_file', 'resources', 'dependencies']
00064 for k in required_keys:
00065 if not k in conf:
00066 rospy.logerr('Application in package %s does not specify option "%s"'%(conf['package'], k))
00067 return False
00068
00069 self.resources = conf['resources']
00070 self.depends = conf['dependencies']
00071 self.depends_on = []
00072 self.conflicts = []
00073 self.name = '/'.join([conf['package'], conf['name']])
00074 self.description = conf['description']
00075 try:
00076 if conf['launch_file'][0] == '/':
00077 with open(os.path.join(conf['launch_file'])) as f:
00078 self.launch_file = f.read()
00079 else:
00080 with open(os.path.join(roslib.packages.get_pkg_dir(conf['package']), conf['launch_file'])) as f:
00081 self.launch_file = f.read()
00082 except:
00083 rospy.logerr('Application %s specified invalid package or launch file'%conf['name'])
00084 return False
00085
00086
00087 self.action_ns = None
00088 if 'action_ns' in conf and 'preempt_timeout' in conf:
00089 self.action_ns = conf['action_ns']
00090 self.preempt_timeout = rospy.Duration(conf['preempt_timeout'])
00091 self.launch_timeout = rospy.Duration(30.0)
00092 self.action_client = None
00093
00094
00095 self.daemonize = False
00096 if 'daemonize' in conf:
00097 self.daemonize = conf['daemonize']
00098
00099 return True
00100
00101
00102
00103 def start(self, wait):
00104 self.lock.acquire()
00105
00106
00107 if self.state == 'Running':
00108 rospy.loginfo('Applicaiton %s already running, not starting it again'%self.name)
00109 self.lock.release()
00110 return True
00111
00112
00113 self.state = 'Starting'
00114 self.launcher = roslauncher.ScriptRoslaunch(self.launch_file)
00115 self.launcher.start()
00116
00117
00118 self.application_starter = ApplicationStarter(self)
00119 self.application_starter.start()
00120
00121
00122 while not rospy.is_shutdown() and wait and self.state != 'Running':
00123 rospy.sleep(0.1)
00124 return True
00125
00126
00127
00128 def stop(self, wait):
00129 self.lock.acquire()
00130
00131
00132 if self.state != 'Running':
00133 rospy.loginfo("Applicaiton %s is not running, so don't need to stop it"%self.name)
00134 self.lock.release()
00135 return True
00136
00137
00138 for g in self.client_goals:
00139 g.set_canceled()
00140 self.client_goals = []
00141
00142
00143 self.state = 'Preempting'
00144 self.application_stopper = ApplicationStopper(self)
00145 self.application_stopper.start()
00146
00147
00148 while not rospy.is_shutdown() and wait and self.state != 'Stopped':
00149 rospy.sleep(0.1)
00150 return True
00151
00152
00153 def done_cb(self, status, result):
00154 with self.lock:
00155 if self.state == 'Running':
00156 rospy.loginfo("Application %s stopped itself. Taking down launchfile"%self.name)
00157 self.stop(wait=False)
00158
00159
00160 class ApplicationStarter(threading.Thread):
00161 def __init__(self, application):
00162 threading.Thread.__init__(self)
00163 self.application = application
00164
00165
00166 def run(self):
00167 rospy.loginfo('Application starter thread created')
00168
00169
00170 if self.application.action_ns:
00171
00172 rospy.loginfo('Waiting for action client to come up in namespace %s for application %s'%(self.application.action_ns, self.application.name))
00173 self.application.action_client = actionlib.SimpleActionClient(self.application.action_ns, ApplicationAction)
00174 if not self.application.action_client.wait_for_server(self.application.launch_timeout):
00175 rospy.logerr('Action client server for application %s did not connect in time.'%self.application.name)
00176 self.application.launcher.shutdown()
00177 self.application.launcher = None
00178 self.application.state = 'Stopped'
00179 self.application.lock.release()
00180 return False
00181 self.application.action_client.send_goal(ApplicationGoal(), done_cb=self.application.done_cb)
00182 start_time = rospy.Time.now()
00183 while self.application.action_client.get_state() != GoalStatus.ACTIVE:
00184 rospy.sleep(0.1)
00185 if rospy.Time.now() > start_time + self.application.launch_timeout:
00186 rospy.logerr('Action client server for application %s did not accept start goal in time.'%self.application.name)
00187 self.application.launcher.shutdown()
00188 self.application.launcher = None
00189 self.application.state = 'Stopped'
00190 self.application.lock.release()
00191 return False
00192
00193 self.application.state = 'Running'
00194 rospy.loginfo('Started application %s'%self.application.name)
00195 self.application.lock.release()
00196 return True
00197
00198
00199
00200
00201 class ApplicationStopper(threading.Thread):
00202 def __init__(self, application):
00203 threading.Thread.__init__(self)
00204 self.application = application
00205
00206 def run(self):
00207 rospy.loginfo('Application stopper thread created')
00208
00209 if self.application.action_ns:
00210 rospy.loginfo("Sending preempt signal to application %s"%self.application.name)
00211 self.application.action_client.cancel_goal()
00212 self.application.action_client.wait_for_result(self.application.preempt_timeout)
00213 self.application.action_client = None
00214
00215
00216 self.application.launcher.shutdown()
00217 self.application.launcher = None
00218 self.application.state = 'Stopped'
00219 rospy.loginfo('Stopped application %s'%self.application.name)
00220 self.application.lock.release()
00221
00222
00223 class ApplicationQueueItem():
00224 def __init__(self, app, action, wait):
00225 self.app = app
00226 self.action = action
00227 self.wait = wait
00228 self.result = None
00229
00230
00231 class ApplicationQueue():
00232 def __init__(self):
00233 self.items = []
00234 self.items_lock = threading.Lock()
00235
00236 def pull_application(self):
00237 with self.items_lock:
00238 if len(self.items) == 0:
00239 return None
00240 else:
00241 item = self.items[0]
00242 del self.items[0]
00243 return item
00244
00245 def push_application(self, app, action, wait):
00246 with self.items_lock:
00247 self.items.append(ApplicationQueueItem(app, action, wait))
00248 return True
00249
00250 def stop_application(self, app, wait):
00251 return self.push_application(app, 'stop', wait)
00252
00253 def start_application(self, app, wait):
00254 return self.push_application(app, 'start', wait)
00255
00256
00257
00258
00259
00260
00261
00262 class ApplicationManager():
00263 def __init__(self, queue):
00264 self.lock = threading.RLock()
00265 self.queue = queue
00266 self.applications = self.get_available_applications()
00267
00268
00269 self.run_interface = actionlib.ActionServer('application_manager/run_application', RunApplicationAction, self.run_cb, self.cancel_cb)
00270 self.kill_interface = actionlib.SimpleActionServer('application_manager/kill_application', KillApplicationAction, self.kill_cb)
00271 self.stop_interface = actionlib.SimpleActionServer('application_manager/stop_application', StopApplicationAction, self.stop_cb)
00272 self.list_interface = actionlib.SimpleActionServer('application_manager/list_applications', ListApplicationsAction, self.list_cb)
00273
00274
00275 def get_available_applications(self):
00276 with self.lock:
00277
00278 path = roslib.packages.get_pkg_dir('application_manager')
00279 apps_yaml = yaml.load(open(path+'/applications.yaml'))
00280 available_applications = {}
00281 for app in apps_yaml:
00282 application = Application()
00283 if not 'app' in app:
00284 rospy.logerr('Invalid entry in applications.yaml file: %s'%str(app))
00285 elif application.configure(app['app']):
00286 available_applications[application.name] = application
00287 rospy.loginfo("Available application: %s"%application.name)
00288
00289
00290 for name, app in available_applications.iteritems():
00291 for d in app.depends:
00292 available_applications[d].depends_on.append(name)
00293
00294
00295 all_resources = {}
00296 for name, app in available_applications.iteritems():
00297 all_resources[name] = []
00298 self.update_resources(name, all_resources[name], available_applications)
00299 for name, app in available_applications.iteritems():
00300 for r in all_resources[name]:
00301 for name2 in available_applications:
00302 if r in all_resources[name2] and not name2 in app.conflicts:
00303 if name != name2 and not name2 in app.depends and not name2 in app.depends_on:
00304 app.conflicts.append(name2)
00305
00306 return available_applications
00307
00308
00309 def update_resources(self, app, resource_list, applications):
00310 for r in applications[app].resources:
00311 if not r in resource_list:
00312 resource_list.append(r)
00313 for d in applications[app].depends:
00314 self.update_resources(d, resource_list, applications)
00315
00316
00317
00318 def start_application(self, application_name, wait=False):
00319 print 'STARTING %s'%application_name
00320
00321 if not application_name in self.applications:
00322 rospy.logerr('Cannot start application %s, no application exists with that name'%application_name)
00323 return False
00324 app = self.applications[application_name]
00325
00326
00327 for a in app.conflicts:
00328 if self.applications[a].state != 'Stopped':
00329 self.stop_application(self.applications[a].name, wait=True)
00330
00331
00332 for a in app.depends:
00333 if self.applications[a].state != 'Running':
00334 self.start_application(self.applications[a].name, wait=True)
00335
00336
00337 self.queue.start_application(app, wait)
00338 return True
00339
00340
00341 def stop_application(self, application_name, wait=False):
00342
00343 if not application_name in self.applications:
00344 rospy.logerr('Cannot stop application %s, no application exists with that name'%application_name)
00345 return False
00346
00347
00348 app = self.applications[application_name]
00349 for a in app.depends_on:
00350 if self.applications[a].state != 'Stopped':
00351 self.stop_application(self.applications[a].name, wait=True)
00352
00353
00354 self.queue.stop_application(app, wait)
00355 return True
00356
00357
00358 def run_cb(self, goal):
00359 with self.lock:
00360 application_name = goal.get_goal().application_name
00361 if self.start_application(application_name):
00362 self.applications[application_name].client_goals.append(goal)
00363 goal.set_accepted()
00364 rospy.loginfo('Client connected to application %s'%application_name)
00365 else:
00366 goal.set_rejected()
00367
00368
00369 def cancel_cb(self, goal):
00370 with self.lock:
00371
00372 application = None
00373 for name, app in self.applications.iteritems():
00374 if goal in app.client_goals:
00375 application = app
00376 if not application:
00377 rospy.loginfo('Canceling goal that does not match up with an application any more')
00378 return
00379
00380
00381 application.client_goals.remove(goal)
00382 goal.set_canceled()
00383
00384
00385 if application.daemonize or len(application.client_goals) > 0:
00386 rospy.loginfo('Disconnected client from application %s, but keep applcation running'%application.name)
00387 else:
00388 self.stop_application(application.name)
00389
00390
00391 def stop_cb(self, goal):
00392 with self.lock:
00393 if self.stop_application(goal.application_name):
00394 self.stop_interface.set_succeeded(KillApplicationResult())
00395 else:
00396 self.stop_interface.set_aborted(KillApplicationResult())
00397
00398
00399 def kill_cb(self, goal):
00400 with self.lock:
00401 if self.stop_application(goal.application_name):
00402 self.kill_interface.set_succeeded(KillApplicationResult())
00403 else:
00404 self.kill_interface.set_aborted(KillApplicationResult())
00405
00406
00407 def list_cb(self, goal):
00408 with self.lock:
00409 res = ListApplicationsResult()
00410 for name, app in self.applications.iteritems():
00411 res.application_name.append(name)
00412 res.application_status.append(app.state)
00413 self.list_interface.set_succeeded(res)
00414
00415
00416
00417
00418
00419 def main():
00420 rospy.init_node('application_manager_node')
00421 queue = ApplicationQueue()
00422 ApplicationManager(queue)
00423 rospy.loginfo("Application manager running")
00424
00425 while not rospy.is_shutdown():
00426 rospy.sleep(0.1)
00427 item = queue.pull_application()
00428 if item:
00429 if item.action == 'start':
00430 item.result = item.app.start(item.wait)
00431 if item.action == 'stop':
00432 item.result = item.app.stop(item.wait)
00433
00434
00435 if __name__ == '__main__':
00436 main()
00437