Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040 #include "Logger.hpp"
00041 #include "ExecutionEngine.hpp"
00042 #include "base/TaskCore.hpp"
00043 #include "rtt-fwd.hpp"
00044 #include "os/MutexLock.hpp"
00045 #include "internal/MWSRQueue.hpp"
00046 #include "TaskContext.hpp"
00047
00048 #include <boost/bind.hpp>
00049 #include <boost/ref.hpp>
00050 #include <boost/lambda/lambda.hpp>
00051 #include <boost/lambda/bind.hpp>
00052 #include <functional>
00053 #include <algorithm>
00054
00055 #define ORONUM_EE_MQUEUE_SIZE 100
00056
00057 namespace RTT
00058 {
00066 using namespace std;
00067 using namespace detail;
00068 using namespace boost;
00069
00070 ExecutionEngine::ExecutionEngine( TaskCore* owner )
00071 : taskc(owner),
00072 mqueue(new MWSRQueue<DisposableInterface*>(ORONUM_EE_MQUEUE_SIZE) ),
00073 f_queue( new MWSRQueue<ExecutableInterface*>(ORONUM_EE_MQUEUE_SIZE) )
00074 {
00075 }
00076
00077 ExecutionEngine::~ExecutionEngine()
00078 {
00079 Logger::In in("~ExecutionEngine");
00080
00081
00082 std::vector<TaskCore*> copy = children;
00083 for (std::vector<TaskCore*>::iterator it = copy.begin(); it != copy.end();++it){
00084 (*it)->setExecutionEngine( 0 );
00085 }
00086 assert( children.empty() );
00087
00088 ExecutableInterface* foo;
00089 while ( f_queue->dequeue( foo ) )
00090 foo->unloaded();
00091
00092 DisposableInterface* dis;
00093 while ( mqueue->dequeue( dis ) )
00094 dis->dispose();
00095
00096 delete f_queue;
00097 delete mqueue;
00098 }
00099
00100 TaskCore* ExecutionEngine::getParent() {
00101 return taskc;
00102 }
00103
00104 void ExecutionEngine::addChild(TaskCore* tc) {
00105 children.push_back( tc );
00106 }
00107
00108 void ExecutionEngine::removeChild(TaskCore* tc) {
00109 vector<TaskCore*>::iterator it = find (children.begin(), children.end(), tc );
00110 if ( it != children.end() )
00111 children.erase(it);
00112 }
00113
00114 void ExecutionEngine::processFunctions()
00115 {
00116
00117 ExecutableInterface* foo = 0;
00118 int nbr = f_queue->size();
00119
00120 while ( f_queue->dequeue(foo) ) {
00121 assert(foo);
00122 if ( foo->execute() == false ){
00123 foo->unloaded();
00124 msg_cond.broadcast();
00125 } else {
00126 f_queue->enqueue( foo );
00127 }
00128 if ( --nbr == 0)
00129 break;
00130 }
00131 }
00132
00133 bool ExecutionEngine::runFunction( ExecutableInterface* f )
00134 {
00135 if (this->getActivity() && f) {
00136
00137 if (taskc && taskc->mTaskState == TaskCore::FatalError )
00138 return false;
00139 f->loaded(this);
00140 bool result = f_queue->enqueue( f );
00141
00142 this->getActivity()->trigger();
00143 return result;
00144 }
00145 return false;
00146 }
00147
00148 struct RemoveMsg : public DisposableInterface {
00149 ExecutableInterface* mf;
00150 ExecutionEngine* mee;
00151 bool found;
00152 RemoveMsg(ExecutableInterface* f, ExecutionEngine* ee)
00153 : mf(f),mee(ee), found(false) {}
00154 virtual void executeAndDispose() {
00155 mee->removeSelfFunction( mf );
00156 found = true;
00157 }
00158 virtual void dispose() {}
00159 virtual bool isError() const { return false;}
00160
00161 };
00162
00163 bool ExecutionEngine::removeFunction( ExecutableInterface* f )
00164 {
00165
00166 if ( !f )
00167 return false;
00168
00169 if ( !f->isLoaded() )
00170 return true;
00171
00172
00173 if ( getActivity() == 0 || !this->getActivity()->isActive() ) {
00174 if ( removeSelfFunction( f ) == false )
00175 return false;
00176 } else {
00177
00178 RemoveMsg rmsg(f,this);
00179 if ( this->process(&rmsg) )
00180 this->waitForMessages( ! lambda::bind(&ExecutableInterface::isLoaded, f) || lambda::bind(&RemoveMsg::found,boost::ref(rmsg)) );
00181 if (!rmsg.found)
00182 return false;
00183 }
00184
00185 f->unloaded();
00186 return true;
00187 }
00188
00189 bool ExecutionEngine::removeSelfFunction(ExecutableInterface* f )
00190 {
00191
00192 if ( !f )
00193 return false;
00194 int nbr = f_queue->size();
00195 while (nbr != 0) {
00196 ExecutableInterface* foo = 0;
00197 if ( !f_queue->dequeue(foo) )
00198 return false;
00199 if ( f == foo) {
00200 return true;
00201 }
00202 f_queue->enqueue(foo);
00203 --nbr;
00204 }
00205 return true;
00206 }
00207
00208 bool ExecutionEngine::initialize() {
00209
00210 return true;
00211 }
00212
00213 bool ExecutionEngine::hasWork()
00214 {
00215 return !mqueue->isEmpty();
00216 }
00217
00218 void ExecutionEngine::processMessages()
00219 {
00220
00221
00222 DisposableInterface* com(0);
00223 {
00224 while ( mqueue->dequeue(com) ) {
00225 assert( com );
00226 com->executeAndDispose();
00227 }
00228
00229
00230
00231
00232
00233
00234 MutexLock locker( msg_lock );
00235 }
00236 if ( com )
00237 msg_cond.broadcast();
00238 }
00239
00240 bool ExecutionEngine::process( DisposableInterface* c )
00241 {
00242 if ( c && this->getActivity() ) {
00243
00244 if (taskc && taskc->mTaskState == TaskCore::FatalError )
00245 return false;
00246 bool result = mqueue->enqueue( c );
00247 this->getActivity()->trigger();
00248 msg_cond.broadcast();
00249 return result;
00250 }
00251 return false;
00252 }
00253
00254 void ExecutionEngine::waitForMessages(const boost::function<bool(void)>& pred)
00255 {
00256 if (this->getActivity()->thread()->isSelf())
00257 waitAndProcessMessages(pred);
00258 else
00259 waitForMessagesInternal(pred);
00260 }
00261
00262
00263 void ExecutionEngine::waitForFunctions(const boost::function<bool(void)>& pred)
00264 {
00265 if (this->getActivity()->thread()->isSelf())
00266 waitAndProcessFunctions(pred);
00267 else
00268 waitForMessagesInternal(pred);
00269 }
00270
00271
00272 void ExecutionEngine::waitForMessagesInternal(boost::function<bool(void)> const& pred)
00273 {
00274 if ( pred() )
00275 return;
00276
00277 os::MutexLock lock(msg_lock);
00278 while (!pred()) {
00279 msg_cond.wait(msg_lock);
00280 }
00281 }
00282
00283
00284 void ExecutionEngine::waitAndProcessMessages(boost::function<bool(void)> const& pred)
00285 {
00286 while ( !pred() ){
00287
00288 this->processMessages();
00289 {
00290
00291
00292 os::MutexLock lock(msg_lock);
00293 if (!pred()) {
00294 msg_cond.wait(msg_lock);
00295 } else {
00296 return;
00297 }
00298 }
00299 }
00300 }
00301
00302 void ExecutionEngine::waitAndProcessFunctions(boost::function<bool(void)> const& pred)
00303 {
00304 while ( !pred() ){
00305
00306 this->processFunctions();
00307 {
00308
00309
00310 os::MutexLock lock(msg_lock);
00311 if (!pred()) {
00312 msg_cond.wait(msg_lock);
00313 } else {
00314 return;
00315 }
00316 }
00317 }
00318 }
00319
00320 void ExecutionEngine::step() {
00321 processMessages();
00322 processFunctions();
00323 processChildren();
00324 }
00325
00326 void ExecutionEngine::processChildren() {
00327
00328 if ( taskc ) {
00329
00330 if ( taskc->mTaskState == TaskCore::Running && taskc->mTargetState == TaskCore::Running ) {
00331 try {
00332 taskc->prepareUpdateHook();
00333 taskc->updateHook();
00334 } catch(std::exception const& e) {
00335 log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
00336 log(Error) << " " << e.what() << endlog();
00337 taskc->exception();
00338 } catch(...){
00339 log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
00340 taskc->exception();
00341 }
00342 }
00343
00344 if ( taskc->mTaskState == TaskCore::RunTimeError ) {
00345 try {
00346 taskc->errorHook();
00347 } catch(std::exception const& e) {
00348 log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
00349 log(Error) << " " << e.what() << endlog();
00350 taskc->exception();
00351 } catch(...){
00352 log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
00353 taskc->exception();
00354 }
00355 }
00356 }
00357 if ( !this->getActivity() || ! this->getActivity()->isRunning() ) return;
00358
00359
00360 for (std::vector<TaskCore*>::iterator it = children.begin(); it != children.end();++it) {
00361 if ( (*it)->mTaskState == TaskCore::Running && (*it)->mTargetState == TaskCore::Running )
00362 try {
00363 (*it)->prepareUpdateHook();
00364 (*it)->updateHook();
00365 } catch(std::exception const& e) {
00366 log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
00367 log(Error) << " " << e.what() << endlog();
00368 (*it)->exception();
00369 } catch(...){
00370 log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
00371 (*it)->exception();
00372 }
00373 if ( (*it)->mTaskState == TaskCore::RunTimeError )
00374 try {
00375 (*it)->errorHook();
00376 } catch(std::exception const& e) {
00377 log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
00378 log(Error) << " " << e.what() << endlog();
00379 (*it)->exception();
00380 } catch(...){
00381 log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
00382 (*it)->exception();
00383 }
00384 if ( !this->getActivity() || ! this->getActivity()->isRunning() ) return;
00385 }
00386 }
00387
00388 bool ExecutionEngine::breakLoop() {
00389 bool ok = true;
00390 if (taskc)
00391 ok = taskc->breakUpdateHook();
00392 for (std::vector<TaskCore*>::iterator it = children.begin(); it != children.end();++it) {
00393 ok = (*it)->breakUpdateHook() && ok;
00394 }
00395 return ok;
00396 }
00397
00398 bool ExecutionEngine::stopTask(TaskCore* task) {
00399
00400
00401
00402 if ( getActivity() && this->getActivity()->stop() ) {
00403 this->getActivity()->start();
00404 return true;
00405 }
00406 return false;
00407 }
00408
00409 void ExecutionEngine::setExceptionTask() {
00410 std::string name;
00411 TaskContext* tc = dynamic_cast<TaskContext*>(taskc);
00412 if (tc)
00413 name = tc->getName();
00414 else if (taskc)
00415 name = "TaskCore";
00416 else
00417 name = "GlobalEngine";
00418 log(Error) << "in "<<name<<": unhandled exception in sent operation." << endlog();
00419 if(taskc)
00420 taskc->exception();
00421 }
00422
00423
00424 void ExecutionEngine::finalize() {
00425
00426 }
00427
00428 }
00429