ROSTask.cpp
Go to the documentation of this file.
00001 #include <ros/ros.h>
00002 #include <actionlib/client/simple_action_client.h>
00003 #include <actionlib/client/terminal_state.h>
00004 
00005 #include <robot_task/RobotTask.h>
00006 #include <robot_task/RobotTaskAction.h>
00007 
00008 #include <boost/filesystem.hpp>
00009 
00010 #include "ROSTask.h"
00011 
00012 using namespace std;
00013 using namespace robot_task;
00014 
00015 #include <fstream>
00016 
00017 namespace {
00018 
00019         ofstream f1("/tmp/f1");
00020         ofstream f2("/tmp/f2");
00021         ofstream f3("/tmp/f3");
00022         long f1counter=0;
00023         long f2counter=0;
00024         long f3counter=0;
00025 }
00026 
00027 
00028 namespace decision_making{
00029 
00030 typedef actionlib::SimpleActionClient<RobotTaskAction> Client;
00031 
00032 const double WAIT_SERVER_DURATION = 1.0;
00033 const double WAIT_RESULT_DURATION = 1.0;
00034 
00035 #define CHECK_FOR_TERMINATION \
00036                 if( events.isTerminated() or not ros::ok() ){\
00037                         DMDEBUG( cout<<" TASK("<<task_address<<":TERMINATED) " ; )\
00038                         RosDiagnostic::get().publish(call_ctx.str(), "TASK", "stopped", str(TaskResult::TERMINATED()));\
00039                         return TaskResult::TERMINATED();\
00040                 }
00041 
00042 
00043 TaskResult callTask(std::string task_address, const CallContext& call_ctx, EventQueue& events);
00044 
00045 class RosNodeInformation{
00046         RosNodeInformation():run_id(uid()){}
00047         static char hex(int i){ if(i<10) return '0'+i; return 'A'+(i-10); }
00048         static string uid(){
00049                 srand(time(NULL));
00050                 stringstream s;
00051                 for(int i=0;i<4;i++){ for(int i=0;i<5;i++) s<<hex(rand()%16); s<<'-';} for(int i=0;i<5;i++) s<<hex(rand()%10);
00052                 return s.str();
00053         }
00054 public:
00055         static RosNodeInformation& get(){ static RosNodeInformation node; return node; }
00056         std::string executable_path;
00057         std::string executable_dir;
00058         std::string node_name;
00059         std::string node_namespace;
00060         std::string run_id;
00061 };
00062 
00063 class RosDiagnostic{
00064 public:
00065         static RosDiagnostic& get(){static RosDiagnostic d; return d;}
00066         typedef void (*update_t)();
00067         struct DiagnosticMessage{
00068                 string name;
00069                 string type;
00070                 string status;
00071                 string info;
00072                 DiagnosticMessage(){}
00073                 DiagnosticMessage(
00074                         string name,
00075                         string type,
00076                         string status,
00077                         string info
00078                 ):
00079                                 name(name),
00080                                 type(type),
00081                                 status(status),
00082                                 info(info)
00083                 {}
00084                 string str()const{
00085                         stringstream s;
00086                         s<<"D["<<name<<":"<<type<<":"<<status<<":"<<info<<"]";
00087                         return s.str();
00088                 }
00089 
00090                 diagnostic_msgs::DiagnosticStatus::Ptr getDiagnosticStatus()const{
00091                         diagnostic_updater::DiagnosticStatusWrapper* ret_ptr = new diagnostic_updater::DiagnosticStatusWrapper();
00092                         diagnostic_msgs::DiagnosticStatus::Ptr ret( ret_ptr );
00093                         diagnostic_updater::DiagnosticStatusWrapper& stat = * ret_ptr;
00094                         const DiagnosticMessage& msg = *this;
00095                         RosNodeInformation& info = RosNodeInformation::get();
00096 
00097                         stat.hardware_id = "none";
00098                         stat.name = info.node_name.substr(1)+": "+"decision_making";
00099 
00100                         stringstream summery;
00101                         summery<<"["<<f3counter<<"] "<<msg.name<<" "<<msg.type<<" is "<<msg.status<<". "<<msg.info;
00102                         stat.summary(diagnostic_msgs::DiagnosticStatus::OK, summery.str().c_str());
00103 
00104                         stat.add("name", msg.name);
00105                         stat.add("type", msg.type);
00106                         stat.add("status", msg.status);
00107                         stat.add("result", msg.info);
00108                         stat.add("node_name", info.node_name);
00109                         stat.add("node_exe_file", info.executable_path);
00110                         stat.add("node_exe_dir", info.executable_dir);
00111                         stat.add("node_run_id", info.run_id);
00112 
00113                         f3<<"#"<<f3counter<<": "<<msg.name<<" "<<msg.type<<" is "<<msg.status<<". "<<msg.info<<endl;
00114                         f3counter++;
00115                         return ret;
00116                 }
00117         };
00118         void publish(
00119                         string name,
00120                         string type,
00121                         string status,
00122                         string info
00123         ){
00124                 boost::mutex::scoped_lock l(mtx);
00125                 {
00126                         boost::mutex::scoped_lock lq(mtx_queue);
00127                         DiagnosticMessage msg ( name, type, status, info );
00128                         //cout<<msg.str();
00129                         f1<<"#"<<f1counter<<": "<<msg.name<<" "<<msg.type<<" is "<<msg.status<<". "<<msg.info<<endl;
00130                         f1counter++;
00131                         queue.push_back(msg);
00132 
00133                         diagnostic_msgs::DiagnosticStatus::Ptr dg_msg = msg.getDiagnosticStatus();
00134                         diagnostic_msgs::DiagnosticArray dga_msg;
00135                         dga_msg.status.push_back(*(dg_msg.get()));
00136                         diagnostic_publisher.publish(dga_msg);
00137 
00138                 }
00139                 update();
00140                 on_new_message.notify_one();
00141         }
00142         DiagnosticMessage tryGet(bool& success){
00143                 //boost::mutex::scoped_lock l(mtx);
00144                 boost::mutex::scoped_lock lq(mtx_queue);
00145                 success = false;
00146                 if(queue.empty()) return DiagnosticMessage();
00147                 DiagnosticMessage msg = queue.front();
00148                 queue.pop_front();
00149                 success=true;
00150                 return msg;
00151         }
00152         void update(){ ros_diagnostic_updater.force_update(); }
00153         class DiagnosticTask{
00154         public:
00155                 void produce_diagnostics(diagnostic_updater::DiagnosticStatusWrapper &stat)
00156                 {
00157                         bool ok;
00158                         DiagnosticMessage msg = get().tryGet(ok);
00159                         RosNodeInformation& info = RosNodeInformation::get();
00160                         if(ok){
00161                                 stringstream summery;
00162                                 summery<<"["<<f2counter<<"] "<<msg.name<<" "<<msg.type<<" is "<<msg.status<<". "<<msg.info;
00163                                 stat.summary(diagnostic_msgs::DiagnosticStatus::OK, summery.str().c_str());
00164 
00165                                 stat.add("name", msg.name);
00166                                 stat.add("type", msg.type);
00167                                 stat.add("status", msg.status);
00168                                 stat.add("result", msg.info);
00169                                 stat.add("node_name", info.node_name);
00170                                 stat.add("node_exe_file", info.executable_path);
00171                                 stat.add("node_exe_dir", info.executable_dir);
00172                                 stat.add("node_run_id", info.run_id);
00173 
00174                                 f2<<"#"<<f2counter<<": "<<msg.name<<" "<<msg.type<<" is "<<msg.status<<". "<<msg.info<<endl;
00175                                 f2counter++;
00176                         }
00177                 }
00178         };
00179 private:
00180         boost::mutex mtx;
00181         boost::mutex mtx_queue;
00182         boost::condition_variable on_new_message;
00183         std::deque<DiagnosticMessage> queue;
00184         diagnostic_updater::Updater ros_diagnostic_updater;
00185         DiagnosticTask diagnostik_task;
00186         ros::Publisher diagnostic_publisher;
00187 
00188         RosDiagnostic(){
00189                 ros_diagnostic_updater.setHardwareID("none");
00190                 ros_diagnostic_updater.add("decision_making", &diagnostik_task, &RosDiagnostic::DiagnosticTask::produce_diagnostics);
00191 
00192                 ros::NodeHandle node;
00193                 //diagnostic_publisher = node.advertise<diagnostic_msgs::DiagnosticStatus>("/decision_making/monitoring", 100);
00194                 diagnostic_publisher = node.advertise<diagnostic_msgs::DiagnosticArray>("/decision_making/monitoring", 100);
00195         }
00196 };
00197 
00198 #define IMPL_ON_FUNCTION(NAME, TEXT) \
00199         ON_FUNCTION(NAME){\
00200                 /*cout<<"[on_fsm_start]"<<call_ctx.str()<<endl;*/\
00201                 RosDiagnostic::get().publish(call_ctx.str(), type, #TEXT, str(result));\
00202         }
00203 
00204 IMPL_ON_FUNCTION(on_fsm_start, started)
00205 IMPL_ON_FUNCTION(on_fsm_end, stopped)
00206 IMPL_ON_FUNCTION(on_fsm_state_start, started)
00207 IMPL_ON_FUNCTION(on_fsm_state_end, stopped)
00208 IMPL_ON_FUNCTION(on_bt_node_start, started)
00209 IMPL_ON_FUNCTION(on_bt_node_end, stopped)
00210 IMPL_ON_FUNCTION(on_tao_tree_start, started)
00211 IMPL_ON_FUNCTION(on_tao_tree_end, stopped)
00212 IMPL_ON_FUNCTION(on_tao_plan_start, started)
00213 IMPL_ON_FUNCTION(on_tao_plan_end, stopped)
00214 
00215 std::string RosConstraints::preproc(std::string txt)const{
00216         std::stringstream source(txt);
00217         std::stringstream result;
00218         enum states_t{ nline, normal, replace } state=nline;
00219         for(size_t i=0;i<txt.size();i++){
00220                 char c = txt[i];
00221                 switch(state){
00222                 case nline:
00223                         switch(c){
00224                         case '$': result<<"#!"; state=normal; break;
00225                         case ';': state=replace; break;
00226                         case '\n': result<<c; state=nline; break;
00227                         default: result<<c; state=normal; break;
00228                         }
00229                         break;
00230                 case normal:
00231                         switch(c){
00232                         case ';': state=replace; break;
00233                         case '\n': result<<c; state=nline; break;
00234                         default: result<<c; state=normal; break;
00235                         }
00236                         break;
00237                 case replace:
00238                         switch(c){
00239                         case ' ': result<<'\n'; state=nline; break;
00240                         case '\n': result<<';'<<c; state=nline; break;
00241                         default: result<<';'<<c; state=normal; break;
00242                         }
00243                         break;
00244                 }
00245         }
00246         return result.str();
00247 }
00248 
00249 TaskResult callTask(std::string task_address, const CallContext& gl_call_ctx, EventQueue& events){
00250         DMDEBUG( cout<<" TASK("<<task_address<<":CALL) " ;)
00251         //CallContext call_ctx(gl_call_ctx, task_address);
00252         const CallContext& call_ctx = gl_call_ctx;
00253         RosDiagnostic::get().publish(call_ctx.str(), "TASK", "started", str(decision_making::TaskResult::UNDEF()));
00254 
00255         string task_name, task_params;
00256         size_t i=task_address.find('(');
00257         if(i==string::npos) task_name = task_address;
00258         else{ task_name=task_address.substr(0,i); task_params=task_address.substr(i+1,task_address.size()-1); }
00259         string full_task_name = task_name;
00260 
00261         if(LocalTasks::registrated(task_name)){
00262                 TaskResult res = LocalTasks::call(task_name, task_address, call_ctx, events);
00263                 RosDiagnostic::get().publish(call_ctx.str(), "TASK", "stopped", str(res));
00264                 Event new_event ( Event(""+ MapResultEvent::map(task_name, res.error()), call_ctx) );
00265                 DMDEBUG( cout<<" TASK("<<task_address<<":"<<new_event<<") "; )
00266                 events.raiseEvent(new_event);
00267                 return res;
00268         }
00269 
00270         while(true)
00271         {
00272                 // create the action client
00273                 // true causes the client to spin its own thread
00274                 Client ac(full_task_name.c_str(), true);
00275 
00276                 DMDEBUG( cout<<" Task["<<full_task_name<<":Waiting for action server to start task] "; )
00277                 // wait for the action server to start
00278                 //ac.waitForServer(); //will wait for infinite time
00279 
00280                 bool serverFound = false;
00281                 while( not serverFound  and ros::ok() and not events.isTerminated() ){
00282                         ac.waitForServer(ros::Duration(WAIT_SERVER_DURATION));
00283                         if(ac.isServerConnected()){
00284                                 DMDEBUG( cout<<" Task["<<full_task_name<<": is found.] "; )
00285                                 serverFound = true;
00286                         }else{
00287                                 DMDEBUG( cout<<" Task["<<full_task_name<<": not found, retry] "; )
00288                         }
00289                 }
00290 
00291                 CHECK_FOR_TERMINATION
00292 
00293                 // send a goal to the action
00294                 RobotTaskGoal goal;
00295                         goal.name = task_name;
00296                         stringstream suid; suid<<"task_id_tested_"<<::time(NULL);
00297                         goal.uid = suid.str();
00298                         goal.parameters=task_params;
00299 
00300                 DMDEBUG( cout<<" Task["<<full_task_name<<":goal is nm="<< goal.name <<", id="<< goal.uid <<", pr="<< goal.parameters<<"] "; )
00301 
00302                 ac.sendGoal(goal);
00303                 DMDEBUG( cout<<" Task["<<full_task_name<<":goal sent. wait for result"<<"] "; )
00304 
00305                 bool finished = false;
00306                 while(not events.isTerminated() and ros::ok()){
00307 
00308                         DMDEBUG( cout<<" Task["<<full_task_name<<":Wait for "<<WAIT_RESULT_DURATION<<" sec] "; )
00309 
00310                         //wait for the action to return
00311                         bool finished_before_timeout = ac.waitForResult(ros::Duration(WAIT_RESULT_DURATION));
00312 
00313                         if (finished_before_timeout)
00314                         {
00315                                 stringstream res_str;
00316                                 actionlib::SimpleClientGoalState state = ac.getState();
00317                                 res_str<< "finished:st="<<state.toString();
00318                                 finished = true;
00319                                 if(
00320                                         state==actionlib::SimpleClientGoalState::SUCCEEDED ||
00321                                         state==actionlib::SimpleClientGoalState::PREEMPTED ||
00322                                         state==actionlib::SimpleClientGoalState::ABORTED
00323                                 ){
00324                                         res_str<<",rs="<<ac.getResult()->success;
00325                                         res_str<<",ds="<<ac.getResult()->description;
00326                                         res_str<<",pl="<<ac.getResult()->plan;
00327                                 }
00328                                 DMDEBUG( cout<<" Task["<<full_task_name<<":"<< res_str.str() <<"] "; )
00329                                 break;
00330                         }
00331                 }
00332 
00333                 if(!finished){
00334                         DMDEBUG( cout<<" Task["<<full_task_name<<":"<< "Abort task goal" <<"] "; )
00335                         ac.cancelGoal();
00336                 }
00337 
00338                 CHECK_FOR_TERMINATION
00339 
00340                 DMDEBUG( cout<<" Task["<<full_task_name<<":"<< "Result is gotten" <<"] "; )
00341 
00342                 Event new_event ( Event(""+ MapResultEvent::map(task_name, ac.getResult()->success), call_ctx) );
00343                 DMDEBUG( cout<<" TASK("<<task_address<<":"<<new_event<<") "; )
00344                 events.raiseEvent(new_event);
00345 
00346                 TaskResult res;
00347                 if(ac.getResult()->success==0){
00348                         res = TaskResult::SUCCESS();
00349                 }else{
00350                         res = TaskResult::FAIL(TaskResult::rerangeErrorCode(ac.getResult()->success), ac.getResult()->description);
00351                 }
00352                 RosDiagnostic::get().publish(call_ctx.str(), "TASK", "stopped", str(res));
00353                 return res;
00354 
00355         }
00356         RosDiagnostic::get().publish(call_ctx.str(), "TASK", "stopped", str(TaskResult::FAIL()));
00357         return TaskResult::FAIL();
00358 }
00359 
00360 RosEventQueue::RosEventQueue():decision_making::EventQueue(), do_not_publish_spin(true){
00361         std::string node_name = ros::this_node::getName();
00362         std::string topic_name = "/decision_making/"+node_name+"/events";
00363         publisher = ros::NodeHandle().advertise<std_msgs::String>(topic_name, 100);
00364         {boost::this_thread::sleep(boost::posix_time::seconds(1.0));}
00365         subscriber= ros::NodeHandle().subscribe<std_msgs::String>(topic_name, 100, &RosEventQueue::onNewEvent, this);
00366         {boost::this_thread::sleep(boost::posix_time::seconds(1.0));}
00367 }
00368 RosEventQueue::RosEventQueue(EventQueue* parent):decision_making::EventQueue(parent), do_not_publish_spin(true){
00369 }
00370 RosEventQueue::RosEventQueue(EventQueue* parent, bool isTransit):decision_making::EventQueue(parent,isTransit), do_not_publish_spin(true){
00371 }
00372 
00373 void RosEventQueue::onNewEvent(const std_msgs::String::ConstPtr& msg){
00374         //cout<<"RosEventQueue:onNewEvent: "<<msg->data<<endl;
00375         decision_making::Event e(msg->data);
00376         decision_making::EventQueue::raiseEvent(e);
00377 }
00378 void RosEventQueue::raiseEvent(const decision_making::Event& e){
00379         if( do_not_publish_spin and e.equals(Event::SPIN_EVENT()) ){
00380                 decision_making::EventQueue::raiseEvent(e);
00381                 return;
00382         }
00383         std_msgs::String::Ptr msg(new std_msgs::String());
00384         msg->data = e._name;
00385         publisher.publish(msg);
00386 }
00387 
00388 
00389 }
00390 
00391 
00392 void ros_decision_making_init(int &argc, char **argv){
00393         using namespace decision_making;
00394         RosNodeInformation& info = RosNodeInformation::get();
00395         info.node_name = ros::this_node::getName();
00396         info.node_namespace = ros::this_node::getNamespace();
00397         info.executable_path = boost::filesystem::path(argv[0]).string();
00398         info.executable_dir = boost::filesystem::current_path().string();
00399         RosDiagnostic::get();
00400         RosConstraints::getAdder();
00401         RosConstraints::getRemover();
00402         boost::this_thread::sleep(boost::posix_time::seconds(1.0));
00403 }
00404 
00405 
00406 


decision_making
Author(s):
autogenerated on Wed Aug 26 2015 11:16:53