Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040 #include "Logger.hpp"
00041 #include "ExecutionEngine.hpp"
00042 #include "base/TaskCore.hpp"
00043 #include "rtt-fwd.hpp"
00044 #include "os/MutexLock.hpp"
00045 #include "internal/MWSRQueue.hpp"
00046 #include "TaskContext.hpp"
00047 #include "internal/CatchConfig.hpp"
00048
00049 #include <boost/bind.hpp>
00050 #include <boost/ref.hpp>
00051 #include <boost/lambda/lambda.hpp>
00052 #include <boost/lambda/bind.hpp>
00053 #include <functional>
00054 #include <algorithm>
00055
00056 #define ORONUM_EE_MQUEUE_SIZE 100
00057
00058 namespace RTT
00059 {
00067 using namespace std;
00068 using namespace detail;
00069 using namespace boost;
00070
00071 ExecutionEngine::ExecutionEngine( TaskCore* owner )
00072 : taskc(owner),
00073 mqueue(new MWSRQueue<DisposableInterface*>(ORONUM_EE_MQUEUE_SIZE) ),
00074 f_queue( new MWSRQueue<ExecutableInterface*>(ORONUM_EE_MQUEUE_SIZE) )
00075 {
00076 }
00077
00078 ExecutionEngine::~ExecutionEngine()
00079 {
00080 Logger::In in("~ExecutionEngine");
00081
00082
00083 std::vector<TaskCore*> copy = children;
00084 for (std::vector<TaskCore*>::iterator it = copy.begin(); it != copy.end();++it){
00085 (*it)->setExecutionEngine( 0 );
00086 }
00087 assert( children.empty() );
00088
00089 ExecutableInterface* foo;
00090 while ( f_queue->dequeue( foo ) )
00091 foo->unloaded();
00092
00093 DisposableInterface* dis;
00094 while ( mqueue->dequeue( dis ) )
00095 dis->dispose();
00096
00097 delete f_queue;
00098 delete mqueue;
00099 }
00100
00101 TaskCore* ExecutionEngine::getParent() {
00102 return taskc;
00103 }
00104
00105 void ExecutionEngine::addChild(TaskCore* tc) {
00106 children.push_back( tc );
00107 }
00108
00109 void ExecutionEngine::removeChild(TaskCore* tc) {
00110 vector<TaskCore*>::iterator it = find (children.begin(), children.end(), tc );
00111 if ( it != children.end() )
00112 children.erase(it);
00113 }
00114
00115 void ExecutionEngine::processFunctions()
00116 {
00117
00118 ExecutableInterface* foo = 0;
00119 int nbr = f_queue->size();
00120
00121 while ( f_queue->dequeue(foo) ) {
00122 assert(foo);
00123 if ( foo->execute() == false ){
00124 foo->unloaded();
00125 msg_cond.broadcast();
00126 } else {
00127 f_queue->enqueue( foo );
00128 }
00129 if ( --nbr == 0)
00130 break;
00131 }
00132 }
00133
00134 bool ExecutionEngine::runFunction( ExecutableInterface* f )
00135 {
00136 if (this->getActivity() && f) {
00137
00138 if (taskc && taskc->mTaskState == TaskCore::FatalError )
00139 return false;
00140 f->loaded(this);
00141 bool result = f_queue->enqueue( f );
00142
00143 this->getActivity()->trigger();
00144 return result;
00145 }
00146 return false;
00147 }
00148
00149 struct RemoveMsg : public DisposableInterface {
00150 ExecutableInterface* mf;
00151 ExecutionEngine* mee;
00152 bool found;
00153 RemoveMsg(ExecutableInterface* f, ExecutionEngine* ee)
00154 : mf(f),mee(ee), found(false) {}
00155 virtual void executeAndDispose() {
00156 mee->removeSelfFunction( mf );
00157 found = true;
00158 }
00159 virtual void dispose() {}
00160 virtual bool isError() const { return false;}
00161
00162 };
00163
00164 bool ExecutionEngine::removeFunction( ExecutableInterface* f )
00165 {
00166
00167 if ( !f )
00168 return false;
00169
00170 if ( !f->isLoaded() )
00171 return true;
00172
00173
00174 if ( getActivity() == 0 || !this->getActivity()->isActive() ) {
00175 if ( removeSelfFunction( f ) == false )
00176 return false;
00177 } else {
00178
00179 RemoveMsg rmsg(f,this);
00180 if ( this->process(&rmsg) )
00181 this->waitForMessages( ! lambda::bind(&ExecutableInterface::isLoaded, f) || lambda::bind(&RemoveMsg::found,boost::ref(rmsg)) );
00182 if (!rmsg.found)
00183 return false;
00184 }
00185
00186 f->unloaded();
00187 return true;
00188 }
00189
00190 bool ExecutionEngine::removeSelfFunction(ExecutableInterface* f )
00191 {
00192
00193 if ( !f )
00194 return false;
00195 int nbr = f_queue->size();
00196 while (nbr != 0) {
00197 ExecutableInterface* foo = 0;
00198 if ( !f_queue->dequeue(foo) )
00199 return false;
00200 if ( f == foo) {
00201 return true;
00202 }
00203 f_queue->enqueue(foo);
00204 --nbr;
00205 }
00206 return true;
00207 }
00208
00209 bool ExecutionEngine::initialize() {
00210
00211 return true;
00212 }
00213
00214 bool ExecutionEngine::hasWork()
00215 {
00216 return !mqueue->isEmpty();
00217 }
00218
00219 void ExecutionEngine::processMessages()
00220 {
00221
00222
00223 DisposableInterface* com(0);
00224 {
00225 while ( mqueue->dequeue(com) ) {
00226 assert( com );
00227 com->executeAndDispose();
00228 }
00229
00230
00231
00232
00233
00234
00235 MutexLock locker( msg_lock );
00236 }
00237 if ( com )
00238 msg_cond.broadcast();
00239 }
00240
00241 bool ExecutionEngine::process( DisposableInterface* c )
00242 {
00243 if ( c && this->getActivity() ) {
00244
00245 if (taskc && taskc->mTaskState == TaskCore::FatalError )
00246 return false;
00247 bool result = mqueue->enqueue( c );
00248 this->getActivity()->trigger();
00249 msg_cond.broadcast();
00250 return result;
00251 }
00252 return false;
00253 }
00254
00255 void ExecutionEngine::waitForMessages(const boost::function<bool(void)>& pred)
00256 {
00257 if (this->getActivity()->thread()->isSelf())
00258 waitAndProcessMessages(pred);
00259 else
00260 waitForMessagesInternal(pred);
00261 }
00262
00263
00264 void ExecutionEngine::waitForFunctions(const boost::function<bool(void)>& pred)
00265 {
00266 if (this->getActivity()->thread()->isSelf())
00267 waitAndProcessFunctions(pred);
00268 else
00269 waitForMessagesInternal(pred);
00270 }
00271
00272
00273 void ExecutionEngine::waitForMessagesInternal(boost::function<bool(void)> const& pred)
00274 {
00275 if ( pred() )
00276 return;
00277
00278 os::MutexLock lock(msg_lock);
00279 while (!pred()) {
00280 msg_cond.wait(msg_lock);
00281 }
00282 }
00283
00284
00285 void ExecutionEngine::waitAndProcessMessages(boost::function<bool(void)> const& pred)
00286 {
00287 while ( !pred() ){
00288
00289 this->processMessages();
00290 {
00291
00292
00293 os::MutexLock lock(msg_lock);
00294 if (!pred()) {
00295 msg_cond.wait(msg_lock);
00296 } else {
00297 return;
00298 }
00299 }
00300 }
00301 }
00302
00303 void ExecutionEngine::waitAndProcessFunctions(boost::function<bool(void)> const& pred)
00304 {
00305 while ( !pred() ){
00306
00307 this->processFunctions();
00308 {
00309
00310
00311 os::MutexLock lock(msg_lock);
00312 if (!pred()) {
00313 msg_cond.wait(msg_lock);
00314 } else {
00315 return;
00316 }
00317 }
00318 }
00319 }
00320
00321 void ExecutionEngine::step() {
00322 processMessages();
00323 processFunctions();
00324 processChildren();
00325 }
00326
00327 void ExecutionEngine::processChildren() {
00328
00329 if ( taskc ) {
00330
00331 if ( taskc->mTaskState == TaskCore::Running && taskc->mTargetState == TaskCore::Running ) {
00332 TRY (
00333 taskc->prepareUpdateHook();
00334 taskc->updateHook();
00335 ) CATCH(std::exception const& e,
00336 log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
00337 log(Error) << " " << e.what() << endlog();
00338 taskc->exception();
00339 ) CATCH_ALL (
00340 log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
00341 taskc->exception();
00342 )
00343 }
00344
00345 if ( taskc->mTaskState == TaskCore::RunTimeError ) {
00346 TRY (
00347 taskc->errorHook();
00348 ) CATCH(std::exception const& e,
00349 log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
00350 log(Error) << " " << e.what() << endlog();
00351 taskc->exception();
00352 ) CATCH_ALL (
00353 log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
00354 taskc->exception();
00355 )
00356 }
00357 }
00358 if ( !this->getActivity() || ! this->getActivity()->isRunning() ) return;
00359
00360
00361 for (std::vector<TaskCore*>::iterator it = children.begin(); it != children.end();++it) {
00362 if ( (*it)->mTaskState == TaskCore::Running && (*it)->mTargetState == TaskCore::Running ){
00363 TRY (
00364 (*it)->prepareUpdateHook();
00365 (*it)->updateHook();
00366 ) CATCH(std::exception const& e,
00367 log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
00368 log(Error) << " " << e.what() << endlog();
00369 (*it)->exception();
00370 ) CATCH_ALL (
00371 log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
00372 (*it)->exception();
00373 )
00374 }
00375 if ( (*it)->mTaskState == TaskCore::RunTimeError ){
00376 TRY (
00377 (*it)->errorHook();
00378 ) CATCH(std::exception const& e,
00379 log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
00380 log(Error) << " " << e.what() << endlog();
00381 (*it)->exception();
00382 ) CATCH_ALL (
00383 log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
00384 (*it)->exception();
00385 )
00386 }
00387 if ( !this->getActivity() || ! this->getActivity()->isRunning() ) return;
00388 }
00389 }
00390
00391 bool ExecutionEngine::breakLoop() {
00392 bool ok = true;
00393 if (taskc)
00394 ok = taskc->breakUpdateHook();
00395 for (std::vector<TaskCore*>::iterator it = children.begin(); it != children.end();++it) {
00396 ok = (*it)->breakUpdateHook() && ok;
00397 }
00398 return ok;
00399 }
00400
00401 bool ExecutionEngine::stopTask(TaskCore* task) {
00402
00403
00404
00405 if ( getActivity() && this->getActivity()->stop() ) {
00406 this->getActivity()->start();
00407 return true;
00408 }
00409 return false;
00410 }
00411
00412 void ExecutionEngine::setExceptionTask() {
00413 std::string name;
00414 TaskContext* tc = dynamic_cast<TaskContext*>(taskc);
00415 if (tc)
00416 name = tc->getName();
00417 else if (taskc)
00418 name = "TaskCore";
00419 else
00420 name = "GlobalEngine";
00421 log(Error) << "in "<<name<<": unhandled exception in sent operation." << endlog();
00422 if(taskc)
00423 taskc->exception();
00424 }
00425
00426
00427 void ExecutionEngine::finalize() {
00428
00429 }
00430
00431 }
00432