00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 
00033 
00034 
00035 
00036 
00037 
00038 
00039 
00040 from __future__ import with_statement
00041 
00042 import roslib; roslib.load_manifest('continuous_ops_task_manager')
00043 import rospy
00044 import os
00045 import threading
00046 import yaml
00047 import copy
00048 import roslauncher
00049 from continuous_ops_msgs.srv import *
00050 from continuous_ops_msgs.msg import *
00051 
00052 
00053 def tasks_to_msg(tasks, ids=None):
00054     task_msgs = []
00055     if ids:
00056         loop = ids
00057     else:
00058         loop = tasks.keys()
00059     for t in loop:
00060         task_msgs.append(tasks[t].to_msg())
00061     return task_msgs
00062 
00063 
00064 
00065 class OneTask():
00066     def __init__(self, pkg, file, conf):
00067         path = roslib.packages.get_pkg_dir(pkg)
00068         self.name = conf['name']
00069         self.description = conf['description']
00070         self.task_id = ('_'.join([pkg, file])).replace('.','_')
00071         self.run_id = ''
00072         self.launch_file = os.path.join(path, conf['launch_file'])
00073         self.image_file = os.path.join(path, conf['image_file'])
00074         self.state = ''
00075         self.action_ns = conf['action_ns']
00076         self.timeout = rospy.Duration(conf['timeout'])
00077 
00078     def to_msg(self):
00079         msg = Task()
00080         msg.name = self.name
00081         msg.description = self.description
00082         msg.task_id = self.task_id
00083         msg.run_id = self.run_id
00084         msg.launch_file = self.launch_file
00085         msg.image_file = self.image_file
00086         msg.state = self.state
00087         msg.action_ns = self.action_ns
00088         msg.timeout = self.timeout
00089         return msg
00090 
00091 
00092 
00093 
00094 class TaskManager():
00095     def __init__(self):
00096         self.lock = threading.RLock()
00097 
00098         
00099         self.pub_available = rospy.Publisher('available_tasks', Tasks, latch=True)
00100         self.pub_available_tasks()
00101 
00102         
00103         self.start_task_needed = False
00104         self.schedule_order = []
00105         self.scheduled_tasks = {}
00106         self.launch = None
00107         self.pub_scheduled = rospy.Publisher('scheduled_tasks', Tasks, latch=True)
00108         self.get_scheduled_tasks()
00109 
00110         
00111         self.srv_get_available_tasks = rospy.Service('get_available_tasks', GetTaskList, self.get_available_tasks_srv)
00112         self.srv_get_scheduled_tasks = rospy.Service('get_scheduled_tasks', GetTaskList, self.get_scheduled_tasks_srv)
00113         self.srv_add_task = rospy.Service('add_task', AddTask, self.add_task_srv)
00114         self.srv_remove_task = rospy.Service('remove_task', ChangeTask, self.remove_task_srv)
00115         self.srv_start_task = rospy.Service('start_task', ChangeTask, self.start_task_srv)
00116         self.srv_stop_task = rospy.Service('stop_task', ChangeTask, self.stop_task_srv)
00117         self.srv_reorder_tasks = rospy.Service('reorder_tasks', ReorderTaskList, self.reorder_tasks_srv)
00118 
00119     def get_available_tasks(self):
00120         with self.lock:
00121             self.available_tasks = {}
00122             for pkg in roslib.rospack.rospack_depends_on_1('continuous_ops_task_manager'):
00123                 path = roslib.packages.get_pkg_dir(pkg)
00124                 files = os.listdir(path)
00125                 for f in files:
00126                     if f.partition('.')[2] == 'task':
00127                         conf = yaml.load(open(os.path.join(path, f)))
00128                         task = OneTask(pkg, f, conf)
00129                         self.available_tasks[task.task_id] = task
00130             return tasks_to_msg(self.available_tasks)
00131 
00132     def pub_available_tasks(self):
00133         self.pub_available.publish(self.get_available_tasks())
00134 
00135     def get_scheduled_tasks(self):
00136         with self.lock:
00137             msg = tasks_to_msg(self.scheduled_tasks, self.schedule_order)
00138             return msg
00139 
00140     def pub_scheduled_tasks(self):
00141         self.pub_scheduled.publish(self.get_scheduled_tasks())
00142 
00143     def start_task(self, run_id):
00144         with self.lock:
00145             if not run_id in self.scheduled_tasks:
00146                 rospy.logerr('Cannot start task with run_id %s, no task exists with that run_id'%run_id)
00147                 return False
00148             if self.launch:
00149                 rospy.logerr('Cannot start task with run_id %s, another task is still running'%run_id)
00150                 return False
00151             if self.schedule_order[0] != run_id:
00152                 rospy.logerr('Cannot start task with run_id %s, this is not the first task in the schedule'%run_id)
00153                 return False
00154             with open(self.scheduled_tasks[run_id].launch_file) as f:
00155                 rospy.loginfo('Starting task with run_id %s'%run_id)
00156                 self.launch = roslauncher.ScriptRoslaunch(f.read())
00157                 self.launch.start()
00158                 self.scheduled_tasks[run_id].state = 'Running'
00159                 self.pub_scheduled_tasks()
00160                 return True
00161             return False
00162 
00163     def stop_task(self, run_id):
00164         with self.lock:
00165             if not run_id in self.scheduled_tasks:
00166                 rospy.logerr('Cannot stop task with run_id %s, no task exists with that run_id'%run_id)                
00167                 return False
00168             if not self.scheduled_tasks[run_id].state == 'Running':
00169                 rospy.logerr('Cannot stop task with run_id %s, task is not running'%run_id)                                
00170                 return False
00171             rospy.loginfo('Stopped task with run_id %s'%run_id)
00172             self.launch.shutdown()
00173             self.launch = None
00174             self.scheduled_tasks[run_id].state = 'Stopped'
00175             self.pub_scheduled_tasks()
00176             return True
00177             
00178     def add_task(self, task_id):
00179         with self.lock:
00180             if not task_id in self.available_tasks:
00181                 rospy.logerr('Cannot add task with task_id %s, because no task exists with that task_id'%task_id)
00182                 return ''
00183             run_id = ('_'.join([task_id,str(rospy.Time.now())])).replace('.','_')
00184             rospy.loginfo('Adding new task with run_id %s'%run_id)
00185             task = copy.deepcopy(self.available_tasks[task_id])
00186             task.state = 'Queued'
00187             task.run_id = run_id
00188             self.scheduled_tasks[task.run_id] = task
00189             self.schedule_order.append(task.run_id)
00190             self.pub_scheduled_tasks()
00191             return task.run_id
00192 
00193     def remove_task(self, run_id):
00194         with self.lock:
00195             if not run_id in self.scheduled_tasks:
00196                 rospy.logerr('Cannot remove task with run_id %s, no task exists with that run_id'%run_id)                
00197                 return False
00198             if self.scheduled_tasks[run_id].state == 'Running':
00199                 rospy.logerr('Cannot remove task with run_id %s, this task is currently running'%run_id)
00200                 return False
00201             rospy.loginfo('Removed task with run_id %s'%run_id)
00202             del self.scheduled_tasks[run_id]
00203             self.schedule_order.remove(run_id)
00204             self.pub_scheduled_tasks()
00205             return True
00206 
00207     def reorder_tasks(self, run_id_list):
00208         if len(run_id_list) != len(self.schedule_order):
00209             rospy.logerr('Cannot reorder tasks. Desired list has different length than scheduled list.')
00210             return False
00211         for t in self.schedule_order:
00212             if not t in run_id_list:
00213                 rospy.logerr('Cannot reorder tasks. Task %s is not specified in desired list'%t)
00214                 return False
00215         if len(run_id_list) == 0:
00216             return True
00217         if self.scheduled_tasks[self.schedule_order[0]].state == 'Running' and self.schedule_order[0] != run_id_list[0]:
00218             rospy.logerr('Cannot reorder a running task with run_id %s'%self.schedule_order[0])
00219             return False
00220         rospy.loginfo('Reordering scheduled tasks')
00221         self.schedule_order = run_id_list
00222         self.pub_scheduled_tasks()
00223         return True
00224 
00225 
00226     
00227     def get_available_tasks_srv(self, req):
00228         msg = self.get_available_tasks()
00229         self.pub_available_tasks()
00230         return GetTaskListResponse(msg)
00231 
00232     def get_scheduled_tasks_srv(self, req):
00233         return GetTaskListResponse(self.get_scheduled_tasks())
00234 
00235     def add_task_srv(self, req):
00236         return AddTaskResponse(self.add_task(req.task_id), self.get_scheduled_tasks())
00237 
00238     def remove_task_srv(self, req):
00239         self.remove_task(req.run_id)
00240         return ChangeTaskResponse(self.get_scheduled_tasks())
00241         
00242     def start_task_srv(self, req):
00243         self.start_task_run_id = req.run_id
00244         self.start_task_needed = True
00245         while self.start_task_needed:
00246             rospy.sleep(0.1)
00247         return ChangeTaskResponse(self.get_scheduled_tasks())
00248 
00249     def stop_task_srv(self, req):
00250         self.stop_task(req.run_id)
00251         return ChangeTaskResponse(self.get_scheduled_tasks())
00252 
00253     def reorder_tasks_srv(self, req):
00254         self.reorder_tasks(req.run_id_list)
00255         return ReorderTaskListResponse(self.get_scheduled_tasks())
00256 
00257 
00258 
00259 def main():
00260     rospy.init_node('task_manager_node')
00261     tm = TaskManager()
00262 
00263     
00264 
00265 
00266 
00267 
00268     while not rospy.is_shutdown():
00269         if tm.start_task_needed:
00270             print 'STARTING THREAD'
00271             tm.start_task(tm.start_task_run_id)
00272             tm.start_task_needed = False
00273         rospy.sleep(0.1)
00274 
00275 if __name__ == '__main__':
00276     main()