multi_goal_action_server.py
Go to the documentation of this file.
00001 #! /usr/bin/env python
00002 # Copyright (c) 2013, CogniTeam, Ltd.
00003 # All rights reserved.
00004 #
00005 # Redistribution and use in source and binary forms, with or without
00006 # modification, are permitted provided that the following conditions are met:
00007 #
00008 #     * Redistributions of source code must retain the above copyright
00009 #       notice, this list of conditions and the following disclaimer.
00010 #     * Redistributions in binary form must reproduce the above copyright
00011 #       notice, this list of conditions and the following disclaimer in the
00012 #       documentation and/or other materials provided with the distribution.
00013 #     * Neither the name of the CogniTeam, Ltd. nor the names of its
00014 #       contributors may be used to endorse or promote products derived from
00015 #       this software without specific prior written permission.
00016 #
00017 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00018 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00019 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00020 # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00021 # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00022 # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00023 # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00024 # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00025 # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00026 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00027 # POSSIBILITY OF SUCH DAMAGE.
00028 #
00029 # Author: Dan Erisalimchik.
00030 # Based on ROS version of simple_action_server.py by Alexander Sorokin.
00031 
00032 
00033 import roslib; roslib.load_manifest('robot_task')
00034 import rospy
00035 
00036 import threading
00037 import traceback
00038 
00039 from actionlib_msgs.msg import *
00040 
00041 from actionlib import ActionServer
00042 from actionlib.server_goal_handle import ServerGoalHandle;
00043 
00044 def nop_cb(goal_handle):
00045         pass
00046 
00047 
00048 #def createEmptyGoal():
00049         #return ServerGoalHandle();
00050 
00051 
00052 class GoalQueueItem:
00053         def __init__(self, goal):
00054                 if not isinstance(goal, GoalQueueItem):
00055                         Exception("GoalQueueItem has to handle only objects of type ServerGoalHandle, but object type is "+str(goal.__class__))
00056                 self.goal=goal#ServerGoalHandle();
00057                 self.isPreempted=False
00058                 self.thread=None
00059         def __eq__(self, other):
00060                 return (isinstance(other, self.__class__)
00061                         and self.goal == other.goal)
00062         def __ne__(self, other):
00063                 return not self.__eq__(other)
00064         def __str__(self):
00065                 s_isp = ""
00066                 if self.isPreempted: s_isp = ",Preempted"
00067                 return "MQI("+str(self.goal)+s_isp+")"
00068 
00069 
00070 class MQIterator:
00071         def __init__(self, mq, idex):
00072                 self.mq = mq;
00073                 self.idx = idex
00074         def get(self):
00075                 return self.mq[self.idx]
00076         def __eq__(self, other):
00077                 return (isinstance(other, self.__class__)
00078                         and self.mq == other.mq
00079                         and self.idx == other.idx)
00080         def __ne__(self, other):
00081                 return not self.__eq__(other)
00082         def inc(self, n=1):
00083                 self.idx+=n
00084         def dec(self, n=1):
00085                 self.idx-=n
00086         def next(self):
00087                 return MQIterator(self.mq,self.idx+1)
00088         def prev(self):
00089                 return MQIterator(self.mq,self.idx-1)
00090 
00091 class GoalQueue:
00092         def __init__(self):
00093                 self.queue = []
00094         def __len__(self):
00095                 return len(self.queue)
00096         def __getitem__(self, i):
00097                 return self.queue[i]
00098         def isEmpty(self):
00099                 return len(self.queue)==0
00100         def add(self, obj):
00101                 if isinstance(obj,ServerGoalHandle):
00102                         obj = GoalQueueItem(obj)
00103                 if not isinstance(obj, GoalQueueItem):
00104                         Exception("Type of added object is not GoalQueueItem. object type is "+str(obj.__class__))
00105                 self.queue.append(obj)
00106         def find(self, obj):
00107                 if isinstance(obj,ServerGoalHandle):
00108                         obj = GoalQueueItem(obj)
00109                 try:
00110                         #rospy.loginfo("[dan] Start search Goal : %s in %s", str(obj.goal.get_goal_id().id), str([x.goal.get_goal_id().id for x in self.queue]));
00111                         i=[x.goal.get_goal_id().id for x in self.queue].index(obj.goal.get_goal_id().id)
00112                 except ValueError, e:
00113                         #rospy.loginfo("[dan] ... not found")
00114                         return MQIterator(self,len(self.queue))
00115                 #rospy.loginfo("[dan] ... found i=%s", str(i))
00116                 return MQIterator(self,i)
00117         def begin(self):
00118                 return MQIterator(self,0)
00119         def end(self):
00120                 return MQIterator(self,len(self.queue))
00121         def remove(self, obj):
00122                 if isinstance(obj, MQIterator):
00123                         if obj!=self.end():
00124                                 self.queue[obj.idx].isPreempted=True
00125                                 del self.queue[obj.idx]
00126                 else:
00127                         self.remove(self.find(obj))
00128         def __str__(self):
00129                 s=[str(x) for x in self.queue]
00130                 return "MQ:["+",".join(s)+"]"
00131                 
00132         def moveTo(self, other):
00133                 if len(self)>0:
00134                         other.add(self.begin().get())
00135                         self.remove(self.begin())
00136                         return other.end().prev();
00137                 return other.end()
00138         
00139 if __name__ == '__main__':
00140         pass
00141 
00142 class RunExecuteCallback(threading.Thread):
00143         def __init__(self, goal, server):
00144                 threading.Thread.__init__(self)
00145                 self.goal = goal
00146                 self.server = server
00147         def run(self):
00148                 #rospy.loginfo("[dan] Run thread with goal: %s", str( self.goal.get_goal_id().id ));
00149                 try:
00150                         self.server.execute_callback(self.goal)
00151                         if self.server.is_active(self.goal):
00152                                 rospy.logwarn("Your executeCallback did not set the goal to a terminal status.  " +
00153                                                         "This is a bug in your ActionServer implementation. Fix your code!  "+
00154                                                         "For now, the ActionServer will set this goal to aborted");
00155                                 self.server.set_aborted(self.goal, None, "No terminal state was set.");
00156                 except Exception, ex:
00157                         rospy.logerr("Exception in your execute callback: %s\n%s", str(ex), traceback.format_exc())
00158                         self.server.set_aborted(self.goal, None, "Exception in execute callback: %s" % str(ex))
00159 
00160 
00161 ## @class MGActionServer
00162 ## @brief The MGActionServer
00163 ## implements a multi goal policy on top of the ActionServer class. 
00164 class MGActionServer:
00165         ## @brief Constructor for a MGActionServer
00166         ## @param name A name for the action server
00167         ## @param execute_cb Optional callback that gets called in a separate thread whenever
00168         ## a new goal is received, allowing users to have blocking callbacks.
00169         ## Adding an execute callback also deactivates the goalCallback.
00170         ## @param  auto_start A boolean value that tells the ActionServer wheteher or not to start publishing as soon as it comes up. THIS SHOULD ALWAYS BE SET TO FALSE TO AVOID RACE CONDITIONS and start() should be called after construction of the server.
00171         def __init__(self, name, ActionSpec, execute_cb = None, auto_start = True):
00172 
00173                 self.active_goals=GoalQueue()
00174                 self.new_goals=GoalQueue()
00175 
00176                 self.execute_callback = execute_cb;
00177 
00178                 self.need_to_terminate = False
00179                 self.terminate_mutex = threading.RLock();
00180                 self.lock = threading.RLock();
00181 
00182                 self.execute_condition = threading.Condition(self.lock);
00183 
00184                 if self.execute_callback:
00185                         self.execute_thread = threading.Thread(None, self.executeLoop, name="ExecuterLoop");
00186                         self.execute_thread.start();
00187                 else:
00188                         self.execute_thread = None
00189 
00190                 #create the action server
00191                 self.action_server = ActionServer(name, ActionSpec, self.internal_goal_callback,self.internal_preempt_callback,auto_start);
00192 
00193 
00194         def __del__(self):
00195                 if hasattr(self, 'execute_callback') and self.execute_callback:
00196                         with self.terminate_mutex:
00197                                 self.need_to_terminate = True;
00198 
00199                         assert(self.execute_thread);
00200                         self.execute_thread.join();
00201 
00202 
00203         def accept_new_goal(self):
00204                 if self.new_goals.isEmpty():
00205                         return None
00206                 g = self.new_goals.moveTo(self.active_goals)
00207                 g.get().isPreempted = False
00208                 g.get().goal.set_accepted("This goal has been accepted by the multi goal action server");
00209                 return g;
00210 
00211 
00212         ## @brief Allows  polling implementations to query about the availability of a new goal
00213         ## @return True if a new goal is available, false otherwise
00214         def is_new_goal_available(self):
00215                 return self.new_goals.isEmpty()==False;
00216 
00217 
00218         ## @brief Allows  polling implementations to query about preempt requests
00219         ## @return True if a preempt is requested, false otherwise
00220         def is_preempt_requested(self, goal):
00221                 with self.lock:
00222                         g = self.active_goals.find(goal)
00223                         if g!=self.active_goals.end():
00224                                 return g.get().isPreempted;
00225                         return True
00226 
00227         ## @brief Allows  polling implementations to query about the status of the current goal
00228         ## @return True if a goal is active, false otherwise
00229         def is_active(self, goal):
00230                 g = self.active_goals.find(goal)
00231                 if g==self.active_goals.end(): return False
00232                 status = g.get().goal.get_goal_status().status;
00233                 return status == actionlib_msgs.msg.GoalStatus.ACTIVE or status == actionlib_msgs.msg.GoalStatus.PREEMPTING;
00234 
00235         def requestPreemption(self, goal):
00236                 g = self.active_goals.find(goal)
00237                 if g!=self.active_goals.end():
00238                         g.get().isPreempted = True
00239                         self.execute_condition.notify();
00240                 
00241         ## @brief Sets the status of the active goal to succeeded
00242         ## @param  result An optional result to send back to any clients of the goal
00243         def set_succeeded(self, goal, result=None, text=""):
00244                 self.execute_condition.acquire();
00245                 try:
00246                         if not result:
00247                                 result=self.get_default_result();
00248                         rospy.loginfo("Setting the current goal as successed");
00249                         goal.set_succeeded(result, text);
00250                         self.requestPreemption(goal)
00251                         self.execute_condition.release();
00252                 except Exception, e:
00253                         rospy.logerr("MGActionServer.set_succeeded - exception %s",str(e))
00254                         self.execute_condition.release();
00255 
00256         ## @brief Sets the status of the active goal to aborted
00257         ## @param  result An optional result to send back to any clients of the goal
00258         def set_aborted(self, goal, result = None, text=""):
00259                 self.execute_condition.acquire();
00260                 try:
00261                         if not result:
00262                                 result=self.get_default_result();
00263                         rospy.loginfo("Setting the current goal as aborted");
00264                         goal.set_aborted(result, text);
00265                         self.requestPreemption(goal)
00266                         self.execute_condition.release();
00267                 except Exception, e:
00268                         rospy.logerr("MGActionServer.set_aborted - exception %s",str(e))
00269                         self.execute_condition.release();
00270 
00271         def get_default_result(self):
00272                 return self.action_server.ActionResultType();
00273 
00274         ## @brief Sets the status of the active goal to preempted
00275         ## @param  result An optional result to send back to any clients of the goal
00276         def set_preempted(self, goal,result=None, text=""):
00277                 self.execute_condition.acquire();
00278                 try:
00279                         if not result:
00280                                 result=self.get_default_result();
00281                         rospy.loginfo("Setting the current goal as canceled");
00282                         goal.set_canceled(result, text);
00283                         self.requestPreemption(goal)
00284                         self.execute_condition.release();
00285                 except Exception, e:
00286                         rospy.logerr("MGActionServer.set_preempted - exception %s",str(e))
00287                         self.execute_condition.release();
00288 
00289 
00290         ## @brief Explicitly start the action server, used it auto_start is set to false
00291         def start(self):
00292                 self.action_server.start();
00293 
00294 
00295         ## @brief Callback for when the ActionServer receives a new goal and passes it on
00296         def internal_goal_callback(self, goal):
00297                 self.execute_condition.acquire();
00298                 try:
00299                         rospy.loginfo("A new goal %s has been recieved by the MGActionServer",goal.get_goal_id().id);
00300                         
00301                         self.new_goals.add(goal)
00302 
00303                         self.execute_condition.notify();
00304                         self.execute_condition.release();
00305                         
00306                 except Exception, e:
00307                         rospy.logerr("MGActionServer.internal_goal_callback - exception %s",str(e))
00308                         self.excondition.release();
00309 
00310         ## @brief Callback for when the ActionServer receives a new preempt and passes it on
00311         def internal_preempt_callback(self,goal):
00312                 self.execute_condition.acquire();
00313                 try:
00314                         rospy.loginfo("A preempt for goal %s has been received by the MGActionServer",goal.get_goal_id().id);
00315                         
00316                         inq = self.new_goals.find(goal)
00317                         if inq!=self.new_goals.end():
00318                                 self.new_goals.remove(inq)
00319                                 self.execute_condition.release();
00320                                 return
00321                         
00322                         self.requestPreemption(goal)
00323                         self.execute_condition.release();
00324                 except Exception, e:
00325                         rospy.logerr("MGActionServer.internal_preempt_callback - exception %s",str(e))
00326                         self.execute_condition.release();
00327                         
00328         def removePreemptedGoals(self, threads_for_delete):
00329                 found = True
00330                 while found:
00331                         found = False
00332                         for x in self.active_goals:
00333                                 if x.isPreempted:
00334                                         self.active_goals.remove(x)
00335                                         threads_for_delete.append(x.thread)
00336                                         found=True
00337                                         break
00338                                         
00339                         
00340         ## @brief Called from a separate thread to call blocking execute calls
00341         def executeLoop(self):
00342                 loop_duration = rospy.Duration.from_sec(.1);
00343                 #loop_duration = rospy.Duration.from_sec(1.0);
00344                 threads_for_delete = []
00345 
00346                 while (not rospy.is_shutdown()):
00347                         #rospy.loginfo("[dan] MGAS: execute");
00348                         print_sum = False
00349                         
00350                         #rospy.loginfo("[dan] start remove deleted threads")
00351                         c_deleted = 0
00352                         while(len(threads_for_delete)>0):
00353                                 threads_for_delete[0].join()
00354                                 del threads_for_delete[0]
00355                                 c_deleted+=1
00356                         #rospy.loginfo("[dan] finished remove deleted threads: deleted %s",str(c_deleted))
00357                         if c_deleted>0 : print_sum = True
00358 
00359                         with self.terminate_mutex:
00360                                 if (self.need_to_terminate):
00361                                         break;
00362 
00363                         with self.execute_condition:
00364                                 
00365                                 #remove preempted goals
00366                                 #rospy.loginfo("[dan] start stopping preempted goals")
00367                                 self.removePreemptedGoals(threads_for_delete)
00368                                 #rospy.loginfo("[dan] finished stopping preempted goals: stopped %s",str(len(threads_for_delete)))
00369                                 if len(threads_for_delete)>0: print_sum=True
00370                                 
00371                                 
00372                                 #run new goals
00373                                 #rospy.loginfo("[dan] start running new goals")
00374                                 c_newtasks = 0
00375                                 while(len(self.new_goals)>0):
00376                                         goal = self.accept_new_goal()
00377                                         goal.get().thread = RunExecuteCallback(goal.get().goal, self);
00378                                         goal.get().thread.start()
00379                                         c_newtasks += 1
00380                                 #rospy.loginfo("[dan] finished running new goals: started %s",str(c_newtasks))
00381                                 if c_newtasks>0: print_sum = True
00382                         
00383                                 #rospy.loginfo("[dan] there are %s active goals",str(len(self.active_goals)))
00384                                 if print_sum:
00385                                         rospy.loginfo("[ActionServer] summery: %s stopped threads, %s removed preempted goals, %s activated goals, %s active tasks",
00386                                                        str(c_deleted), str(len(threads_for_delete)), str(c_newtasks), str(len(self.active_goals)))
00387                                 
00388                                 self.execute_condition.wait(loop_duration.to_sec());
00389                                 
00390                 #rospy.loginfo("[dan] start remove deleted threads")
00391                 while(len(threads_for_delete)>0):
00392                         threads_for_delete[0].join()
00393                         del threads_for_delete[0]
00394                 #rospy.loginfo("[dan] finished remove deleted threads")
00395 
00396                 #rospy.loginfo("[dan] start remove not activated threads")
00397                 while(len(self.new_goals)>0):
00398                         del self.new_goals[0]
00399                 #rospy.loginfo("[dan] finished remove not activated threads")
00400 
00401                 #rospy.loginfo("[dan] start remove not activated threads")
00402                 while(len(self.active_goals)>0):
00403                         self.active_goals.isPreempted
00404                         self.active_goals[0].thread.join()
00405                         del self.active_goals[0]
00406                 #rospy.loginfo("[dan] finished remove not activated threads")
00407                         
00408                 #rospy.loginfo("[dan] MG Action Server DONE")
00409 


robot_task
Author(s):
autogenerated on Wed Aug 26 2015 11:16:50