$search
00001 #!/usr/bin/python 00002 # Software License Agreement (BSD License) 00003 # 00004 # Copyright (c) 2008, Willow Garage, Inc. 00005 # All rights reserved. 00006 # 00007 # Redistribution and use in source and binary forms, with or without 00008 # modification, are permitted provided that the following conditions 00009 # are met: 00010 # 00011 # * Redistributions of source code must retain the above copyright 00012 # notice, this list of conditions and the following disclaimer. 00013 # * Redistributions in binary form must reproduce the above 00014 # copyright notice, this list of conditions and the following 00015 # disclaimer in the documentation and/or other materials provided 00016 # with the distribution. 00017 # * Neither the name of the Willow Garage nor the names of its 00018 # contributors may be used to endorse or promote products derived 00019 # from this software without specific prior written permission. 00020 # 00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 # POSSIBILITY OF SUCH DAMAGE. 00033 # 00034 00035 # Author: Wim Meeussen 00036 00037 00038 00039 00040 from __future__ import with_statement 00041 00042 import roslib; roslib.load_manifest('continuous_ops_task_manager') 00043 import rospy 00044 import os 00045 import threading 00046 import yaml 00047 import copy 00048 import roslauncher 00049 from continuous_ops_msgs.srv import * 00050 from continuous_ops_msgs.msg import * 00051 00052 00053 def tasks_to_msg(tasks, ids=None): 00054 task_msgs = [] 00055 if ids: 00056 loop = ids 00057 else: 00058 loop = tasks.keys() 00059 for t in loop: 00060 task_msgs.append(tasks[t].to_msg()) 00061 return task_msgs 00062 00063 00064 00065 class OneTask(): 00066 def __init__(self, pkg, file, conf): 00067 path = roslib.packages.get_pkg_dir(pkg) 00068 self.name = conf['name'] 00069 self.description = conf['description'] 00070 self.task_id = ('_'.join([pkg, file])).replace('.','_') 00071 self.run_id = '' 00072 self.launch_file = os.path.join(path, conf['launch_file']) 00073 self.image_file = os.path.join(path, conf['image_file']) 00074 self.state = '' 00075 self.action_ns = conf['action_ns'] 00076 self.timeout = rospy.Duration(conf['timeout']) 00077 00078 def to_msg(self): 00079 msg = Task() 00080 msg.name = self.name 00081 msg.description = self.description 00082 msg.task_id = self.task_id 00083 msg.run_id = self.run_id 00084 msg.launch_file = self.launch_file 00085 msg.image_file = self.image_file 00086 msg.state = self.state 00087 msg.action_ns = self.action_ns 00088 msg.timeout = self.timeout 00089 return msg 00090 00091 00092 00093 00094 class TaskManager(): 00095 def __init__(self): 00096 self.lock = threading.RLock() 00097 00098 # Get list of available tasks 00099 self.pub_available = rospy.Publisher('available_tasks', Tasks, latch=True) 00100 self.pub_available_tasks() 00101 00102 # List of scheduled tasks, store in dictionary with key run_id 00103 self.start_task_needed = False 00104 self.schedule_order = [] 00105 self.scheduled_tasks = {} 00106 self.launch = None 00107 self.pub_scheduled = rospy.Publisher('scheduled_tasks', Tasks, latch=True) 00108 self.get_scheduled_tasks() 00109 00110 # Advertise services 00111 self.srv_get_available_tasks = rospy.Service('get_available_tasks', GetTaskList, self.get_available_tasks_srv) 00112 self.srv_get_scheduled_tasks = rospy.Service('get_scheduled_tasks', GetTaskList, self.get_scheduled_tasks_srv) 00113 self.srv_add_task = rospy.Service('add_task', AddTask, self.add_task_srv) 00114 self.srv_remove_task = rospy.Service('remove_task', ChangeTask, self.remove_task_srv) 00115 self.srv_start_task = rospy.Service('start_task', ChangeTask, self.start_task_srv) 00116 self.srv_stop_task = rospy.Service('stop_task', ChangeTask, self.stop_task_srv) 00117 self.srv_reorder_tasks = rospy.Service('reorder_tasks', ReorderTaskList, self.reorder_tasks_srv) 00118 00119 def get_available_tasks(self): 00120 with self.lock: 00121 self.available_tasks = {} 00122 for pkg in roslib.rospack.rospack_depends_on_1('continuous_ops_task_manager'): 00123 path = roslib.packages.get_pkg_dir(pkg) 00124 files = os.listdir(path) 00125 for f in files: 00126 if f.partition('.')[2] == 'task': 00127 conf = yaml.load(open(os.path.join(path, f))) 00128 task = OneTask(pkg, f, conf) 00129 self.available_tasks[task.task_id] = task 00130 return tasks_to_msg(self.available_tasks) 00131 00132 def pub_available_tasks(self): 00133 self.pub_available.publish(self.get_available_tasks()) 00134 00135 def get_scheduled_tasks(self): 00136 with self.lock: 00137 msg = tasks_to_msg(self.scheduled_tasks, self.schedule_order) 00138 return msg 00139 00140 def pub_scheduled_tasks(self): 00141 self.pub_scheduled.publish(self.get_scheduled_tasks()) 00142 00143 def start_task(self, run_id): 00144 with self.lock: 00145 if not run_id in self.scheduled_tasks: 00146 rospy.logerr('Cannot start task with run_id %s, no task exists with that run_id'%run_id) 00147 return False 00148 if self.launch: 00149 rospy.logerr('Cannot start task with run_id %s, another task is still running'%run_id) 00150 return False 00151 if self.schedule_order[0] != run_id: 00152 rospy.logerr('Cannot start task with run_id %s, this is not the first task in the schedule'%run_id) 00153 return False 00154 with open(self.scheduled_tasks[run_id].launch_file) as f: 00155 rospy.loginfo('Starting task with run_id %s'%run_id) 00156 self.launch = roslauncher.ScriptRoslaunch(f.read()) 00157 self.launch.start() 00158 self.scheduled_tasks[run_id].state = 'Running' 00159 self.pub_scheduled_tasks() 00160 return True 00161 return False 00162 00163 def stop_task(self, run_id): 00164 with self.lock: 00165 if not run_id in self.scheduled_tasks: 00166 rospy.logerr('Cannot stop task with run_id %s, no task exists with that run_id'%run_id) 00167 return False 00168 if not self.scheduled_tasks[run_id].state == 'Running': 00169 rospy.logerr('Cannot stop task with run_id %s, task is not running'%run_id) 00170 return False 00171 rospy.loginfo('Stopped task with run_id %s'%run_id) 00172 self.launch.shutdown() 00173 self.launch = None 00174 self.scheduled_tasks[run_id].state = 'Stopped' 00175 self.pub_scheduled_tasks() 00176 return True 00177 00178 def add_task(self, task_id): 00179 with self.lock: 00180 if not task_id in self.available_tasks: 00181 rospy.logerr('Cannot add task with task_id %s, because no task exists with that task_id'%task_id) 00182 return '' 00183 run_id = ('_'.join([task_id,str(rospy.Time.now())])).replace('.','_') 00184 rospy.loginfo('Adding new task with run_id %s'%run_id) 00185 task = copy.deepcopy(self.available_tasks[task_id]) 00186 task.state = 'Queued' 00187 task.run_id = run_id 00188 self.scheduled_tasks[task.run_id] = task 00189 self.schedule_order.append(task.run_id) 00190 self.pub_scheduled_tasks() 00191 return task.run_id 00192 00193 def remove_task(self, run_id): 00194 with self.lock: 00195 if not run_id in self.scheduled_tasks: 00196 rospy.logerr('Cannot remove task with run_id %s, no task exists with that run_id'%run_id) 00197 return False 00198 if self.scheduled_tasks[run_id].state == 'Running': 00199 rospy.logerr('Cannot remove task with run_id %s, this task is currently running'%run_id) 00200 return False 00201 rospy.loginfo('Removed task with run_id %s'%run_id) 00202 del self.scheduled_tasks[run_id] 00203 self.schedule_order.remove(run_id) 00204 self.pub_scheduled_tasks() 00205 return True 00206 00207 def reorder_tasks(self, run_id_list): 00208 if len(run_id_list) != len(self.schedule_order): 00209 rospy.logerr('Cannot reorder tasks. Desired list has different length than scheduled list.') 00210 return False 00211 for t in self.schedule_order: 00212 if not t in run_id_list: 00213 rospy.logerr('Cannot reorder tasks. Task %s is not specified in desired list'%t) 00214 return False 00215 if len(run_id_list) == 0: 00216 return True 00217 if self.scheduled_tasks[self.schedule_order[0]].state == 'Running' and self.schedule_order[0] != run_id_list[0]: 00218 rospy.logerr('Cannot reorder a running task with run_id %s'%self.schedule_order[0]) 00219 return False 00220 rospy.loginfo('Reordering scheduled tasks') 00221 self.schedule_order = run_id_list 00222 self.pub_scheduled_tasks() 00223 return True 00224 00225 00226 # Service callbacks 00227 def get_available_tasks_srv(self, req): 00228 msg = self.get_available_tasks() 00229 self.pub_available_tasks() 00230 return GetTaskListResponse(msg) 00231 00232 def get_scheduled_tasks_srv(self, req): 00233 return GetTaskListResponse(self.get_scheduled_tasks()) 00234 00235 def add_task_srv(self, req): 00236 return AddTaskResponse(self.add_task(req.task_id), self.get_scheduled_tasks()) 00237 00238 def remove_task_srv(self, req): 00239 self.remove_task(req.run_id) 00240 return ChangeTaskResponse(self.get_scheduled_tasks()) 00241 00242 def start_task_srv(self, req): 00243 self.start_task_run_id = req.run_id 00244 self.start_task_needed = True 00245 while self.start_task_needed: 00246 rospy.sleep(0.1) 00247 return ChangeTaskResponse(self.get_scheduled_tasks()) 00248 00249 def stop_task_srv(self, req): 00250 self.stop_task(req.run_id) 00251 return ChangeTaskResponse(self.get_scheduled_tasks()) 00252 00253 def reorder_tasks_srv(self, req): 00254 self.reorder_tasks(req.run_id_list) 00255 return ReorderTaskListResponse(self.get_scheduled_tasks()) 00256 00257 00258 00259 def main(): 00260 rospy.init_node('task_manager_node') 00261 tm = TaskManager() 00262 00263 # do some testing 00264 # tm.add_task('continuous_ops_test_task_wim_task') 00265 # tm.add_task('continuous_ops_test_task_wim_task') 00266 # tm.add_task('continuous_ops_test_task_eitan_task') 00267 00268 while not rospy.is_shutdown(): 00269 if tm.start_task_needed: 00270 print 'STARTING THREAD' 00271 tm.start_task(tm.start_task_run_id) 00272 tm.start_task_needed = False 00273 rospy.sleep(0.1) 00274 00275 if __name__ == '__main__': 00276 main()