00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040 #include "Logger.hpp"
00041 #include "ExecutionEngine.hpp"
00042 #include "base/TaskCore.hpp"
00043 #include "rtt-fwd.hpp"
00044 #include "os/MutexLock.hpp"
00045 #include "internal/MWSRQueue.hpp"
00046
00047 #include <boost/bind.hpp>
00048 #include <boost/ref.hpp>
00049 #include <boost/lambda/lambda.hpp>
00050 #include <boost/lambda/bind.hpp>
00051 #include <functional>
00052 #include <algorithm>
00053
00054 #define ORONUM_EE_MQUEUE_SIZE 100
00055
00056 namespace RTT
00057 {
00065 using namespace std;
00066 using namespace detail;
00067 using namespace boost;
00068
00069 ExecutionEngine::ExecutionEngine( TaskCore* owner )
00070 : taskc(owner),
00071 mqueue(new MWSRQueue<DisposableInterface*>(ORONUM_EE_MQUEUE_SIZE) ),
00072 f_queue( new MWSRQueue<ExecutableInterface*>(ORONUM_EE_MQUEUE_SIZE) )
00073 {
00074 }
00075
00076 ExecutionEngine::~ExecutionEngine()
00077 {
00078 Logger::In in("~ExecutionEngine");
00079
00080
00081 std::vector<TaskCore*> copy = children;
00082 for (std::vector<TaskCore*>::iterator it = copy.begin(); it != copy.end();++it){
00083 (*it)->setExecutionEngine( 0 );
00084 }
00085 assert( children.empty() );
00086
00087 ExecutableInterface* foo;
00088 while ( f_queue->dequeue( foo ) )
00089 foo->unloaded();
00090
00091 DisposableInterface* dis;
00092 while ( mqueue->dequeue( dis ) )
00093 dis->dispose();
00094
00095 delete f_queue;
00096 delete mqueue;
00097 }
00098
00099 TaskCore* ExecutionEngine::getParent() {
00100 return taskc;
00101 }
00102
00103 void ExecutionEngine::addChild(TaskCore* tc) {
00104 children.push_back( tc );
00105 }
00106
00107 void ExecutionEngine::removeChild(TaskCore* tc) {
00108 vector<TaskCore*>::iterator it = find (children.begin(), children.end(), tc );
00109 if ( it != children.end() )
00110 children.erase(it);
00111 }
00112
00113 void ExecutionEngine::processFunctions()
00114 {
00115
00116 ExecutableInterface* foo = 0;
00117 int nbr = f_queue->size();
00118
00119 while ( f_queue->dequeue(foo) ) {
00120 assert(foo);
00121 if ( foo->execute() == false ){
00122 foo->unloaded();
00123 msg_cond.broadcast();
00124 } else {
00125 f_queue->enqueue( foo );
00126 }
00127 if ( --nbr == 0)
00128 break;
00129 }
00130 }
00131
00132 bool ExecutionEngine::runFunction( ExecutableInterface* f )
00133 {
00134 if (this->getActivity() && f) {
00135
00136 if (taskc && taskc->mTaskState == TaskCore::FatalError )
00137 return false;
00138 f->loaded(this);
00139 bool result = f_queue->enqueue( f );
00140
00141 this->getActivity()->trigger();
00142 return result;
00143 }
00144 return false;
00145 }
00146
00147 struct RemoveMsg : public DisposableInterface {
00148 ExecutableInterface* mf;
00149 ExecutionEngine* mee;
00150 bool found;
00151 RemoveMsg(ExecutableInterface* f, ExecutionEngine* ee)
00152 : mf(f),mee(ee), found(false) {}
00153 virtual void executeAndDispose() {
00154 mee->removeSelfFunction( mf );
00155 found = true;
00156 }
00157 virtual void dispose() {}
00158
00159 };
00160
00161 bool ExecutionEngine::removeFunction( ExecutableInterface* f )
00162 {
00163
00164 if ( !f )
00165 return false;
00166
00167 if ( !f->isLoaded() )
00168 return true;
00169
00170
00171 if ( getActivity() == 0 || !this->getActivity()->isActive() ) {
00172 if ( removeSelfFunction( f ) == false )
00173 return false;
00174 } else {
00175
00176 RemoveMsg rmsg(f,this);
00177 if ( this->process(&rmsg) )
00178 this->waitForMessages( ! lambda::bind(&ExecutableInterface::isLoaded, f) || lambda::bind(&RemoveMsg::found,boost::ref(rmsg)) );
00179 if (!rmsg.found)
00180 return false;
00181 }
00182
00183 f->unloaded();
00184 return true;
00185 }
00186
00187 bool ExecutionEngine::removeSelfFunction(ExecutableInterface* f )
00188 {
00189
00190 if ( !f )
00191 return false;
00192 int nbr = f_queue->size();
00193 while (nbr != 0) {
00194 ExecutableInterface* foo = 0;
00195 if ( !f_queue->dequeue(foo) )
00196 return false;
00197 if ( f == foo) {
00198 return true;
00199 }
00200 f_queue->enqueue(foo);
00201 --nbr;
00202 }
00203 return true;
00204 }
00205
00206 bool ExecutionEngine::initialize() {
00207
00208 return true;
00209 }
00210
00211 bool ExecutionEngine::hasWork()
00212 {
00213 return !mqueue->isEmpty();
00214 }
00215
00216 void ExecutionEngine::processMessages()
00217 {
00218
00219
00220 DisposableInterface* com(0);
00221 {
00222 while ( mqueue->dequeue(com) ) {
00223 assert( com );
00224 com->executeAndDispose();
00225 }
00226
00227
00228
00229
00230
00231
00232 MutexLock locker( msg_lock );
00233 }
00234 if ( com )
00235 msg_cond.broadcast();
00236 }
00237
00238 bool ExecutionEngine::process( DisposableInterface* c )
00239 {
00240 if ( c && this->getActivity() ) {
00241
00242 if (taskc && taskc->mTaskState == TaskCore::FatalError )
00243 return false;
00244 bool result = mqueue->enqueue( c );
00245 this->getActivity()->trigger();
00246 msg_cond.broadcast();
00247 return result;
00248 }
00249 return false;
00250 }
00251
00252 void ExecutionEngine::waitForMessages(const boost::function<bool(void)>& pred)
00253 {
00254 if (this->getActivity()->thread()->isSelf())
00255 waitAndProcessMessages(pred);
00256 else
00257 waitForMessagesInternal(pred);
00258 }
00259
00260
00261 void ExecutionEngine::waitForFunctions(const boost::function<bool(void)>& pred)
00262 {
00263 if (this->getActivity()->thread()->isSelf())
00264 waitAndProcessFunctions(pred);
00265 else
00266 waitForMessagesInternal(pred);
00267 }
00268
00269
00270 void ExecutionEngine::waitForMessagesInternal(boost::function<bool(void)> const& pred)
00271 {
00272 if ( pred() )
00273 return;
00274
00275 os::MutexLock lock(msg_lock);
00276 while (!pred()) {
00277 msg_cond.wait(msg_lock);
00278 }
00279 }
00280
00281
00282 void ExecutionEngine::waitAndProcessMessages(boost::function<bool(void)> const& pred)
00283 {
00284 while ( !pred() ){
00285
00286 this->processMessages();
00287 {
00288
00289
00290 os::MutexLock lock(msg_lock);
00291 if (!pred()) {
00292 msg_cond.wait(msg_lock);
00293 } else {
00294 return;
00295 }
00296 }
00297 }
00298 }
00299
00300 void ExecutionEngine::waitAndProcessFunctions(boost::function<bool(void)> const& pred)
00301 {
00302 while ( !pred() ){
00303
00304 this->processFunctions();
00305 {
00306
00307
00308 os::MutexLock lock(msg_lock);
00309 if (!pred()) {
00310 msg_cond.wait(msg_lock);
00311 } else {
00312 return;
00313 }
00314 }
00315 }
00316 }
00317
00318 void ExecutionEngine::step() {
00319 processMessages();
00320 processFunctions();
00321 processChildren();
00322 }
00323
00324 void ExecutionEngine::processChildren() {
00325
00326 if ( taskc ) {
00327
00328 if ( taskc->mTaskState == TaskCore::Running && taskc->mTargetState == TaskCore::Running ) {
00329 try {
00330 taskc->prepareUpdateHook();
00331 taskc->updateHook();
00332 } catch(std::exception const& e) {
00333 log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
00334 log(Error) << " " << e.what() << endlog();
00335 taskc->exception();
00336 } catch(...){
00337 log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
00338 taskc->exception();
00339 }
00340 }
00341
00342 if ( taskc->mTaskState == TaskCore::RunTimeError ) {
00343 try {
00344 taskc->errorHook();
00345 } catch(std::exception const& e) {
00346 log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
00347 log(Error) << " " << e.what() << endlog();
00348 taskc->exception();
00349 } catch(...){
00350 log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
00351 taskc->exception();
00352 }
00353 }
00354 }
00355 if ( !this->getActivity() || ! this->getActivity()->isRunning() ) return;
00356
00357
00358 for (std::vector<TaskCore*>::iterator it = children.begin(); it != children.end();++it) {
00359 if ( (*it)->mTaskState == TaskCore::Running && (*it)->mTargetState == TaskCore::Running )
00360 try {
00361 (*it)->prepareUpdateHook();
00362 (*it)->updateHook();
00363 } catch(std::exception const& e) {
00364 log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
00365 log(Error) << " " << e.what() << endlog();
00366 (*it)->exception();
00367 } catch(...){
00368 log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
00369 (*it)->exception();
00370 }
00371 if ( (*it)->mTaskState == TaskCore::RunTimeError )
00372 try {
00373 (*it)->errorHook();
00374 } catch(std::exception const& e) {
00375 log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
00376 log(Error) << " " << e.what() << endlog();
00377 (*it)->exception();
00378 } catch(...){
00379 log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
00380 (*it)->exception();
00381 }
00382 if ( !this->getActivity() || ! this->getActivity()->isRunning() ) return;
00383 }
00384 }
00385
00386 bool ExecutionEngine::breakLoop() {
00387 bool ok = true;
00388 if (taskc)
00389 ok = taskc->breakUpdateHook();
00390 for (std::vector<TaskCore*>::iterator it = children.begin(); it != children.end();++it) {
00391 ok = (*it)->breakUpdateHook() && ok;
00392 }
00393 return ok;
00394 }
00395
00396 bool ExecutionEngine::stopTask(TaskCore* task) {
00397
00398
00399
00400 if ( getActivity() && this->getActivity()->stop() ) {
00401 this->getActivity()->start();
00402 return true;
00403 }
00404 return false;
00405 }
00406
00407
00408 void ExecutionEngine::finalize() {
00409
00410 }
00411
00412 }
00413