00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 
00033 
00034 
00035 
00036 
00037 
00038 from __future__ import with_statement
00039 
00040 import roslib; roslib.load_manifest('application_manager')
00041 import rospy
00042 import os
00043 import threading
00044 import actionlib
00045 from actionlib_msgs.msg import GoalStatus
00046 import yaml
00047 from application_manager import roslauncher
00048 from application_msgs.msg import *
00049 
00050 
00051 
00052 
00053 class Application():
00054     def __init__(self):
00055         self.state = 'Stopped' 
00056         self.client_goals = [] 
00057         
00058 
00059     def configure(self, conf):
00060         self.lock = threading.Lock()  
00061 
00062         
00063         required_keys = ['package', 'name', 'description', 'launch_file', 'resources', 'dependencies']
00064         for k in required_keys:
00065             if not k in conf:
00066                 rospy.logerr('Application in package %s does not specify option "%s"'%(conf['package'], k))
00067                 return False
00068 
00069         self.resources = conf['resources']
00070         self.depends = conf['dependencies']
00071         self.depends_on = []
00072         self.conflicts = []
00073         self.name = '/'.join([conf['package'], conf['name']])
00074         self.description = conf['description']
00075         try:
00076             if conf['launch_file'][0] == '/':
00077                 with open(os.path.join(conf['launch_file'])) as f:
00078                     self.launch_file = f.read()
00079             else:
00080                 with open(os.path.join(roslib.packages.get_pkg_dir(conf['package']), conf['launch_file'])) as f:
00081                     self.launch_file = f.read()
00082         except:
00083             rospy.logerr('Application %s specified invalid package or launch file'%conf['name'])
00084             return False
00085 
00086         
00087         self.action_ns = None
00088         if 'action_ns' in conf and 'preempt_timeout' in conf:
00089             self.action_ns = conf['action_ns']
00090             self.preempt_timeout = rospy.Duration(conf['preempt_timeout'])
00091             self.launch_timeout = rospy.Duration(30.0)
00092             self.action_client = None
00093 
00094         
00095         self.daemonize = False
00096         if 'daemonize' in conf:
00097             self.daemonize = conf['daemonize']
00098 
00099         return True
00100 
00101 
00102 
00103     def start(self, wait):
00104         self.lock.acquire() 
00105 
00106         
00107         if self.state == 'Running':
00108             rospy.loginfo('Applicaiton %s already running, not starting it again'%self.name)
00109             self.lock.release()
00110             return True
00111 
00112         
00113         self.state = 'Starting'
00114         self.launcher = roslauncher.ScriptRoslaunch(self.launch_file)
00115         self.launcher.start()
00116 
00117         
00118         self.application_starter = ApplicationStarter(self)
00119         self.application_starter.start() 
00120 
00121         
00122         while not rospy.is_shutdown() and wait and self.state != 'Running':
00123             rospy.sleep(0.1)
00124         return True
00125 
00126 
00127 
00128     def stop(self, wait):
00129         self.lock.acquire() 
00130 
00131         
00132         if self.state != 'Running':
00133             rospy.loginfo("Applicaiton %s is not running, so don't need to stop it"%self.name)
00134             self.lock.release()
00135             return True
00136 
00137         
00138         for g in self.client_goals:
00139             g.set_canceled()
00140         self.client_goals = []
00141 
00142         
00143         self.state = 'Preempting'
00144         self.application_stopper = ApplicationStopper(self)
00145         self.application_stopper.start()  
00146 
00147         
00148         while not rospy.is_shutdown() and wait and self.state != 'Stopped':
00149             rospy.sleep(0.1)
00150         return True
00151 
00152 
00153     def done_cb(self, status, result):
00154         with self.lock:
00155             if self.state == 'Running':
00156                 rospy.loginfo("Application %s stopped itself. Taking down launchfile"%self.name)
00157         self.stop(wait=False)
00158 
00159 
00160 class ApplicationStarter(threading.Thread):
00161     def __init__(self, application):
00162         threading.Thread.__init__(self)
00163         self.application = application
00164 
00165 
00166     def run(self):
00167         rospy.loginfo('Application starter thread created')
00168 
00169         
00170         if self.application.action_ns:
00171 
00172             rospy.loginfo('Waiting for action client to come up in namespace %s for application %s'%(self.application.action_ns, self.application.name))
00173             self.application.action_client = actionlib.SimpleActionClient(self.application.action_ns, ApplicationAction)
00174             if not self.application.action_client.wait_for_server(self.application.launch_timeout):
00175                 rospy.logerr('Action client server for application %s did not connect in time.'%self.application.name)                    
00176                 self.application.launcher.shutdown()
00177                 self.application.launcher = None
00178                 self.application.state = 'Stopped'
00179                 self.application.lock.release() 
00180                 return False
00181             self.application.action_client.send_goal(ApplicationGoal(), done_cb=self.application.done_cb)
00182             start_time = rospy.Time.now()
00183             while self.application.action_client.get_state() != GoalStatus.ACTIVE:
00184                 rospy.sleep(0.1)
00185                 if rospy.Time.now() > start_time + self.application.launch_timeout:
00186                     rospy.logerr('Action client server for application %s did not accept start goal in time.'%self.application.name)
00187                     self.application.launcher.shutdown()
00188                     self.application.launcher = None
00189                     self.application.state = 'Stopped'
00190                     self.application.lock.release() 
00191                     return False
00192 
00193         self.application.state = 'Running'
00194         rospy.loginfo('Started application %s'%self.application.name)
00195         self.application.lock.release() 
00196         return True
00197 
00198 
00199 
00200 
00201 class ApplicationStopper(threading.Thread):
00202     def __init__(self, application):
00203         threading.Thread.__init__(self)
00204         self.application = application
00205 
00206     def run(self):
00207         rospy.loginfo('Application stopper thread created')
00208         
00209         if self.application.action_ns:
00210             rospy.loginfo("Sending preempt signal to application %s"%self.application.name)
00211             self.application.action_client.cancel_goal()
00212             self.application.action_client.wait_for_result(self.application.preempt_timeout)
00213             self.application.action_client = None
00214 
00215         
00216         self.application.launcher.shutdown()
00217         self.application.launcher = None
00218         self.application.state = 'Stopped'
00219         rospy.loginfo('Stopped application %s'%self.application.name)
00220         self.application.lock.release() 
00221 
00222 
00223 class ApplicationQueueItem():
00224     def __init__(self, app, action, wait):
00225         self.app = app
00226         self.action = action
00227         self.wait = wait
00228         self.result = None
00229 
00230 
00231 class ApplicationQueue():
00232     def __init__(self):
00233         self.items = []
00234         self.items_lock = threading.Lock()
00235 
00236     def pull_application(self):
00237         with self.items_lock:
00238             if len(self.items) == 0:
00239                 return None
00240             else:
00241                 item = self.items[0]
00242                 del self.items[0]
00243                 return item
00244 
00245     def push_application(self, app, action, wait):
00246         with self.items_lock:
00247             self.items.append(ApplicationQueueItem(app, action, wait))
00248         return True
00249 
00250     def stop_application(self, app, wait):
00251         return self.push_application(app, 'stop', wait)
00252 
00253     def start_application(self, app, wait):
00254         return self.push_application(app, 'start', wait)
00255 
00256 
00257 
00258 
00259 
00260 
00261 
00262 class ApplicationManager():
00263     def __init__(self, queue):
00264         self.lock = threading.RLock()
00265         self.queue = queue
00266         self.applications = self.get_available_applications()
00267         
00268         
00269         self.run_interface = actionlib.ActionServer('application_manager/run_application', RunApplicationAction, self.run_cb, self.cancel_cb)
00270         self.kill_interface = actionlib.SimpleActionServer('application_manager/kill_application', KillApplicationAction, self.kill_cb)
00271         self.stop_interface = actionlib.SimpleActionServer('application_manager/stop_application', StopApplicationAction, self.stop_cb)
00272         self.list_interface = actionlib.SimpleActionServer('application_manager/list_applications', ListApplicationsAction, self.list_cb)
00273 
00274 
00275     def get_available_applications(self):
00276         with self.lock:
00277             
00278             path = roslib.packages.get_pkg_dir('application_manager')
00279             apps_yaml = yaml.load(open(path+'/applications.yaml'))
00280             available_applications = {}
00281             for app in apps_yaml:
00282                 application = Application()
00283                 if not 'app' in app:
00284                     rospy.logerr('Invalid entry in applications.yaml file: %s'%str(app))
00285                 elif application.configure(app['app']):
00286                     available_applications[application.name] = application
00287                     rospy.loginfo("Available application: %s"%application.name)                    
00288 
00289             
00290             for name, app in available_applications.iteritems():
00291                 for d in app.depends:
00292                     available_applications[d].depends_on.append(name)
00293                     
00294             
00295             all_resources = {}
00296             for name, app in available_applications.iteritems():
00297                 all_resources[name] = []
00298                 self.update_resources(name, all_resources[name], available_applications)
00299             for name, app in available_applications.iteritems():
00300                 for r in all_resources[name]:
00301                     for name2 in available_applications:
00302                         if r in all_resources[name2] and not name2 in app.conflicts:
00303                             if name != name2 and not name2 in app.depends and not name2 in app.depends_on:
00304                                 app.conflicts.append(name2)
00305 
00306             return available_applications
00307 
00308 
00309     def update_resources(self, app, resource_list, applications):
00310         for r in applications[app].resources:
00311             if not r in resource_list:
00312                 resource_list.append(r)
00313         for d in applications[app].depends:
00314             self.update_resources(d, resource_list, applications)
00315 
00316 
00317 
00318     def start_application(self, application_name, wait=False):
00319         print 'STARTING %s'%application_name
00320         
00321         if not application_name in self.applications:
00322             rospy.logerr('Cannot start application %s, no application exists with that name'%application_name)
00323             return False
00324         app = self.applications[application_name]
00325 
00326         
00327         for a in app.conflicts:
00328             if self.applications[a].state != 'Stopped':
00329                 self.stop_application(self.applications[a].name, wait=True)
00330 
00331         
00332         for a in app.depends:
00333             if self.applications[a].state != 'Running':
00334                 self.start_application(self.applications[a].name, wait=True)
00335 
00336         
00337         self.queue.start_application(app, wait)
00338         return True
00339 
00340 
00341     def stop_application(self, application_name, wait=False):
00342         
00343         if not application_name in self.applications:
00344             rospy.logerr('Cannot stop application %s, no application exists with that name'%application_name)
00345             return False
00346 
00347         
00348         app = self.applications[application_name]
00349         for a in app.depends_on:
00350             if self.applications[a].state != 'Stopped':
00351                 self.stop_application(self.applications[a].name, wait=True)
00352 
00353         
00354         self.queue.stop_application(app, wait)
00355         return True
00356 
00357 
00358     def run_cb(self, goal):
00359         with self.lock:
00360             application_name = goal.get_goal().application_name
00361             if self.start_application(application_name):
00362                 self.applications[application_name].client_goals.append(goal)
00363                 goal.set_accepted()
00364                 rospy.loginfo('Client connected to application %s'%application_name)
00365             else:
00366                 goal.set_rejected()
00367             
00368 
00369     def cancel_cb(self, goal):
00370         with self.lock:
00371             
00372             application = None
00373             for name, app in self.applications.iteritems():
00374                 if goal in app.client_goals:
00375                     application = app
00376             if not application:
00377                 rospy.loginfo('Canceling goal that does not match up with an application any more')
00378                 return
00379             
00380             
00381             application.client_goals.remove(goal)
00382             goal.set_canceled()
00383 
00384             
00385             if application.daemonize or len(application.client_goals) > 0:
00386                 rospy.loginfo('Disconnected client from application %s, but keep applcation running'%application.name)
00387             else:
00388                 self.stop_application(application.name)
00389 
00390                 
00391     def stop_cb(self, goal):
00392         with self.lock:
00393             if self.stop_application(goal.application_name):
00394                 self.stop_interface.set_succeeded(KillApplicationResult())
00395             else:
00396                 self.stop_interface.set_aborted(KillApplicationResult())
00397 
00398 
00399     def kill_cb(self, goal):
00400         with self.lock:
00401             if self.stop_application(goal.application_name):
00402                 self.kill_interface.set_succeeded(KillApplicationResult())
00403             else:
00404                 self.kill_interface.set_aborted(KillApplicationResult())
00405 
00406 
00407     def list_cb(self, goal):
00408         with self.lock:
00409             res = ListApplicationsResult()
00410             for name, app in self.applications.iteritems():
00411                 res.application_name.append(name)
00412                 res.application_status.append(app.state)
00413             self.list_interface.set_succeeded(res)
00414 
00415 
00416         
00417 
00418 
00419 def main():
00420     rospy.init_node('application_manager_node')
00421     queue = ApplicationQueue()
00422     ApplicationManager(queue)
00423     rospy.loginfo("Application manager running")
00424 
00425     while not rospy.is_shutdown(): 
00426        rospy.sleep(0.1)
00427        item = queue.pull_application()
00428        if item:
00429            if item.action == 'start':
00430                item.result = item.app.start(item.wait)
00431            if item.action == 'stop':
00432                item.result = item.app.stop(item.wait)
00433 
00434 
00435 if __name__ == '__main__':
00436     main()
00437