$search
00001 #! /usr/bin/python 00002 #*********************************************************** 00003 # Software License Agreement (BSD License) 00004 # 00005 # Copyright (c) 2008, Willow Garage, Inc. 00006 # All rights reserved. 00007 # 00008 # Redistribution and use in source and binary forms, with or without 00009 # modification, are permitted provided that the following conditions 00010 # are met: 00011 # 00012 # * Redistributions of source code must retain the above copyright 00013 # notice, this list of conditions and the following disclaimer. 00014 # * Redistributions in binary form must reproduce the above 00015 # copyright notice, this list of conditions and the following 00016 # disclaimer in the documentation and/or other materials provided 00017 # with the distribution. 00018 # * Neither the name of the Willow Garage nor the names of its 00019 # contributors may be used to endorse or promote products derived 00020 # from this software without specific prior written permission. 00021 # 00022 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00023 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00024 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00025 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00026 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00027 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00028 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00029 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00030 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00031 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00032 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00033 # POSSIBILITY OF SUCH DAMAGE. 00034 # 00035 00036 # Author: Wim Meeussen 00037 00038 from __future__ import with_statement 00039 00040 import roslib; roslib.load_manifest('application_manager') 00041 import rospy 00042 import os 00043 import threading 00044 import actionlib 00045 from actionlib_msgs.msg import GoalStatus 00046 import yaml 00047 from application_manager import roslauncher 00048 from application_msgs.msg import * 00049 00050 00051 00052 00053 class Application(): 00054 def __init__(self): 00055 self.state = 'Stopped' # state of the application (Running, Stopped, Preempted) 00056 self.client_goals = [] # list of clients that are connected to this application 00057 00058 00059 def configure(self, conf): 00060 self.lock = threading.Lock() # lock for interacting with this application 00061 00062 # check required keys 00063 required_keys = ['package', 'name', 'description', 'launch_file', 'resources', 'dependencies'] 00064 for k in required_keys: 00065 if not k in conf: 00066 rospy.logerr('Application in package %s does not specify option "%s"'%(conf['package'], k)) 00067 return False 00068 00069 self.resources = conf['resources'] 00070 self.depends = conf['dependencies'] 00071 self.depends_on = [] 00072 self.conflicts = [] 00073 self.name = '/'.join([conf['package'], conf['name']]) 00074 self.description = conf['description'] 00075 try: 00076 if conf['launch_file'][0] == '/': 00077 with open(os.path.join(conf['launch_file'])) as f: 00078 self.launch_file = f.read() 00079 else: 00080 with open(os.path.join(roslib.packages.get_pkg_dir(conf['package']), conf['launch_file'])) as f: 00081 self.launch_file = f.read() 00082 except: 00083 rospy.logerr('Application %s specified invalid package or launch file'%conf['name']) 00084 return False 00085 00086 # optional action interface 00087 self.action_ns = None 00088 if 'action_ns' in conf and 'preempt_timeout' in conf: 00089 self.action_ns = conf['action_ns'] 00090 self.preempt_timeout = rospy.Duration(conf['preempt_timeout']) 00091 self.launch_timeout = rospy.Duration(30.0) 00092 self.action_client = None 00093 00094 # optional daemon parameter 00095 self.daemonize = False 00096 if 'daemonize' in conf: 00097 self.daemonize = conf['daemonize'] 00098 00099 return True 00100 00101 00102 00103 def start(self, wait): 00104 self.lock.acquire() # this lock gets released from other thread 00105 00106 # check if application is already running 00107 if self.state == 'Running': 00108 rospy.loginfo('Applicaiton %s already running, not starting it again'%self.name) 00109 self.lock.release() 00110 return True 00111 00112 # start launch file (from this thread, which is main thread) 00113 self.state = 'Starting' 00114 self.launcher = roslauncher.ScriptRoslaunch(self.launch_file) 00115 self.launcher.start() 00116 00117 # start application in separate thread, so we don't block the manager 00118 self.application_starter = ApplicationStarter(self) 00119 self.application_starter.start() # this thread releases the lock 00120 00121 # wait for start to finish if 'wait' is set to true 00122 while not rospy.is_shutdown() and wait and self.state != 'Running': 00123 rospy.sleep(0.1) 00124 return True 00125 00126 00127 00128 def stop(self, wait): 00129 self.lock.acquire() # this lock gets released from other thread 00130 00131 # check if application is running 00132 if self.state != 'Running': 00133 rospy.loginfo("Applicaiton %s is not running, so don't need to stop it"%self.name) 00134 self.lock.release() 00135 return True 00136 00137 # notify all clients that are still connected 00138 for g in self.client_goals: 00139 g.set_canceled() 00140 self.client_goals = [] 00141 00142 # shut down application in separate thread, so we don't block the manager 00143 self.state = 'Preempting' 00144 self.application_stopper = ApplicationStopper(self) 00145 self.application_stopper.start() # this thread releases the lock 00146 00147 # wait for stop to finish if 'wait' is set to true 00148 while not rospy.is_shutdown() and wait and self.state != 'Stopped': 00149 rospy.sleep(0.1) 00150 return True 00151 00152 00153 def done_cb(self, status, result): 00154 with self.lock: 00155 if self.state == 'Running': 00156 rospy.loginfo("Application %s stopped itself. Taking down launchfile"%self.name) 00157 self.stop(wait=False) 00158 00159 00160 class ApplicationStarter(threading.Thread): 00161 def __init__(self, application): 00162 threading.Thread.__init__(self) 00163 self.application = application 00164 00165 00166 def run(self): 00167 rospy.loginfo('Application starter thread created') 00168 00169 # start action client if application supports this 00170 if self.application.action_ns: 00171 00172 rospy.loginfo('Waiting for action client to come up in namespace %s for application %s'%(self.application.action_ns, self.application.name)) 00173 self.application.action_client = actionlib.SimpleActionClient(self.application.action_ns, ApplicationAction) 00174 if not self.application.action_client.wait_for_server(self.application.launch_timeout): 00175 rospy.logerr('Action client server for application %s did not connect in time.'%self.application.name) 00176 self.application.launcher.shutdown() 00177 self.application.launcher = None 00178 self.application.state = 'Stopped' 00179 self.application.lock.release() # this lock got acquired in other thread 00180 return False 00181 self.application.action_client.send_goal(ApplicationGoal(), done_cb=self.application.done_cb) 00182 start_time = rospy.Time.now() 00183 while self.application.action_client.get_state() != GoalStatus.ACTIVE: 00184 rospy.sleep(0.1) 00185 if rospy.Time.now() > start_time + self.application.launch_timeout: 00186 rospy.logerr('Action client server for application %s did not accept start goal in time.'%self.application.name) 00187 self.application.launcher.shutdown() 00188 self.application.launcher = None 00189 self.application.state = 'Stopped' 00190 self.application.lock.release() # this lock got acquired in other thread 00191 return False 00192 00193 self.application.state = 'Running' 00194 rospy.loginfo('Started application %s'%self.application.name) 00195 self.application.lock.release() # this lock got acquired in other thread 00196 return True 00197 00198 00199 00200 00201 class ApplicationStopper(threading.Thread): 00202 def __init__(self, application): 00203 threading.Thread.__init__(self) 00204 self.application = application 00205 00206 def run(self): 00207 rospy.loginfo('Application stopper thread created') 00208 # preempt application if this is supported 00209 if self.application.action_ns: 00210 rospy.loginfo("Sending preempt signal to application %s"%self.application.name) 00211 self.application.action_client.cancel_goal() 00212 self.application.action_client.wait_for_result(self.application.preempt_timeout) 00213 self.application.action_client = None 00214 00215 # kill launch file 00216 self.application.launcher.shutdown() 00217 self.application.launcher = None 00218 self.application.state = 'Stopped' 00219 rospy.loginfo('Stopped application %s'%self.application.name) 00220 self.application.lock.release() # this lock got acquired in other thread 00221 00222 00223 class ApplicationQueueItem(): 00224 def __init__(self, app, action, wait): 00225 self.app = app 00226 self.action = action 00227 self.wait = wait 00228 self.result = None 00229 00230 00231 class ApplicationQueue(): 00232 def __init__(self): 00233 self.items = [] 00234 self.items_lock = threading.Lock() 00235 00236 def pull_application(self): 00237 with self.items_lock: 00238 if len(self.items) == 0: 00239 return None 00240 else: 00241 item = self.items[0] 00242 del self.items[0] 00243 return item 00244 00245 def push_application(self, app, action, wait): 00246 with self.items_lock: 00247 self.items.append(ApplicationQueueItem(app, action, wait)) 00248 return True 00249 00250 def stop_application(self, app, wait): 00251 return self.push_application(app, 'stop', wait) 00252 00253 def start_application(self, app, wait): 00254 return self.push_application(app, 'start', wait) 00255 00256 00257 00258 00259 00260 00261 00262 class ApplicationManager(): 00263 def __init__(self, queue): 00264 self.lock = threading.RLock() 00265 self.queue = queue 00266 self.applications = self.get_available_applications() 00267 00268 # interface of application manager 00269 self.run_interface = actionlib.ActionServer('application_manager/run_application', RunApplicationAction, self.run_cb, self.cancel_cb) 00270 self.kill_interface = actionlib.SimpleActionServer('application_manager/kill_application', KillApplicationAction, self.kill_cb) 00271 self.stop_interface = actionlib.SimpleActionServer('application_manager/stop_application', StopApplicationAction, self.stop_cb) 00272 self.list_interface = actionlib.SimpleActionServer('application_manager/list_applications', ListApplicationsAction, self.list_cb) 00273 00274 00275 def get_available_applications(self): 00276 with self.lock: 00277 # create applications 00278 path = roslib.packages.get_pkg_dir('application_manager') 00279 apps_yaml = yaml.load(open(path+'/applications.yaml')) 00280 available_applications = {} 00281 for app in apps_yaml: 00282 application = Application() 00283 if not 'app' in app: 00284 rospy.logerr('Invalid entry in applications.yaml file: %s'%str(app)) 00285 elif application.configure(app['app']): 00286 available_applications[application.name] = application 00287 rospy.loginfo("Available application: %s"%application.name) 00288 00289 # create depends-on list 00290 for name, app in available_applications.iteritems(): 00291 for d in app.depends: 00292 available_applications[d].depends_on.append(name) 00293 00294 # create conflicts list 00295 all_resources = {} 00296 for name, app in available_applications.iteritems(): 00297 all_resources[name] = [] 00298 self.update_resources(name, all_resources[name], available_applications) 00299 for name, app in available_applications.iteritems(): 00300 for r in all_resources[name]: 00301 for name2 in available_applications: 00302 if r in all_resources[name2] and not name2 in app.conflicts: 00303 if name != name2 and not name2 in app.depends and not name2 in app.depends_on: 00304 app.conflicts.append(name2) 00305 00306 return available_applications 00307 00308 00309 def update_resources(self, app, resource_list, applications): 00310 for r in applications[app].resources: 00311 if not r in resource_list: 00312 resource_list.append(r) 00313 for d in applications[app].depends: 00314 self.update_resources(d, resource_list, applications) 00315 00316 00317 00318 def start_application(self, application_name, wait=False): 00319 print 'STARTING %s'%application_name 00320 # check if application exists 00321 if not application_name in self.applications: 00322 rospy.logerr('Cannot start application %s, no application exists with that name'%application_name) 00323 return False 00324 app = self.applications[application_name] 00325 00326 # stop apps that conflict with this app 00327 for a in app.conflicts: 00328 if self.applications[a].state != 'Stopped': 00329 self.stop_application(self.applications[a].name, wait=True) 00330 00331 # start apps that this app depends on 00332 for a in app.depends: 00333 if self.applications[a].state != 'Running': 00334 self.start_application(self.applications[a].name, wait=True) 00335 00336 # start application 00337 self.queue.start_application(app, wait) 00338 return True 00339 00340 00341 def stop_application(self, application_name, wait=False): 00342 # check if application exists 00343 if not application_name in self.applications: 00344 rospy.logerr('Cannot stop application %s, no application exists with that name'%application_name) 00345 return False 00346 00347 # stop apps that depend on this app 00348 app = self.applications[application_name] 00349 for a in app.depends_on: 00350 if self.applications[a].state != 'Stopped': 00351 self.stop_application(self.applications[a].name, wait=True) 00352 00353 # stop application 00354 self.queue.stop_application(app, wait) 00355 return True 00356 00357 00358 def run_cb(self, goal): 00359 with self.lock: 00360 application_name = goal.get_goal().application_name 00361 if self.start_application(application_name): 00362 self.applications[application_name].client_goals.append(goal) 00363 goal.set_accepted() 00364 rospy.loginfo('Client connected to application %s'%application_name) 00365 else: 00366 goal.set_rejected() 00367 00368 00369 def cancel_cb(self, goal): 00370 with self.lock: 00371 # find matching application 00372 application = None 00373 for name, app in self.applications.iteritems(): 00374 if goal in app.client_goals: 00375 application = app 00376 if not application: 00377 rospy.loginfo('Canceling goal that does not match up with an application any more') 00378 return 00379 00380 # remove goal from client goal list 00381 application.client_goals.remove(goal) 00382 goal.set_canceled() 00383 00384 # stop app if needed 00385 if application.daemonize or len(application.client_goals) > 0: 00386 rospy.loginfo('Disconnected client from application %s, but keep applcation running'%application.name) 00387 else: 00388 self.stop_application(application.name) 00389 00390 00391 def stop_cb(self, goal): 00392 with self.lock: 00393 if self.stop_application(goal.application_name): 00394 self.stop_interface.set_succeeded(KillApplicationResult()) 00395 else: 00396 self.stop_interface.set_aborted(KillApplicationResult()) 00397 00398 00399 def kill_cb(self, goal): 00400 with self.lock: 00401 if self.stop_application(goal.application_name): 00402 self.kill_interface.set_succeeded(KillApplicationResult()) 00403 else: 00404 self.kill_interface.set_aborted(KillApplicationResult()) 00405 00406 00407 def list_cb(self, goal): 00408 with self.lock: 00409 res = ListApplicationsResult() 00410 for name, app in self.applications.iteritems(): 00411 res.application_name.append(name) 00412 res.application_status.append(app.state) 00413 self.list_interface.set_succeeded(res) 00414 00415 00416 00417 00418 00419 def main(): 00420 rospy.init_node('application_manager_node') 00421 queue = ApplicationQueue() 00422 ApplicationManager(queue) 00423 rospy.loginfo("Application manager running") 00424 00425 while not rospy.is_shutdown(): 00426 rospy.sleep(0.1) 00427 item = queue.pull_application() 00428 if item: 00429 if item.action == 'start': 00430 item.result = item.app.start(item.wait) 00431 if item.action == 'stop': 00432 item.result = item.app.stop(item.wait) 00433 00434 00435 if __name__ == '__main__': 00436 main() 00437