manager.py
Go to the documentation of this file.
00001 #! /usr/bin/python
00002 #***********************************************************
00003 # Software License Agreement (BSD License)
00004 #
00005 # Copyright (c) 2008, Willow Garage, Inc.
00006 # All rights reserved.
00007 #
00008 # Redistribution and use in source and binary forms, with or without
00009 # modification, are permitted provided that the following conditions
00010 # are met:
00011 #
00012 #  * Redistributions of source code must retain the above copyright
00013 #    notice, this list of conditions and the following disclaimer.
00014 #  * Redistributions in binary form must reproduce the above
00015 #    copyright notice, this list of conditions and the following
00016 #    disclaimer in the documentation and/or other materials provided
00017 #    with the distribution.
00018 #  * Neither the name of the Willow Garage nor the names of its
00019 #    contributors may be used to endorse or promote products derived
00020 #    from this software without specific prior written permission.
00021 #
00022 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00023 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00024 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00025 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00026 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00027 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00028 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00029 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00030 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00031 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00032 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00033 # POSSIBILITY OF SUCH DAMAGE.
00034 #
00035 
00036 # Author: Wim Meeussen
00037 
00038 from __future__ import with_statement
00039 
00040 import roslib; roslib.load_manifest('application_manager')
00041 import rospy
00042 import os
00043 import threading
00044 import actionlib
00045 from actionlib_msgs.msg import GoalStatus
00046 import yaml
00047 from application_manager import roslauncher
00048 from application_msgs.msg import *
00049 
00050 
00051 
00052 
00053 class Application():
00054     def __init__(self):
00055         self.state = 'Stopped' # state of the application (Running, Stopped, Preempted)
00056         self.client_goals = [] # list of clients that are connected to this application
00057         
00058 
00059     def configure(self, conf):
00060         self.lock = threading.Lock()  # lock for interacting with this application
00061 
00062         # check required keys
00063         required_keys = ['package', 'name', 'description', 'launch_file', 'resources', 'dependencies']
00064         for k in required_keys:
00065             if not k in conf:
00066                 rospy.logerr('Application in package %s does not specify option "%s"'%(conf['package'], k))
00067                 return False
00068 
00069         self.resources = conf['resources']
00070         self.depends = conf['dependencies']
00071         self.depends_on = []
00072         self.conflicts = []
00073         self.name = '/'.join([conf['package'], conf['name']])
00074         self.description = conf['description']
00075         try:
00076             if conf['launch_file'][0] == '/':
00077                 with open(os.path.join(conf['launch_file'])) as f:
00078                     self.launch_file = f.read()
00079             else:
00080                 with open(os.path.join(roslib.packages.get_pkg_dir(conf['package']), conf['launch_file'])) as f:
00081                     self.launch_file = f.read()
00082         except:
00083             rospy.logerr('Application %s specified invalid package or launch file'%conf['name'])
00084             return False
00085 
00086         # optional action interface
00087         self.action_ns = None
00088         if 'action_ns' in conf and 'preempt_timeout' in conf:
00089             self.action_ns = conf['action_ns']
00090             self.preempt_timeout = rospy.Duration(conf['preempt_timeout'])
00091             self.launch_timeout = rospy.Duration(30.0)
00092             self.action_client = None
00093 
00094         # optional daemon parameter
00095         self.daemonize = False
00096         if 'daemonize' in conf:
00097             self.daemonize = conf['daemonize']
00098 
00099         return True
00100 
00101 
00102 
00103     def start(self, wait):
00104         self.lock.acquire() # this lock gets released from other thread
00105 
00106         # check if application is already running
00107         if self.state == 'Running':
00108             rospy.loginfo('Applicaiton %s already running, not starting it again'%self.name)
00109             self.lock.release()
00110             return True
00111 
00112         # start launch file (from this thread, which is main thread)
00113         self.state = 'Starting'
00114         self.launcher = roslauncher.ScriptRoslaunch(self.launch_file)
00115         self.launcher.start()
00116 
00117         # start application in separate thread, so we don't block the manager
00118         self.application_starter = ApplicationStarter(self)
00119         self.application_starter.start() # this thread releases the lock
00120 
00121         # wait for start to finish if 'wait' is set to true
00122         while not rospy.is_shutdown() and wait and self.state != 'Running':
00123             rospy.sleep(0.1)
00124         return True
00125 
00126 
00127 
00128     def stop(self, wait):
00129         self.lock.acquire() # this lock gets released from other thread
00130 
00131         # check if application is running
00132         if self.state != 'Running':
00133             rospy.loginfo("Applicaiton %s is not running, so don't need to stop it"%self.name)
00134             self.lock.release()
00135             return True
00136 
00137         # notify all clients that are still connected
00138         for g in self.client_goals:
00139             g.set_canceled()
00140         self.client_goals = []
00141 
00142         # shut down application in separate thread, so we don't block the manager
00143         self.state = 'Preempting'
00144         self.application_stopper = ApplicationStopper(self)
00145         self.application_stopper.start()  # this thread releases the lock
00146 
00147         # wait for stop to finish if 'wait' is set to true
00148         while not rospy.is_shutdown() and wait and self.state != 'Stopped':
00149             rospy.sleep(0.1)
00150         return True
00151 
00152 
00153     def done_cb(self, status, result):
00154         with self.lock:
00155             if self.state == 'Running':
00156                 rospy.loginfo("Application %s stopped itself. Taking down launchfile"%self.name)
00157         self.stop(wait=False)
00158 
00159 
00160 class ApplicationStarter(threading.Thread):
00161     def __init__(self, application):
00162         threading.Thread.__init__(self)
00163         self.application = application
00164 
00165 
00166     def run(self):
00167         rospy.loginfo('Application starter thread created')
00168 
00169         # start action client if application supports this
00170         if self.application.action_ns:
00171 
00172             rospy.loginfo('Waiting for action client to come up in namespace %s for application %s'%(self.application.action_ns, self.application.name))
00173             self.application.action_client = actionlib.SimpleActionClient(self.application.action_ns, ApplicationAction)
00174             if not self.application.action_client.wait_for_server(self.application.launch_timeout):
00175                 rospy.logerr('Action client server for application %s did not connect in time.'%self.application.name)                    
00176                 self.application.launcher.shutdown()
00177                 self.application.launcher = None
00178                 self.application.state = 'Stopped'
00179                 self.application.lock.release() # this lock got acquired in other thread
00180                 return False
00181             self.application.action_client.send_goal(ApplicationGoal(), done_cb=self.application.done_cb)
00182             start_time = rospy.Time.now()
00183             while self.application.action_client.get_state() != GoalStatus.ACTIVE:
00184                 rospy.sleep(0.1)
00185                 if rospy.Time.now() > start_time + self.application.launch_timeout:
00186                     rospy.logerr('Action client server for application %s did not accept start goal in time.'%self.application.name)
00187                     self.application.launcher.shutdown()
00188                     self.application.launcher = None
00189                     self.application.state = 'Stopped'
00190                     self.application.lock.release() # this lock got acquired in other thread
00191                     return False
00192 
00193         self.application.state = 'Running'
00194         rospy.loginfo('Started application %s'%self.application.name)
00195         self.application.lock.release() # this lock got acquired in other thread
00196         return True
00197 
00198 
00199 
00200 
00201 class ApplicationStopper(threading.Thread):
00202     def __init__(self, application):
00203         threading.Thread.__init__(self)
00204         self.application = application
00205 
00206     def run(self):
00207         rospy.loginfo('Application stopper thread created')
00208         # preempt application if this is supported
00209         if self.application.action_ns:
00210             rospy.loginfo("Sending preempt signal to application %s"%self.application.name)
00211             self.application.action_client.cancel_goal()
00212             self.application.action_client.wait_for_result(self.application.preempt_timeout)
00213             self.application.action_client = None
00214 
00215         # kill launch file
00216         self.application.launcher.shutdown()
00217         self.application.launcher = None
00218         self.application.state = 'Stopped'
00219         rospy.loginfo('Stopped application %s'%self.application.name)
00220         self.application.lock.release() # this lock got acquired in other thread
00221 
00222 
00223 class ApplicationQueueItem():
00224     def __init__(self, app, action, wait):
00225         self.app = app
00226         self.action = action
00227         self.wait = wait
00228         self.result = None
00229 
00230 
00231 class ApplicationQueue():
00232     def __init__(self):
00233         self.items = []
00234         self.items_lock = threading.Lock()
00235 
00236     def pull_application(self):
00237         with self.items_lock:
00238             if len(self.items) == 0:
00239                 return None
00240             else:
00241                 item = self.items[0]
00242                 del self.items[0]
00243                 return item
00244 
00245     def push_application(self, app, action, wait):
00246         with self.items_lock:
00247             self.items.append(ApplicationQueueItem(app, action, wait))
00248         return True
00249 
00250     def stop_application(self, app, wait):
00251         return self.push_application(app, 'stop', wait)
00252 
00253     def start_application(self, app, wait):
00254         return self.push_application(app, 'start', wait)
00255 
00256 
00257 
00258 
00259 
00260 
00261 
00262 class ApplicationManager():
00263     def __init__(self, queue):
00264         self.lock = threading.RLock()
00265         self.queue = queue
00266         self.applications = self.get_available_applications()
00267         
00268         # interface of application manager
00269         self.run_interface = actionlib.ActionServer('application_manager/run_application', RunApplicationAction, self.run_cb, self.cancel_cb)
00270         self.kill_interface = actionlib.SimpleActionServer('application_manager/kill_application', KillApplicationAction, self.kill_cb)
00271         self.stop_interface = actionlib.SimpleActionServer('application_manager/stop_application', StopApplicationAction, self.stop_cb)
00272         self.list_interface = actionlib.SimpleActionServer('application_manager/list_applications', ListApplicationsAction, self.list_cb)
00273 
00274 
00275     def get_available_applications(self):
00276         with self.lock:
00277             # create applications
00278             path = roslib.packages.get_pkg_dir('application_manager')
00279             apps_yaml = yaml.load(open(path+'/applications.yaml'))
00280             available_applications = {}
00281             for app in apps_yaml:
00282                 application = Application()
00283                 if not 'app' in app:
00284                     rospy.logerr('Invalid entry in applications.yaml file: %s'%str(app))
00285                 elif application.configure(app['app']):
00286                     available_applications[application.name] = application
00287                     rospy.loginfo("Available application: %s"%application.name)                    
00288 
00289             # create depends-on list
00290             for name, app in available_applications.iteritems():
00291                 for d in app.depends:
00292                     available_applications[d].depends_on.append(name)
00293                     
00294             # create conflicts list
00295             all_resources = {}
00296             for name, app in available_applications.iteritems():
00297                 all_resources[name] = []
00298                 self.update_resources(name, all_resources[name], available_applications)
00299             for name, app in available_applications.iteritems():
00300                 for r in all_resources[name]:
00301                     for name2 in available_applications:
00302                         if r in all_resources[name2] and not name2 in app.conflicts:
00303                             if name != name2 and not name2 in app.depends and not name2 in app.depends_on:
00304                                 app.conflicts.append(name2)
00305 
00306             return available_applications
00307 
00308 
00309     def update_resources(self, app, resource_list, applications):
00310         for r in applications[app].resources:
00311             if not r in resource_list:
00312                 resource_list.append(r)
00313         for d in applications[app].depends:
00314             self.update_resources(d, resource_list, applications)
00315 
00316 
00317 
00318     def start_application(self, application_name, wait=False):
00319         print 'STARTING %s'%application_name
00320         # check if application exists
00321         if not application_name in self.applications:
00322             rospy.logerr('Cannot start application %s, no application exists with that name'%application_name)
00323             return False
00324         app = self.applications[application_name]
00325 
00326         # stop apps that conflict with this app
00327         for a in app.conflicts:
00328             if self.applications[a].state != 'Stopped':
00329                 self.stop_application(self.applications[a].name, wait=True)
00330 
00331         # start apps that this app depends on
00332         for a in app.depends:
00333             if self.applications[a].state != 'Running':
00334                 self.start_application(self.applications[a].name, wait=True)
00335 
00336         # start application
00337         self.queue.start_application(app, wait)
00338         return True
00339 
00340 
00341     def stop_application(self, application_name, wait=False):
00342         # check if application exists
00343         if not application_name in self.applications:
00344             rospy.logerr('Cannot stop application %s, no application exists with that name'%application_name)
00345             return False
00346 
00347         # stop apps that depend on this app
00348         app = self.applications[application_name]
00349         for a in app.depends_on:
00350             if self.applications[a].state != 'Stopped':
00351                 self.stop_application(self.applications[a].name, wait=True)
00352 
00353         # stop application
00354         self.queue.stop_application(app, wait)
00355         return True
00356 
00357 
00358     def run_cb(self, goal):
00359         with self.lock:
00360             application_name = goal.get_goal().application_name
00361             if self.start_application(application_name):
00362                 self.applications[application_name].client_goals.append(goal)
00363                 goal.set_accepted()
00364                 rospy.loginfo('Client connected to application %s'%application_name)
00365             else:
00366                 goal.set_rejected()
00367             
00368 
00369     def cancel_cb(self, goal):
00370         with self.lock:
00371             # find matching application
00372             application = None
00373             for name, app in self.applications.iteritems():
00374                 if goal in app.client_goals:
00375                     application = app
00376             if not application:
00377                 rospy.loginfo('Canceling goal that does not match up with an application any more')
00378                 return
00379             
00380             # remove goal from client goal list 
00381             application.client_goals.remove(goal)
00382             goal.set_canceled()
00383 
00384             # stop app if needed
00385             if application.daemonize or len(application.client_goals) > 0:
00386                 rospy.loginfo('Disconnected client from application %s, but keep applcation running'%application.name)
00387             else:
00388                 self.stop_application(application.name)
00389 
00390                 
00391     def stop_cb(self, goal):
00392         with self.lock:
00393             if self.stop_application(goal.application_name):
00394                 self.stop_interface.set_succeeded(KillApplicationResult())
00395             else:
00396                 self.stop_interface.set_aborted(KillApplicationResult())
00397 
00398 
00399     def kill_cb(self, goal):
00400         with self.lock:
00401             if self.stop_application(goal.application_name):
00402                 self.kill_interface.set_succeeded(KillApplicationResult())
00403             else:
00404                 self.kill_interface.set_aborted(KillApplicationResult())
00405 
00406 
00407     def list_cb(self, goal):
00408         with self.lock:
00409             res = ListApplicationsResult()
00410             for name, app in self.applications.iteritems():
00411                 res.application_name.append(name)
00412                 res.application_status.append(app.state)
00413             self.list_interface.set_succeeded(res)
00414 
00415 
00416         
00417 
00418 
00419 def main():
00420     rospy.init_node('application_manager_node')
00421     queue = ApplicationQueue()
00422     ApplicationManager(queue)
00423     rospy.loginfo("Application manager running")
00424 
00425     while not rospy.is_shutdown(): 
00426        rospy.sleep(0.1)
00427        item = queue.pull_application()
00428        if item:
00429            if item.action == 'start':
00430                item.result = item.app.start(item.wait)
00431            if item.action == 'stop':
00432                item.result = item.app.stop(item.wait)
00433 
00434 
00435 if __name__ == '__main__':
00436     main()
00437 


application_manager
Author(s): Wim Meeussen
autogenerated on Mon Dec 2 2013 12:00:57