task_manager.py
Go to the documentation of this file.
00001 #!/usr/bin/python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2008, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of the Willow Garage nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 #
00034 
00035 # Author: Wim Meeussen
00036 
00037 
00038 
00039 
00040 from __future__ import with_statement
00041 
00042 import roslib; roslib.load_manifest('continuous_ops_task_manager')
00043 import rospy
00044 import os
00045 import threading
00046 import yaml
00047 import copy
00048 import roslauncher
00049 from continuous_ops_msgs.srv import *
00050 from continuous_ops_msgs.msg import *
00051 
00052 
00053 def tasks_to_msg(tasks, ids=None):
00054     task_msgs = []
00055     if ids:
00056         loop = ids
00057     else:
00058         loop = tasks.keys()
00059     for t in loop:
00060         task_msgs.append(tasks[t].to_msg())
00061     return task_msgs
00062 
00063 
00064 
00065 class OneTask():
00066     def __init__(self, pkg, file, conf):
00067         path = roslib.packages.get_pkg_dir(pkg)
00068         self.name = conf['name']
00069         self.description = conf['description']
00070         self.task_id = ('_'.join([pkg, file])).replace('.','_')
00071         self.run_id = ''
00072         self.launch_file = os.path.join(path, conf['launch_file'])
00073         self.image_file = os.path.join(path, conf['image_file'])
00074         self.state = ''
00075         self.action_ns = conf['action_ns']
00076         self.timeout = rospy.Duration(conf['timeout'])
00077 
00078     def to_msg(self):
00079         msg = Task()
00080         msg.name = self.name
00081         msg.description = self.description
00082         msg.task_id = self.task_id
00083         msg.run_id = self.run_id
00084         msg.launch_file = self.launch_file
00085         msg.image_file = self.image_file
00086         msg.state = self.state
00087         msg.action_ns = self.action_ns
00088         msg.timeout = self.timeout
00089         return msg
00090 
00091 
00092 
00093 
00094 class TaskManager():
00095     def __init__(self):
00096         self.lock = threading.RLock()
00097 
00098         # Get list of available tasks
00099         self.pub_available = rospy.Publisher('available_tasks', Tasks, latch=True)
00100         self.pub_available_tasks()
00101 
00102         # List of scheduled tasks, store in dictionary with key run_id
00103         self.start_task_needed = False
00104         self.schedule_order = []
00105         self.scheduled_tasks = {}
00106         self.launch = None
00107         self.pub_scheduled = rospy.Publisher('scheduled_tasks', Tasks, latch=True)
00108         self.get_scheduled_tasks()
00109 
00110         # Advertise services
00111         self.srv_get_available_tasks = rospy.Service('get_available_tasks', GetTaskList, self.get_available_tasks_srv)
00112         self.srv_get_scheduled_tasks = rospy.Service('get_scheduled_tasks', GetTaskList, self.get_scheduled_tasks_srv)
00113         self.srv_add_task = rospy.Service('add_task', AddTask, self.add_task_srv)
00114         self.srv_remove_task = rospy.Service('remove_task', ChangeTask, self.remove_task_srv)
00115         self.srv_start_task = rospy.Service('start_task', ChangeTask, self.start_task_srv)
00116         self.srv_stop_task = rospy.Service('stop_task', ChangeTask, self.stop_task_srv)
00117         self.srv_reorder_tasks = rospy.Service('reorder_tasks', ReorderTaskList, self.reorder_tasks_srv)
00118 
00119     def get_available_tasks(self):
00120         with self.lock:
00121             self.available_tasks = {}
00122             for pkg in roslib.rospack.rospack_depends_on_1('continuous_ops_task_manager'):
00123                 path = roslib.packages.get_pkg_dir(pkg)
00124                 files = os.listdir(path)
00125                 for f in files:
00126                     if f.partition('.')[2] == 'task':
00127                         conf = yaml.load(open(os.path.join(path, f)))
00128                         task = OneTask(pkg, f, conf)
00129                         self.available_tasks[task.task_id] = task
00130             return tasks_to_msg(self.available_tasks)
00131 
00132     def pub_available_tasks(self):
00133         self.pub_available.publish(self.get_available_tasks())
00134 
00135     def get_scheduled_tasks(self):
00136         with self.lock:
00137             msg = tasks_to_msg(self.scheduled_tasks, self.schedule_order)
00138             return msg
00139 
00140     def pub_scheduled_tasks(self):
00141         self.pub_scheduled.publish(self.get_scheduled_tasks())
00142 
00143     def start_task(self, run_id):
00144         with self.lock:
00145             if not run_id in self.scheduled_tasks:
00146                 rospy.logerr('Cannot start task with run_id %s, no task exists with that run_id'%run_id)
00147                 return False
00148             if self.launch:
00149                 rospy.logerr('Cannot start task with run_id %s, another task is still running'%run_id)
00150                 return False
00151             if self.schedule_order[0] != run_id:
00152                 rospy.logerr('Cannot start task with run_id %s, this is not the first task in the schedule'%run_id)
00153                 return False
00154             with open(self.scheduled_tasks[run_id].launch_file) as f:
00155                 rospy.loginfo('Starting task with run_id %s'%run_id)
00156                 self.launch = roslauncher.ScriptRoslaunch(f.read())
00157                 self.launch.start()
00158                 self.scheduled_tasks[run_id].state = 'Running'
00159                 self.pub_scheduled_tasks()
00160                 return True
00161             return False
00162 
00163     def stop_task(self, run_id):
00164         with self.lock:
00165             if not run_id in self.scheduled_tasks:
00166                 rospy.logerr('Cannot stop task with run_id %s, no task exists with that run_id'%run_id)                
00167                 return False
00168             if not self.scheduled_tasks[run_id].state == 'Running':
00169                 rospy.logerr('Cannot stop task with run_id %s, task is not running'%run_id)                                
00170                 return False
00171             rospy.loginfo('Stopped task with run_id %s'%run_id)
00172             self.launch.shutdown()
00173             self.launch = None
00174             self.scheduled_tasks[run_id].state = 'Stopped'
00175             self.pub_scheduled_tasks()
00176             return True
00177             
00178     def add_task(self, task_id):
00179         with self.lock:
00180             if not task_id in self.available_tasks:
00181                 rospy.logerr('Cannot add task with task_id %s, because no task exists with that task_id'%task_id)
00182                 return ''
00183             run_id = ('_'.join([task_id,str(rospy.Time.now())])).replace('.','_')
00184             rospy.loginfo('Adding new task with run_id %s'%run_id)
00185             task = copy.deepcopy(self.available_tasks[task_id])
00186             task.state = 'Queued'
00187             task.run_id = run_id
00188             self.scheduled_tasks[task.run_id] = task
00189             self.schedule_order.append(task.run_id)
00190             self.pub_scheduled_tasks()
00191             return task.run_id
00192 
00193     def remove_task(self, run_id):
00194         with self.lock:
00195             if not run_id in self.scheduled_tasks:
00196                 rospy.logerr('Cannot remove task with run_id %s, no task exists with that run_id'%run_id)                
00197                 return False
00198             if self.scheduled_tasks[run_id].state == 'Running':
00199                 rospy.logerr('Cannot remove task with run_id %s, this task is currently running'%run_id)
00200                 return False
00201             rospy.loginfo('Removed task with run_id %s'%run_id)
00202             del self.scheduled_tasks[run_id]
00203             self.schedule_order.remove(run_id)
00204             self.pub_scheduled_tasks()
00205             return True
00206 
00207     def reorder_tasks(self, run_id_list):
00208         if len(run_id_list) != len(self.schedule_order):
00209             rospy.logerr('Cannot reorder tasks. Desired list has different length than scheduled list.')
00210             return False
00211         for t in self.schedule_order:
00212             if not t in run_id_list:
00213                 rospy.logerr('Cannot reorder tasks. Task %s is not specified in desired list'%t)
00214                 return False
00215         if len(run_id_list) == 0:
00216             return True
00217         if self.scheduled_tasks[self.schedule_order[0]].state == 'Running' and self.schedule_order[0] != run_id_list[0]:
00218             rospy.logerr('Cannot reorder a running task with run_id %s'%self.schedule_order[0])
00219             return False
00220         rospy.loginfo('Reordering scheduled tasks')
00221         self.schedule_order = run_id_list
00222         self.pub_scheduled_tasks()
00223         return True
00224 
00225 
00226     # Service callbacks
00227     def get_available_tasks_srv(self, req):
00228         msg = self.get_available_tasks()
00229         self.pub_available_tasks()
00230         return GetTaskListResponse(msg)
00231 
00232     def get_scheduled_tasks_srv(self, req):
00233         return GetTaskListResponse(self.get_scheduled_tasks())
00234 
00235     def add_task_srv(self, req):
00236         return AddTaskResponse(self.add_task(req.task_id), self.get_scheduled_tasks())
00237 
00238     def remove_task_srv(self, req):
00239         self.remove_task(req.run_id)
00240         return ChangeTaskResponse(self.get_scheduled_tasks())
00241         
00242     def start_task_srv(self, req):
00243         self.start_task_run_id = req.run_id
00244         self.start_task_needed = True
00245         while self.start_task_needed:
00246             rospy.sleep(0.1)
00247         return ChangeTaskResponse(self.get_scheduled_tasks())
00248 
00249     def stop_task_srv(self, req):
00250         self.stop_task(req.run_id)
00251         return ChangeTaskResponse(self.get_scheduled_tasks())
00252 
00253     def reorder_tasks_srv(self, req):
00254         self.reorder_tasks(req.run_id_list)
00255         return ReorderTaskListResponse(self.get_scheduled_tasks())
00256 
00257 
00258 
00259 def main():
00260     rospy.init_node('task_manager_node')
00261     tm = TaskManager()
00262 
00263     # do some testing
00264 #     tm.add_task('continuous_ops_test_task_wim_task')
00265 #     tm.add_task('continuous_ops_test_task_wim_task')
00266 #     tm.add_task('continuous_ops_test_task_eitan_task')
00267 
00268     while not rospy.is_shutdown():
00269         if tm.start_task_needed:
00270             print 'STARTING THREAD'
00271             tm.start_task(tm.start_task_run_id)
00272             tm.start_task_needed = False
00273         rospy.sleep(0.1)
00274 
00275 if __name__ == '__main__':
00276     main()


continuous_ops_task_manager
Author(s): Wim Meeussen
autogenerated on Fri Dec 6 2013 21:19:42