00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040 from __future__ import with_statement
00041
00042 import roslib; roslib.load_manifest('continuous_ops_task_manager')
00043 import rospy
00044 import os
00045 import threading
00046 import yaml
00047 import copy
00048 import roslauncher
00049 from continuous_ops_msgs.srv import *
00050 from continuous_ops_msgs.msg import *
00051
00052
00053 def tasks_to_msg(tasks, ids=None):
00054 task_msgs = []
00055 if ids:
00056 loop = ids
00057 else:
00058 loop = tasks.keys()
00059 for t in loop:
00060 task_msgs.append(tasks[t].to_msg())
00061 return task_msgs
00062
00063
00064
00065 class OneTask():
00066 def __init__(self, pkg, file, conf):
00067 path = roslib.packages.get_pkg_dir(pkg)
00068 self.name = conf['name']
00069 self.description = conf['description']
00070 self.task_id = ('_'.join([pkg, file])).replace('.','_')
00071 self.run_id = ''
00072 self.launch_file = os.path.join(path, conf['launch_file'])
00073 self.image_file = os.path.join(path, conf['image_file'])
00074 self.state = ''
00075 self.action_ns = conf['action_ns']
00076 self.timeout = rospy.Duration(conf['timeout'])
00077
00078 def to_msg(self):
00079 msg = Task()
00080 msg.name = self.name
00081 msg.description = self.description
00082 msg.task_id = self.task_id
00083 msg.run_id = self.run_id
00084 msg.launch_file = self.launch_file
00085 msg.image_file = self.image_file
00086 msg.state = self.state
00087 msg.action_ns = self.action_ns
00088 msg.timeout = self.timeout
00089 return msg
00090
00091
00092
00093
00094 class TaskManager():
00095 def __init__(self):
00096 self.lock = threading.RLock()
00097
00098
00099 self.pub_available = rospy.Publisher('available_tasks', Tasks, latch=True)
00100 self.pub_available_tasks()
00101
00102
00103 self.start_task_needed = False
00104 self.schedule_order = []
00105 self.scheduled_tasks = {}
00106 self.launch = None
00107 self.pub_scheduled = rospy.Publisher('scheduled_tasks', Tasks, latch=True)
00108 self.get_scheduled_tasks()
00109
00110
00111 self.srv_get_available_tasks = rospy.Service('get_available_tasks', GetTaskList, self.get_available_tasks_srv)
00112 self.srv_get_scheduled_tasks = rospy.Service('get_scheduled_tasks', GetTaskList, self.get_scheduled_tasks_srv)
00113 self.srv_add_task = rospy.Service('add_task', AddTask, self.add_task_srv)
00114 self.srv_remove_task = rospy.Service('remove_task', ChangeTask, self.remove_task_srv)
00115 self.srv_start_task = rospy.Service('start_task', ChangeTask, self.start_task_srv)
00116 self.srv_stop_task = rospy.Service('stop_task', ChangeTask, self.stop_task_srv)
00117 self.srv_reorder_tasks = rospy.Service('reorder_tasks', ReorderTaskList, self.reorder_tasks_srv)
00118
00119 def get_available_tasks(self):
00120 with self.lock:
00121 self.available_tasks = {}
00122 for pkg in roslib.rospack.rospack_depends_on_1('continuous_ops_task_manager'):
00123 path = roslib.packages.get_pkg_dir(pkg)
00124 files = os.listdir(path)
00125 for f in files:
00126 if f.partition('.')[2] == 'task':
00127 conf = yaml.load(open(os.path.join(path, f)))
00128 task = OneTask(pkg, f, conf)
00129 self.available_tasks[task.task_id] = task
00130 return tasks_to_msg(self.available_tasks)
00131
00132 def pub_available_tasks(self):
00133 self.pub_available.publish(self.get_available_tasks())
00134
00135 def get_scheduled_tasks(self):
00136 with self.lock:
00137 msg = tasks_to_msg(self.scheduled_tasks, self.schedule_order)
00138 return msg
00139
00140 def pub_scheduled_tasks(self):
00141 self.pub_scheduled.publish(self.get_scheduled_tasks())
00142
00143 def start_task(self, run_id):
00144 with self.lock:
00145 if not run_id in self.scheduled_tasks:
00146 rospy.logerr('Cannot start task with run_id %s, no task exists with that run_id'%run_id)
00147 return False
00148 if self.launch:
00149 rospy.logerr('Cannot start task with run_id %s, another task is still running'%run_id)
00150 return False
00151 if self.schedule_order[0] != run_id:
00152 rospy.logerr('Cannot start task with run_id %s, this is not the first task in the schedule'%run_id)
00153 return False
00154 with open(self.scheduled_tasks[run_id].launch_file) as f:
00155 rospy.loginfo('Starting task with run_id %s'%run_id)
00156 self.launch = roslauncher.ScriptRoslaunch(f.read())
00157 self.launch.start()
00158 self.scheduled_tasks[run_id].state = 'Running'
00159 self.pub_scheduled_tasks()
00160 return True
00161 return False
00162
00163 def stop_task(self, run_id):
00164 with self.lock:
00165 if not run_id in self.scheduled_tasks:
00166 rospy.logerr('Cannot stop task with run_id %s, no task exists with that run_id'%run_id)
00167 return False
00168 if not self.scheduled_tasks[run_id].state == 'Running':
00169 rospy.logerr('Cannot stop task with run_id %s, task is not running'%run_id)
00170 return False
00171 rospy.loginfo('Stopped task with run_id %s'%run_id)
00172 self.launch.shutdown()
00173 self.launch = None
00174 self.scheduled_tasks[run_id].state = 'Stopped'
00175 self.pub_scheduled_tasks()
00176 return True
00177
00178 def add_task(self, task_id):
00179 with self.lock:
00180 if not task_id in self.available_tasks:
00181 rospy.logerr('Cannot add task with task_id %s, because no task exists with that task_id'%task_id)
00182 return ''
00183 run_id = ('_'.join([task_id,str(rospy.Time.now())])).replace('.','_')
00184 rospy.loginfo('Adding new task with run_id %s'%run_id)
00185 task = copy.deepcopy(self.available_tasks[task_id])
00186 task.state = 'Queued'
00187 task.run_id = run_id
00188 self.scheduled_tasks[task.run_id] = task
00189 self.schedule_order.append(task.run_id)
00190 self.pub_scheduled_tasks()
00191 return task.run_id
00192
00193 def remove_task(self, run_id):
00194 with self.lock:
00195 if not run_id in self.scheduled_tasks:
00196 rospy.logerr('Cannot remove task with run_id %s, no task exists with that run_id'%run_id)
00197 return False
00198 if self.scheduled_tasks[run_id].state == 'Running':
00199 rospy.logerr('Cannot remove task with run_id %s, this task is currently running'%run_id)
00200 return False
00201 rospy.loginfo('Removed task with run_id %s'%run_id)
00202 del self.scheduled_tasks[run_id]
00203 self.schedule_order.remove(run_id)
00204 self.pub_scheduled_tasks()
00205 return True
00206
00207 def reorder_tasks(self, run_id_list):
00208 if len(run_id_list) != len(self.schedule_order):
00209 rospy.logerr('Cannot reorder tasks. Desired list has different length than scheduled list.')
00210 return False
00211 for t in self.schedule_order:
00212 if not t in run_id_list:
00213 rospy.logerr('Cannot reorder tasks. Task %s is not specified in desired list'%t)
00214 return False
00215 if len(run_id_list) == 0:
00216 return True
00217 if self.scheduled_tasks[self.schedule_order[0]].state == 'Running' and self.schedule_order[0] != run_id_list[0]:
00218 rospy.logerr('Cannot reorder a running task with run_id %s'%self.schedule_order[0])
00219 return False
00220 rospy.loginfo('Reordering scheduled tasks')
00221 self.schedule_order = run_id_list
00222 self.pub_scheduled_tasks()
00223 return True
00224
00225
00226
00227 def get_available_tasks_srv(self, req):
00228 msg = self.get_available_tasks()
00229 self.pub_available_tasks()
00230 return GetTaskListResponse(msg)
00231
00232 def get_scheduled_tasks_srv(self, req):
00233 return GetTaskListResponse(self.get_scheduled_tasks())
00234
00235 def add_task_srv(self, req):
00236 return AddTaskResponse(self.add_task(req.task_id), self.get_scheduled_tasks())
00237
00238 def remove_task_srv(self, req):
00239 self.remove_task(req.run_id)
00240 return ChangeTaskResponse(self.get_scheduled_tasks())
00241
00242 def start_task_srv(self, req):
00243 self.start_task_run_id = req.run_id
00244 self.start_task_needed = True
00245 while self.start_task_needed:
00246 rospy.sleep(0.1)
00247 return ChangeTaskResponse(self.get_scheduled_tasks())
00248
00249 def stop_task_srv(self, req):
00250 self.stop_task(req.run_id)
00251 return ChangeTaskResponse(self.get_scheduled_tasks())
00252
00253 def reorder_tasks_srv(self, req):
00254 self.reorder_tasks(req.run_id_list)
00255 return ReorderTaskListResponse(self.get_scheduled_tasks())
00256
00257
00258
00259 def main():
00260 rospy.init_node('task_manager_node')
00261 tm = TaskManager()
00262
00263
00264
00265
00266
00267
00268 while not rospy.is_shutdown():
00269 if tm.start_task_needed:
00270 print 'STARTING THREAD'
00271 tm.start_task(tm.start_task_run_id)
00272 tm.start_task_needed = False
00273 rospy.sleep(0.1)
00274
00275 if __name__ == '__main__':
00276 main()