00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042 #ifndef ACTIONLIB_MULTIGOAL_ACTION_SERVER_H_
00043 #define ACTIONLIB_MULTIGOAL_ACTION_SERVER_H_
00044
00045 #include <boost/thread/condition.hpp>
00046 #include <ros/ros.h>
00047 #include <actionlib/server/action_server.h>
00048 #include <actionlib/action_definition.h>
00049 #include <deque>
00050
00051
00052
00053 namespace actionlib {
00059 template <class ActionSpec>
00060 class MGActionServer {
00061 public:
00062
00063 ACTION_DEFINITION(ActionSpec);
00064
00065 typedef typename ActionServer<ActionSpec>::GoalHandle GoalHandle;
00066 typedef boost::function<void (GoalHandle)> ExecuteCallback;
00067
00068
00069 class GoalQueueItem{
00070 public:
00071 GoalHandle goal;
00072 bool isPreemptReq;
00073 boost::thread* thread;
00074 GoalQueueItem(GoalHandle g):goal(g),isPreemptReq(false),thread(0){}
00075
00076 ~GoalQueueItem(){
00077
00078 isPreemptReq = true;
00079 if(thread){
00080 thread=NULL;
00081 }
00082
00083 }
00084 };
00085 class GoalQueue{
00086 public:
00087 typedef typename std::deque<typename MGActionServer<ActionSpec>::GoalQueueItem>::iterator iterator;
00088 std::deque<GoalQueueItem> queue;
00089
00090 void add(GoalHandle goal){
00091 queue.push_back(GoalQueueItem(goal));
00092 }
00093 iterator find(GoalHandle goal){
00094 for(iterator i=queue.begin();i!=queue.end();i++)
00095 if(goal==i->goal) return i;
00096 return queue.end();
00097 }
00098 iterator begin(){ return queue.begin(); }
00099 iterator end(){ return queue.end(); }
00100 size_t size()const{ return queue.size(); }
00101 void remove(GoalHandle goal){
00102 iterator i = find(goal);
00103 if(i!=end()){ queue.erase(i); }
00104 }
00105 void remove(iterator i){
00106 if(i!=end()){ queue.erase(i); }
00107 }
00108 };
00109
00110
00120 MGActionServer(ros::NodeHandle n, std::string name, ExecuteCallback execute_cb, bool auto_start);
00121
00122 ~MGActionServer();
00123
00128 bool isPreemptRequested(GoalHandle goal);
00129
00134 bool isActive(GoalHandle goal);
00135
00141 void setSucceeded(GoalHandle gh, const Result& result = Result(), const std::string& text = std::string(""));
00142
00148 void setAborted(GoalHandle gh, const Result& result = Result(), const std::string& text = std::string(""));
00149
00155 void setPreempted(GoalHandle gh, const Result& result = Result(), const std::string& text = std::string(""));
00156
00157 void requestPreemption(GoalHandle gh);
00158
00162 void start();
00163
00167 void shutdown();
00168
00169
00170 void removePreemptedGoals(std::vector<boost::thread*>& thread_for_delete);
00171
00172 private:
00176 void goalCallback(GoalHandle goal);
00177
00181 void preemptCallback(GoalHandle preempt);
00182
00186 void executeLoop();
00187
00188 ros::NodeHandle n_;
00189
00190 boost::shared_ptr<ActionServer<ActionSpec> > as_;
00191
00192
00193 GoalQueue active_goals, new_goals;
00194
00195
00196
00197 boost::recursive_mutex lock_;
00198
00199 ExecuteCallback execute_callback_;
00200
00201 boost::condition execute_condition_;
00202 boost::thread* execute_thread_;
00203
00204 boost::mutex terminate_mutex_;
00205 bool need_to_terminate_;
00206 };
00207 };
00208
00209
00210
00211
00212 namespace actionlib {
00213
00214 template <class ActionSpec>
00215 MGActionServer<ActionSpec>::MGActionServer(ros::NodeHandle n, std::string name, ExecuteCallback execute_callback, bool auto_start)
00216 : n_(n), execute_callback_(execute_callback), need_to_terminate_(false) {
00217
00218
00219 as_ =
00220 boost::shared_ptr<ActionServer<ActionSpec> >(
00221 new ActionServer<ActionSpec>(
00222 n, name,
00223 boost::bind(&MGActionServer::goalCallback, this, _1),
00224 boost::bind(&MGActionServer::preemptCallback, this, _1),
00225 auto_start
00226 )
00227 );
00228
00229 if (execute_callback_ != NULL)
00230 {
00231 execute_thread_ = new boost::thread(boost::bind(&MGActionServer::executeLoop, this));
00232 }
00233 }
00234
00235
00236
00237 template <class ActionSpec>
00238 MGActionServer<ActionSpec>::~MGActionServer()
00239 {
00240 if(execute_thread_)
00241 shutdown();
00242 }
00243
00244 template <class ActionSpec>
00245 void MGActionServer<ActionSpec>::shutdown()
00246 {
00247 if (execute_callback_)
00248 {
00249 {
00250 boost::mutex::scoped_lock terminate_lock(terminate_mutex_);
00251 need_to_terminate_ = true;
00252 }
00253
00254 assert(execute_thread_);
00255 execute_thread_->join();
00256 delete execute_thread_;
00257 execute_thread_ = NULL;
00258 }
00259 }
00260
00261 template <class ActionSpec>
00262 void MGActionServer<ActionSpec>::requestPreemption(GoalHandle gh){
00263 typename GoalQueue::iterator g = active_goals.find(gh);
00264 if(g!=active_goals.end()){
00265 g->isPreemptReq=true;
00266 execute_condition_.notify_all();
00267 }
00268 }
00269
00270 template <class ActionSpec>
00271 bool MGActionServer<ActionSpec>::isPreemptRequested(GoalHandle gh){
00272 boost::recursive_mutex::scoped_lock lock(lock_);
00273 typename GoalQueue::iterator g = active_goals.find(gh);
00274 if(g!=active_goals.end()) return g->isPreemptReq;
00275 return true;
00276 }
00277
00278 template <class ActionSpec>
00279 bool MGActionServer<ActionSpec>::isActive(GoalHandle gh){
00280 typename GoalQueue::iterator g = active_goals.find(gh);
00281 if(g==active_goals.end()) return false;
00282 if(!g->goal.getGoal())
00283 return false;
00284 unsigned int status = g->goal.getGoalStatus().status;
00285 return status == actionlib_msgs::GoalStatus::ACTIVE || status == actionlib_msgs::GoalStatus::PREEMPTING;
00286 }
00287
00288 template <class ActionSpec>
00289 void MGActionServer<ActionSpec>::setSucceeded(GoalHandle gh, const Result& result, const std::string& text){
00290 boost::recursive_mutex::scoped_lock lock(lock_);
00291 ROS_DEBUG_NAMED("actionlib", "Setting the current goal as succeeded");
00292 gh.setSucceeded(result, text);
00293 requestPreemption(gh);
00294 }
00295
00296 template <class ActionSpec>
00297 void MGActionServer<ActionSpec>::setAborted(GoalHandle gh, const Result& result, const std::string& text){
00298 boost::recursive_mutex::scoped_lock lock(lock_);
00299 ROS_DEBUG_NAMED("actionlib", "Setting the current goal as aborted");
00300 gh.setAborted(result, text);
00301 requestPreemption(gh);
00302 }
00303
00304 template <class ActionSpec>
00305 void MGActionServer<ActionSpec>::setPreempted(GoalHandle gh, const Result& result, const std::string& text){
00306 boost::recursive_mutex::scoped_lock lock(lock_);
00307 ROS_DEBUG_NAMED("actionlib", "Setting the current goal as canceled");
00308 gh.setCanceled(result, text);
00309 requestPreemption(gh);
00310 }
00311
00312 template <class ActionSpec>
00313 void MGActionServer<ActionSpec>::goalCallback(GoalHandle goal){
00314 boost::recursive_mutex::scoped_lock lock(lock_);
00315 ROS_DEBUG_NAMED("actionlib", "A new goal has been recieved by the multi goal action server");
00316 ROS_INFO("actionlib: " "A new goal has been recieved by the multi goal action server");
00317 new_goals.add(goal);
00318 execute_condition_.notify_all();
00319 }
00320
00321 template <class ActionSpec>
00322 void MGActionServer<ActionSpec>::preemptCallback(GoalHandle preempt){
00323 boost::recursive_mutex::scoped_lock lock(lock_);
00324 ROS_DEBUG_NAMED("actionlib", "A preempt has been received by the MGActionServer");
00325 ROS_INFO("actionlib: " "A preempt has been received by the MGActionServer");
00326
00327
00328 typename GoalQueue::iterator i = new_goals.find(preempt);
00329 if(i!=new_goals.end()){
00330 i->goal.setCanceled(Result(), "Cancled before activation");
00331 new_goals.remove(i);
00332 std::cout<<"TaskServer: goal was preempted without activation"<<std::endl;
00333 return;
00334 }
00335
00336
00337 requestPreemption(preempt);
00338
00339
00340
00341 }
00342 template <class ActionSpec>
00343 void MGActionServer<ActionSpec>::removePreemptedGoals(std::vector<boost::thread*>& thread_for_delete){
00344 if( active_goals.size()>0 ){
00345 bool found= true;
00346
00347 while(found){
00348 found = false;
00349 for(typename GoalQueue::iterator i=active_goals.begin(); i!=active_goals.end(); i++){
00350 if(i->isPreemptReq){
00351
00352
00353 boost::thread* th = i->thread;
00354 active_goals.remove(i);
00355 thread_for_delete.push_back(th);
00356
00357
00358 found = true;
00359 break;
00360 }
00361 }
00362 }
00363
00364 }
00365 }
00366
00367 template <class ActionSpec>
00368 void MGActionServer<ActionSpec>::executeLoop(){
00369
00370 ros::Duration loop_duration = ros::Duration().fromSec(.1);
00371 std::vector<boost::thread*> thread_for_delete;
00372
00373 while (n_.ok())
00374 {
00375 while(thread_for_delete.size()>0){
00376 thread_for_delete[0]->join();
00377 delete thread_for_delete[0];
00378 thread_for_delete.erase(thread_for_delete.begin());
00379 }
00380
00381 {
00382 boost::mutex::scoped_lock terminate_lock(terminate_mutex_);
00383 if (need_to_terminate_)
00384 break;
00385 }
00386
00387 boost::recursive_mutex::scoped_lock lock(lock_);
00388
00389
00390
00391
00392 bool reportSize=false;
00393 size_t agc = active_goals.size();
00394 size_t stopped_tasks = 0;
00395 removePreemptedGoals(thread_for_delete);
00396 if( agc!=active_goals.size() ){
00397 stopped_tasks = agc-active_goals.size();
00398 reportSize=true;
00399 }
00400
00401
00402
00403
00404 size_t started_tasks=0;
00405 while(new_goals.size()>0){
00406 GoalHandle goal = new_goals.begin()->goal;
00407 new_goals.remove(new_goals.begin());
00408 active_goals.add(goal);
00409 typename GoalQueue::iterator igoal = active_goals.find(goal);
00410 goal.setAccepted("Goal activated by MGServer");
00411 igoal->thread = new boost::thread(boost::bind(execute_callback_, goal));
00412 started_tasks++;
00413 }
00414 if(started_tasks){
00415 reportSize=true;
00416 }
00417
00418 if(reportSize){
00419 ROS_INFO("TaskServer: stopped %i, started %i, active %i",
00420 (int)stopped_tasks, (int)started_tasks, (int)active_goals.size());
00421 }
00422
00423 execute_condition_.timed_wait(lock, boost::posix_time::milliseconds(loop_duration.toSec() * 1000.0f));
00424 }
00425
00426 while(new_goals.size()>0) new_goals.remove(new_goals.begin());
00427 while(active_goals.size()>0){
00428 thread_for_delete.push_back(active_goals.begin()->thread);
00429 active_goals.remove(active_goals.begin());
00430 }
00431 while(thread_for_delete.size()>0){
00432 thread_for_delete[0]->join();
00433 delete thread_for_delete[0];
00434 thread_for_delete.erase(thread_for_delete.begin());
00435 }
00436 }
00437
00438 template <class ActionSpec>
00439 void MGActionServer<ActionSpec>::start(){
00440 as_->start();
00441 }
00442
00443 };
00444
00445
00446 #endif