00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 import roslib; roslib.load_manifest('robot_task')
00034 import rospy
00035
00036 import threading
00037 import traceback
00038
00039 from actionlib_msgs.msg import *
00040
00041 from actionlib import ActionServer
00042 from actionlib.server_goal_handle import ServerGoalHandle;
00043
00044 def nop_cb(goal_handle):
00045 pass
00046
00047
00048
00049
00050
00051
00052 class GoalQueueItem:
00053 def __init__(self, goal):
00054 if not isinstance(goal, GoalQueueItem):
00055 Exception("GoalQueueItem has to handle only objects of type ServerGoalHandle, but object type is "+str(goal.__class__))
00056 self.goal=goal
00057 self.isPreempted=False
00058 self.thread=None
00059 def __eq__(self, other):
00060 return (isinstance(other, self.__class__)
00061 and self.goal == other.goal)
00062 def __ne__(self, other):
00063 return not self.__eq__(other)
00064 def __str__(self):
00065 s_isp = ""
00066 if self.isPreempted: s_isp = ",Preempted"
00067 return "MQI("+str(self.goal)+s_isp+")"
00068
00069
00070 class MQIterator:
00071 def __init__(self, mq, idex):
00072 self.mq = mq;
00073 self.idx = idex
00074 def get(self):
00075 return self.mq[self.idx]
00076 def __eq__(self, other):
00077 return (isinstance(other, self.__class__)
00078 and self.mq == other.mq
00079 and self.idx == other.idx)
00080 def __ne__(self, other):
00081 return not self.__eq__(other)
00082 def inc(self, n=1):
00083 self.idx+=n
00084 def dec(self, n=1):
00085 self.idx-=n
00086 def next(self):
00087 return MQIterator(self.mq,self.idx+1)
00088 def prev(self):
00089 return MQIterator(self.mq,self.idx-1)
00090
00091 class GoalQueue:
00092 def __init__(self):
00093 self.queue = []
00094 def __len__(self):
00095 return len(self.queue)
00096 def __getitem__(self, i):
00097 return self.queue[i]
00098 def isEmpty(self):
00099 return len(self.queue)==0
00100 def add(self, obj):
00101 if isinstance(obj,ServerGoalHandle):
00102 obj = GoalQueueItem(obj)
00103 if not isinstance(obj, GoalQueueItem):
00104 Exception("Type of added object is not GoalQueueItem. object type is "+str(obj.__class__))
00105 self.queue.append(obj)
00106 def find(self, obj):
00107 if isinstance(obj,ServerGoalHandle):
00108 obj = GoalQueueItem(obj)
00109 try:
00110
00111 i=[x.goal.get_goal_id().id for x in self.queue].index(obj.goal.get_goal_id().id)
00112 except ValueError, e:
00113
00114 return MQIterator(self,len(self.queue))
00115
00116 return MQIterator(self,i)
00117 def begin(self):
00118 return MQIterator(self,0)
00119 def end(self):
00120 return MQIterator(self,len(self.queue))
00121 def remove(self, obj):
00122 if isinstance(obj, MQIterator):
00123 if obj!=self.end():
00124 self.queue[obj.idx].isPreempted=True
00125 del self.queue[obj.idx]
00126 else:
00127 self.remove(self.find(obj))
00128 def __str__(self):
00129 s=[str(x) for x in self.queue]
00130 return "MQ:["+",".join(s)+"]"
00131
00132 def moveTo(self, other):
00133 if len(self)>0:
00134 other.add(self.begin().get())
00135 self.remove(self.begin())
00136 return other.end().prev();
00137 return other.end()
00138
00139 if __name__ == '__main__':
00140 pass
00141
00142 class RunExecuteCallback(threading.Thread):
00143 def __init__(self, goal, server):
00144 threading.Thread.__init__(self)
00145 self.goal = goal
00146 self.server = server
00147 def run(self):
00148
00149 try:
00150 self.server.execute_callback(self.goal)
00151 if self.server.is_active(self.goal):
00152 rospy.logwarn("Your executeCallback did not set the goal to a terminal status. " +
00153 "This is a bug in your ActionServer implementation. Fix your code! "+
00154 "For now, the ActionServer will set this goal to aborted");
00155 self.server.set_aborted(self.goal, None, "No terminal state was set.");
00156 except Exception, ex:
00157 rospy.logerr("Exception in your execute callback: %s\n%s", str(ex), traceback.format_exc())
00158 self.server.set_aborted(self.goal, None, "Exception in execute callback: %s" % str(ex))
00159
00160
00161
00162
00163
00164 class MGActionServer:
00165
00166
00167
00168
00169
00170
00171 def __init__(self, name, ActionSpec, execute_cb = None, auto_start = True):
00172
00173 self.active_goals=GoalQueue()
00174 self.new_goals=GoalQueue()
00175
00176 self.execute_callback = execute_cb;
00177
00178 self.need_to_terminate = False
00179 self.terminate_mutex = threading.RLock();
00180 self.lock = threading.RLock();
00181
00182 self.execute_condition = threading.Condition(self.lock);
00183
00184 if self.execute_callback:
00185 self.execute_thread = threading.Thread(None, self.executeLoop, name="ExecuterLoop");
00186 self.execute_thread.start();
00187 else:
00188 self.execute_thread = None
00189
00190
00191 self.action_server = ActionServer(name, ActionSpec, self.internal_goal_callback,self.internal_preempt_callback,auto_start);
00192
00193
00194 def __del__(self):
00195 if hasattr(self, 'execute_callback') and self.execute_callback:
00196 with self.terminate_mutex:
00197 self.need_to_terminate = True;
00198
00199 assert(self.execute_thread);
00200 self.execute_thread.join();
00201
00202
00203 def accept_new_goal(self):
00204 if self.new_goals.isEmpty():
00205 return None
00206 g = self.new_goals.moveTo(self.active_goals)
00207 g.get().isPreempted = False
00208 g.get().goal.set_accepted("This goal has been accepted by the multi goal action server");
00209 return g;
00210
00211
00212
00213
00214 def is_new_goal_available(self):
00215 return self.new_goals.isEmpty()==False;
00216
00217
00218
00219
00220 def is_preempt_requested(self, goal):
00221 with self.lock:
00222 g = self.active_goals.find(goal)
00223 if g!=self.active_goals.end():
00224 return g.get().isPreempted;
00225 return True
00226
00227
00228
00229 def is_active(self, goal):
00230 g = self.active_goals.find(goal)
00231 if g==self.active_goals.end(): return False
00232 status = g.get().goal.get_goal_status().status;
00233 return status == actionlib_msgs.msg.GoalStatus.ACTIVE or status == actionlib_msgs.msg.GoalStatus.PREEMPTING;
00234
00235 def requestPreemption(self, goal):
00236 g = self.active_goals.find(goal)
00237 if g!=self.active_goals.end():
00238 g.get().isPreempted = True
00239 self.execute_condition.notify();
00240
00241
00242
00243 def set_succeeded(self, goal, result=None, text=""):
00244 self.execute_condition.acquire();
00245 try:
00246 if not result:
00247 result=self.get_default_result();
00248 rospy.loginfo("Setting the current goal as successed");
00249 goal.set_succeeded(result, text);
00250 self.requestPreemption(goal)
00251 self.execute_condition.release();
00252 except Exception, e:
00253 rospy.logerr("MGActionServer.set_succeeded - exception %s",str(e))
00254 self.execute_condition.release();
00255
00256
00257
00258 def set_aborted(self, goal, result = None, text=""):
00259 self.execute_condition.acquire();
00260 try:
00261 if not result:
00262 result=self.get_default_result();
00263 rospy.loginfo("Setting the current goal as aborted");
00264 goal.set_aborted(result, text);
00265 self.requestPreemption(goal)
00266 self.execute_condition.release();
00267 except Exception, e:
00268 rospy.logerr("MGActionServer.set_aborted - exception %s",str(e))
00269 self.execute_condition.release();
00270
00271 def get_default_result(self):
00272 return self.action_server.ActionResultType();
00273
00274
00275
00276 def set_preempted(self, goal,result=None, text=""):
00277 self.execute_condition.acquire();
00278 try:
00279 if not result:
00280 result=self.get_default_result();
00281 rospy.loginfo("Setting the current goal as canceled");
00282 goal.set_canceled(result, text);
00283 self.requestPreemption(goal)
00284 self.execute_condition.release();
00285 except Exception, e:
00286 rospy.logerr("MGActionServer.set_preempted - exception %s",str(e))
00287 self.execute_condition.release();
00288
00289
00290
00291 def start(self):
00292 self.action_server.start();
00293
00294
00295
00296 def internal_goal_callback(self, goal):
00297 self.execute_condition.acquire();
00298 try:
00299 rospy.loginfo("A new goal %s has been recieved by the MGActionServer",goal.get_goal_id().id);
00300
00301 self.new_goals.add(goal)
00302
00303 self.execute_condition.notify();
00304 self.execute_condition.release();
00305
00306 except Exception, e:
00307 rospy.logerr("MGActionServer.internal_goal_callback - exception %s",str(e))
00308 self.excondition.release();
00309
00310
00311 def internal_preempt_callback(self,goal):
00312 self.execute_condition.acquire();
00313 try:
00314 rospy.loginfo("A preempt for goal %s has been received by the MGActionServer",goal.get_goal_id().id);
00315
00316 inq = self.new_goals.find(goal)
00317 if inq!=self.new_goals.end():
00318 self.new_goals.remove(inq)
00319 self.execute_condition.release();
00320 return
00321
00322 self.requestPreemption(goal)
00323 self.execute_condition.release();
00324 except Exception, e:
00325 rospy.logerr("MGActionServer.internal_preempt_callback - exception %s",str(e))
00326 self.execute_condition.release();
00327
00328 def removePreemptedGoals(self, threads_for_delete):
00329 found = True
00330 while found:
00331 found = False
00332 for x in self.active_goals:
00333 if x.isPreempted:
00334 self.active_goals.remove(x)
00335 threads_for_delete.append(x.thread)
00336 found=True
00337 break
00338
00339
00340
00341 def executeLoop(self):
00342 loop_duration = rospy.Duration.from_sec(.1);
00343
00344 threads_for_delete = []
00345
00346 while (not rospy.is_shutdown()):
00347
00348 print_sum = False
00349
00350
00351 c_deleted = 0
00352 while(len(threads_for_delete)>0):
00353 threads_for_delete[0].join()
00354 del threads_for_delete[0]
00355 c_deleted+=1
00356
00357 if c_deleted>0 : print_sum = True
00358
00359 with self.terminate_mutex:
00360 if (self.need_to_terminate):
00361 break;
00362
00363 with self.execute_condition:
00364
00365
00366
00367 self.removePreemptedGoals(threads_for_delete)
00368
00369 if len(threads_for_delete)>0: print_sum=True
00370
00371
00372
00373
00374 c_newtasks = 0
00375 while(len(self.new_goals)>0):
00376 goal = self.accept_new_goal()
00377 goal.get().thread = RunExecuteCallback(goal.get().goal, self);
00378 goal.get().thread.start()
00379 c_newtasks += 1
00380
00381 if c_newtasks>0: print_sum = True
00382
00383
00384 if print_sum:
00385 rospy.loginfo("[ActionServer] summery: %s stopped threads, %s removed preempted goals, %s activated goals, %s active tasks",
00386 str(c_deleted), str(len(threads_for_delete)), str(c_newtasks), str(len(self.active_goals)))
00387
00388 self.execute_condition.wait(loop_duration.to_sec());
00389
00390
00391 while(len(threads_for_delete)>0):
00392 threads_for_delete[0].join()
00393 del threads_for_delete[0]
00394
00395
00396
00397 while(len(self.new_goals)>0):
00398 del self.new_goals[0]
00399
00400
00401
00402 while(len(self.active_goals)>0):
00403 self.active_goals.isPreempted
00404 self.active_goals[0].thread.join()
00405 del self.active_goals[0]
00406
00407
00408
00409