00001 #include <ros/ros.h>
00002 #include <actionlib/client/simple_action_client.h>
00003 #include <actionlib/client/terminal_state.h>
00004
00005 #include <robot_task/RobotTask.h>
00006 #include <robot_task/RobotTaskAction.h>
00007
00008 #include <boost/filesystem.hpp>
00009
00010 #include "ROSTask.h"
00011
00012 using namespace std;
00013 using namespace robot_task;
00014
00015 #include <fstream>
00016
00017 namespace {
00018
00019 ofstream f1("/tmp/f1");
00020 ofstream f2("/tmp/f2");
00021 ofstream f3("/tmp/f3");
00022 long f1counter=0;
00023 long f2counter=0;
00024 long f3counter=0;
00025 }
00026
00027
00028 namespace decision_making{
00029
00030 typedef actionlib::SimpleActionClient<RobotTaskAction> Client;
00031
00032 const double WAIT_SERVER_DURATION = 1.0;
00033 const double WAIT_RESULT_DURATION = 1.0;
00034
00035 #define CHECK_FOR_TERMINATION \
00036 if( events.isTerminated() or not ros::ok() ){\
00037 DMDEBUG( cout<<" TASK("<<task_address<<":TERMINATED) " ; )\
00038 RosDiagnostic::get().publish(call_ctx.str(), "TASK", "stopped", str(TaskResult::TERMINATED()));\
00039 return TaskResult::TERMINATED();\
00040 }
00041
00042
00043 TaskResult callTask(std::string task_address, const CallContext& call_ctx, EventQueue& events);
00044
00045 class RosNodeInformation{
00046 RosNodeInformation():run_id(uid()){}
00047 static char hex(int i){ if(i<10) return '0'+i; return 'A'+(i-10); }
00048 static string uid(){
00049 srand(time(NULL));
00050 stringstream s;
00051 for(int i=0;i<4;i++){ for(int i=0;i<5;i++) s<<hex(rand()%16); s<<'-';} for(int i=0;i<5;i++) s<<hex(rand()%10);
00052 return s.str();
00053 }
00054 public:
00055 static RosNodeInformation& get(){ static RosNodeInformation node; return node; }
00056 std::string executable_path;
00057 std::string executable_dir;
00058 std::string node_name;
00059 std::string node_namespace;
00060 std::string run_id;
00061 };
00062
00063 class RosDiagnostic{
00064 public:
00065 static RosDiagnostic& get(){static RosDiagnostic d; return d;}
00066 typedef void (*update_t)();
00067 struct DiagnosticMessage{
00068 string name;
00069 string type;
00070 string status;
00071 string info;
00072 DiagnosticMessage(){}
00073 DiagnosticMessage(
00074 string name,
00075 string type,
00076 string status,
00077 string info
00078 ):
00079 name(name),
00080 type(type),
00081 status(status),
00082 info(info)
00083 {}
00084 string str()const{
00085 stringstream s;
00086 s<<"D["<<name<<":"<<type<<":"<<status<<":"<<info<<"]";
00087 return s.str();
00088 }
00089
00090 diagnostic_msgs::DiagnosticStatus::Ptr getDiagnosticStatus()const{
00091 diagnostic_updater::DiagnosticStatusWrapper* ret_ptr = new diagnostic_updater::DiagnosticStatusWrapper();
00092 diagnostic_msgs::DiagnosticStatus::Ptr ret( ret_ptr );
00093 diagnostic_updater::DiagnosticStatusWrapper& stat = * ret_ptr;
00094 const DiagnosticMessage& msg = *this;
00095 RosNodeInformation& info = RosNodeInformation::get();
00096
00097 stat.hardware_id = "none";
00098 stat.name = info.node_name.substr(1)+": "+"decision_making";
00099
00100 stringstream summery;
00101 summery<<"["<<f3counter<<"] "<<msg.name<<" "<<msg.type<<" is "<<msg.status<<". "<<msg.info;
00102 stat.summary(diagnostic_msgs::DiagnosticStatus::OK, summery.str().c_str());
00103
00104 stat.add("name", msg.name);
00105 stat.add("type", msg.type);
00106 stat.add("status", msg.status);
00107 stat.add("result", msg.info);
00108 stat.add("node_name", info.node_name);
00109 stat.add("node_exe_file", info.executable_path);
00110 stat.add("node_exe_dir", info.executable_dir);
00111 stat.add("node_run_id", info.run_id);
00112
00113 f3<<"#"<<f3counter<<": "<<msg.name<<" "<<msg.type<<" is "<<msg.status<<". "<<msg.info<<endl;
00114 f3counter++;
00115 return ret;
00116 }
00117 };
00118 void publish(
00119 string name,
00120 string type,
00121 string status,
00122 string info
00123 ){
00124 boost::mutex::scoped_lock l(mtx);
00125 {
00126 boost::mutex::scoped_lock lq(mtx_queue);
00127 DiagnosticMessage msg ( name, type, status, info );
00128
00129 f1<<"#"<<f1counter<<": "<<msg.name<<" "<<msg.type<<" is "<<msg.status<<". "<<msg.info<<endl;
00130 f1counter++;
00131 queue.push_back(msg);
00132
00133 diagnostic_msgs::DiagnosticStatus::Ptr dg_msg = msg.getDiagnosticStatus();
00134 diagnostic_msgs::DiagnosticArray dga_msg;
00135 dga_msg.status.push_back(*(dg_msg.get()));
00136 diagnostic_publisher.publish(dga_msg);
00137
00138 }
00139 update();
00140 on_new_message.notify_one();
00141 }
00142 DiagnosticMessage tryGet(bool& success){
00143
00144 boost::mutex::scoped_lock lq(mtx_queue);
00145 success = false;
00146 if(queue.empty()) return DiagnosticMessage();
00147 DiagnosticMessage msg = queue.front();
00148 queue.pop_front();
00149 success=true;
00150 return msg;
00151 }
00152 void update(){ ros_diagnostic_updater.force_update(); }
00153 class DiagnosticTask{
00154 public:
00155 void produce_diagnostics(diagnostic_updater::DiagnosticStatusWrapper &stat)
00156 {
00157 bool ok;
00158 DiagnosticMessage msg = get().tryGet(ok);
00159 RosNodeInformation& info = RosNodeInformation::get();
00160 if(ok){
00161 stringstream summery;
00162 summery<<"["<<f2counter<<"] "<<msg.name<<" "<<msg.type<<" is "<<msg.status<<". "<<msg.info;
00163 stat.summary(diagnostic_msgs::DiagnosticStatus::OK, summery.str().c_str());
00164
00165 stat.add("name", msg.name);
00166 stat.add("type", msg.type);
00167 stat.add("status", msg.status);
00168 stat.add("result", msg.info);
00169 stat.add("node_name", info.node_name);
00170 stat.add("node_exe_file", info.executable_path);
00171 stat.add("node_exe_dir", info.executable_dir);
00172 stat.add("node_run_id", info.run_id);
00173
00174 f2<<"#"<<f2counter<<": "<<msg.name<<" "<<msg.type<<" is "<<msg.status<<". "<<msg.info<<endl;
00175 f2counter++;
00176 }
00177 }
00178 };
00179 private:
00180 boost::mutex mtx;
00181 boost::mutex mtx_queue;
00182 boost::condition_variable on_new_message;
00183 std::deque<DiagnosticMessage> queue;
00184 diagnostic_updater::Updater ros_diagnostic_updater;
00185 DiagnosticTask diagnostik_task;
00186 ros::Publisher diagnostic_publisher;
00187
00188 RosDiagnostic(){
00189 ros_diagnostic_updater.setHardwareID("none");
00190 ros_diagnostic_updater.add("decision_making", &diagnostik_task, &RosDiagnostic::DiagnosticTask::produce_diagnostics);
00191
00192 ros::NodeHandle node;
00193
00194 diagnostic_publisher = node.advertise<diagnostic_msgs::DiagnosticArray>("/decision_making/monitoring", 100);
00195 }
00196 };
00197
00198 #define IMPL_ON_FUNCTION(NAME, TEXT) \
00199 ON_FUNCTION(NAME){\
00200 \
00201 RosDiagnostic::get().publish(call_ctx.str(), type, #TEXT, str(result));\
00202 }
00203
00204 IMPL_ON_FUNCTION(on_fsm_start, started)
00205 IMPL_ON_FUNCTION(on_fsm_end, stopped)
00206 IMPL_ON_FUNCTION(on_fsm_state_start, started)
00207 IMPL_ON_FUNCTION(on_fsm_state_end, stopped)
00208 IMPL_ON_FUNCTION(on_bt_node_start, started)
00209 IMPL_ON_FUNCTION(on_bt_node_end, stopped)
00210 IMPL_ON_FUNCTION(on_tao_tree_start, started)
00211 IMPL_ON_FUNCTION(on_tao_tree_end, stopped)
00212 IMPL_ON_FUNCTION(on_tao_plan_start, started)
00213 IMPL_ON_FUNCTION(on_tao_plan_end, stopped)
00214
00215 std::string RosConstraints::preproc(std::string txt)const{
00216 std::stringstream source(txt);
00217 std::stringstream result;
00218 enum states_t{ nline, normal, replace } state=nline;
00219 for(size_t i=0;i<txt.size();i++){
00220 char c = txt[i];
00221 switch(state){
00222 case nline:
00223 switch(c){
00224 case '$': result<<"#!"; state=normal; break;
00225 case ';': state=replace; break;
00226 case '\n': result<<c; state=nline; break;
00227 default: result<<c; state=normal; break;
00228 }
00229 break;
00230 case normal:
00231 switch(c){
00232 case ';': state=replace; break;
00233 case '\n': result<<c; state=nline; break;
00234 default: result<<c; state=normal; break;
00235 }
00236 break;
00237 case replace:
00238 switch(c){
00239 case ' ': result<<'\n'; state=nline; break;
00240 case '\n': result<<';'<<c; state=nline; break;
00241 default: result<<';'<<c; state=normal; break;
00242 }
00243 break;
00244 }
00245 }
00246 return result.str();
00247 }
00248
00249 TaskResult callTask(std::string task_address, const CallContext& gl_call_ctx, EventQueue& events){
00250 DMDEBUG( cout<<" TASK("<<task_address<<":CALL) " ;)
00251
00252 const CallContext& call_ctx = gl_call_ctx;
00253 RosDiagnostic::get().publish(call_ctx.str(), "TASK", "started", str(decision_making::TaskResult::UNDEF()));
00254
00255 string task_name, task_params;
00256 size_t i=task_address.find('(');
00257 if(i==string::npos) task_name = task_address;
00258 else{ task_name=task_address.substr(0,i); task_params=task_address.substr(i+1,task_address.size()-1); }
00259 string full_task_name = task_name;
00260
00261 if(LocalTasks::registrated(task_name)){
00262 TaskResult res = LocalTasks::call(task_name, task_address, call_ctx, events);
00263 RosDiagnostic::get().publish(call_ctx.str(), "TASK", "stopped", str(res));
00264 Event new_event ( Event(""+ MapResultEvent::map(task_name, res.error()), call_ctx) );
00265 DMDEBUG( cout<<" TASK("<<task_address<<":"<<new_event<<") "; )
00266 events.raiseEvent(new_event);
00267 return res;
00268 }
00269
00270 while(true)
00271 {
00272
00273
00274 Client ac(full_task_name.c_str(), true);
00275
00276 DMDEBUG( cout<<" Task["<<full_task_name<<":Waiting for action server to start task] "; )
00277
00278
00279
00280 bool serverFound = false;
00281 while( not serverFound and ros::ok() and not events.isTerminated() ){
00282 ac.waitForServer(ros::Duration(WAIT_SERVER_DURATION));
00283 if(ac.isServerConnected()){
00284 DMDEBUG( cout<<" Task["<<full_task_name<<": is found.] "; )
00285 serverFound = true;
00286 }else{
00287 DMDEBUG( cout<<" Task["<<full_task_name<<": not found, retry] "; )
00288 }
00289 }
00290
00291 CHECK_FOR_TERMINATION
00292
00293
00294 RobotTaskGoal goal;
00295 goal.name = task_name;
00296 stringstream suid; suid<<"task_id_tested_"<<::time(NULL);
00297 goal.uid = suid.str();
00298 goal.parameters=task_params;
00299
00300 DMDEBUG( cout<<" Task["<<full_task_name<<":goal is nm="<< goal.name <<", id="<< goal.uid <<", pr="<< goal.parameters<<"] "; )
00301
00302 ac.sendGoal(goal);
00303 DMDEBUG( cout<<" Task["<<full_task_name<<":goal sent. wait for result"<<"] "; )
00304
00305 bool finished = false;
00306 while(not events.isTerminated() and ros::ok()){
00307
00308 DMDEBUG( cout<<" Task["<<full_task_name<<":Wait for "<<WAIT_RESULT_DURATION<<" sec] "; )
00309
00310
00311 bool finished_before_timeout = ac.waitForResult(ros::Duration(WAIT_RESULT_DURATION));
00312
00313 if (finished_before_timeout)
00314 {
00315 stringstream res_str;
00316 actionlib::SimpleClientGoalState state = ac.getState();
00317 res_str<< "finished:st="<<state.toString();
00318 finished = true;
00319 if(
00320 state==actionlib::SimpleClientGoalState::SUCCEEDED ||
00321 state==actionlib::SimpleClientGoalState::PREEMPTED ||
00322 state==actionlib::SimpleClientGoalState::ABORTED
00323 ){
00324 res_str<<",rs="<<ac.getResult()->success;
00325 res_str<<",ds="<<ac.getResult()->description;
00326 res_str<<",pl="<<ac.getResult()->plan;
00327 }
00328 DMDEBUG( cout<<" Task["<<full_task_name<<":"<< res_str.str() <<"] "; )
00329 break;
00330 }
00331 }
00332
00333 if(!finished){
00334 DMDEBUG( cout<<" Task["<<full_task_name<<":"<< "Abort task goal" <<"] "; )
00335 ac.cancelGoal();
00336 }
00337
00338 CHECK_FOR_TERMINATION
00339
00340 DMDEBUG( cout<<" Task["<<full_task_name<<":"<< "Result is gotten" <<"] "; )
00341
00342 Event new_event ( Event(""+ MapResultEvent::map(task_name, ac.getResult()->success), call_ctx) );
00343 DMDEBUG( cout<<" TASK("<<task_address<<":"<<new_event<<") "; )
00344 events.raiseEvent(new_event);
00345
00346 TaskResult res;
00347 if(ac.getResult()->success==0){
00348 res = TaskResult::SUCCESS();
00349 }else{
00350 res = TaskResult::FAIL(TaskResult::rerangeErrorCode(ac.getResult()->success), ac.getResult()->description);
00351 }
00352 RosDiagnostic::get().publish(call_ctx.str(), "TASK", "stopped", str(res));
00353 return res;
00354
00355 }
00356 RosDiagnostic::get().publish(call_ctx.str(), "TASK", "stopped", str(TaskResult::FAIL()));
00357 return TaskResult::FAIL();
00358 }
00359
00360 RosEventQueue::RosEventQueue():decision_making::EventQueue(), do_not_publish_spin(true){
00361 std::string node_name = ros::this_node::getName();
00362 std::string topic_name = "/decision_making/"+node_name+"/events";
00363 publisher = ros::NodeHandle().advertise<std_msgs::String>(topic_name, 100);
00364 {boost::this_thread::sleep(boost::posix_time::seconds(1.0));}
00365 subscriber= ros::NodeHandle().subscribe<std_msgs::String>(topic_name, 100, &RosEventQueue::onNewEvent, this);
00366 {boost::this_thread::sleep(boost::posix_time::seconds(1.0));}
00367 }
00368 RosEventQueue::RosEventQueue(EventQueue* parent):decision_making::EventQueue(parent), do_not_publish_spin(true){
00369 }
00370 RosEventQueue::RosEventQueue(EventQueue* parent, bool isTransit):decision_making::EventQueue(parent,isTransit), do_not_publish_spin(true){
00371 }
00372
00373 void RosEventQueue::onNewEvent(const std_msgs::String::ConstPtr& msg){
00374
00375 decision_making::Event e(msg->data);
00376 decision_making::EventQueue::raiseEvent(e);
00377 }
00378 void RosEventQueue::raiseEvent(const decision_making::Event& e){
00379 if( do_not_publish_spin and e.equals(Event::SPIN_EVENT()) ){
00380 decision_making::EventQueue::raiseEvent(e);
00381 return;
00382 }
00383 std_msgs::String::Ptr msg(new std_msgs::String());
00384 msg->data = e._name;
00385 publisher.publish(msg);
00386 }
00387
00388
00389 }
00390
00391
00392 void ros_decision_making_init(int &argc, char **argv){
00393 using namespace decision_making;
00394 RosNodeInformation& info = RosNodeInformation::get();
00395 info.node_name = ros::this_node::getName();
00396 info.node_namespace = ros::this_node::getNamespace();
00397 info.executable_path = boost::filesystem::path(argv[0]).string();
00398 info.executable_dir = boost::filesystem::current_path().string();
00399 RosDiagnostic::get();
00400 RosConstraints::getAdder();
00401 RosConstraints::getRemover();
00402 boost::this_thread::sleep(boost::posix_time::seconds(1.0));
00403 }
00404
00405
00406