$search
00001 /*************************************************************************** 00002 tag: Peter Soetens Wed Jan 18 14:11:40 CET 2006 ExecutionEngine.cxx 00003 00004 ExecutionEngine.cxx - description 00005 ------------------- 00006 begin : Wed January 18 2006 00007 copyright : (C) 2006 Peter Soetens 00008 email : peter.soetens@mech.kuleuven.be 00009 00010 *************************************************************************** 00011 * This library is free software; you can redistribute it and/or * 00012 * modify it under the terms of the GNU General Public * 00013 * License as published by the Free Software Foundation; * 00014 * version 2 of the License. * 00015 * * 00016 * As a special exception, you may use this file as part of a free * 00017 * software library without restriction. Specifically, if other files * 00018 * instantiate templates or use macros or inline functions from this * 00019 * file, or you compile this file and link it with other files to * 00020 * produce an executable, this file does not by itself cause the * 00021 * resulting executable to be covered by the GNU General Public * 00022 * License. This exception does not however invalidate any other * 00023 * reasons why the executable file might be covered by the GNU General * 00024 * Public License. * 00025 * * 00026 * This library is distributed in the hope that it will be useful, * 00027 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 00028 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * 00029 * Lesser General Public License for more details. * 00030 * * 00031 * You should have received a copy of the GNU General Public * 00032 * License along with this library; if not, write to the Free Software * 00033 * Foundation, Inc., 59 Temple Place, * 00034 * Suite 330, Boston, MA 02111-1307 USA * 00035 * * 00036 ***************************************************************************/ 00037 00038 00039 00040 #include "Logger.hpp" 00041 #include "ExecutionEngine.hpp" 00042 #include "base/TaskCore.hpp" 00043 #include "rtt-fwd.hpp" 00044 #include "os/MutexLock.hpp" 00045 #include "internal/MWSRQueue.hpp" 00046 #include "TaskContext.hpp" 00047 00048 #include <boost/bind.hpp> 00049 #include <boost/ref.hpp> 00050 #include <boost/lambda/lambda.hpp> 00051 #include <boost/lambda/bind.hpp> 00052 #include <functional> 00053 #include <algorithm> 00054 00055 #define ORONUM_EE_MQUEUE_SIZE 100 00056 00057 namespace RTT 00058 { 00066 using namespace std; 00067 using namespace detail; 00068 using namespace boost; 00069 00070 ExecutionEngine::ExecutionEngine( TaskCore* owner ) 00071 : taskc(owner), 00072 mqueue(new MWSRQueue<DisposableInterface*>(ORONUM_EE_MQUEUE_SIZE) ), 00073 f_queue( new MWSRQueue<ExecutableInterface*>(ORONUM_EE_MQUEUE_SIZE) ) 00074 { 00075 } 00076 00077 ExecutionEngine::~ExecutionEngine() 00078 { 00079 Logger::In in("~ExecutionEngine"); 00080 00081 // make a copy to avoid call-back troubles: 00082 std::vector<TaskCore*> copy = children; 00083 for (std::vector<TaskCore*>::iterator it = copy.begin(); it != copy.end();++it){ 00084 (*it)->setExecutionEngine( 0 ); 00085 } 00086 assert( children.empty() ); 00087 00088 ExecutableInterface* foo; 00089 while ( f_queue->dequeue( foo ) ) 00090 foo->unloaded(); 00091 00092 DisposableInterface* dis; 00093 while ( mqueue->dequeue( dis ) ) 00094 dis->dispose(); 00095 00096 delete f_queue; 00097 delete mqueue; 00098 } 00099 00100 TaskCore* ExecutionEngine::getParent() { 00101 return taskc; 00102 } 00103 00104 void ExecutionEngine::addChild(TaskCore* tc) { 00105 children.push_back( tc ); 00106 } 00107 00108 void ExecutionEngine::removeChild(TaskCore* tc) { 00109 vector<TaskCore*>::iterator it = find (children.begin(), children.end(), tc ); 00110 if ( it != children.end() ) 00111 children.erase(it); 00112 } 00113 00114 void ExecutionEngine::processFunctions() 00115 { 00116 // Execute all loaded Functions : 00117 ExecutableInterface* foo = 0; 00118 int nbr = f_queue->size(); // nbr to process. 00119 // 1. Fetch new ones from queue. 00120 while ( f_queue->dequeue(foo) ) { 00121 assert(foo); 00122 if ( foo->execute() == false ){ 00123 foo->unloaded(); 00124 msg_cond.broadcast(); // required for waitForFunctions() (3rd party thread) 00125 } else { 00126 f_queue->enqueue( foo ); 00127 } 00128 if ( --nbr == 0) // we did a round-trip 00129 break; 00130 } 00131 } 00132 00133 bool ExecutionEngine::runFunction( ExecutableInterface* f ) 00134 { 00135 if (this->getActivity() && f) { 00136 // We only reject running functions when we're in the FatalError state. 00137 if (taskc && taskc->mTaskState == TaskCore::FatalError ) 00138 return false; 00139 f->loaded(this); 00140 bool result = f_queue->enqueue( f ); 00141 // signal work is to be done: 00142 this->getActivity()->trigger(); 00143 return result; 00144 } 00145 return false; 00146 } 00147 00148 struct RemoveMsg : public DisposableInterface { 00149 ExecutableInterface* mf; 00150 ExecutionEngine* mee; 00151 bool found; 00152 RemoveMsg(ExecutableInterface* f, ExecutionEngine* ee) 00153 : mf(f),mee(ee), found(false) {} 00154 virtual void executeAndDispose() { 00155 mee->removeSelfFunction( mf ); 00156 found = true; // always true in order to be able to quit waitForMessages. 00157 } 00158 virtual void dispose() {} 00159 virtual bool isError() const { return false;} 00160 00161 }; 00162 00163 bool ExecutionEngine::removeFunction( ExecutableInterface* f ) 00164 { 00165 // Remove from the queue. 00166 if ( !f ) 00167 return false; 00168 00169 if ( !f->isLoaded() ) 00170 return true; 00171 00172 // When not running, just remove. 00173 if ( getActivity() == 0 || !this->getActivity()->isActive() ) { 00174 if ( removeSelfFunction( f ) == false ) 00175 return false; 00176 } else { 00177 // Running: create message on stack. 00178 RemoveMsg rmsg(f,this); 00179 if ( this->process(&rmsg) ) 00180 this->waitForMessages( ! lambda::bind(&ExecutableInterface::isLoaded, f) || lambda::bind(&RemoveMsg::found,boost::ref(rmsg)) ); 00181 if (!rmsg.found) 00182 return false; 00183 } 00184 // unloading was succesful, now notify unloading: 00185 f->unloaded(); 00186 return true; 00187 } 00188 00189 bool ExecutionEngine::removeSelfFunction(ExecutableInterface* f ) 00190 { 00191 // since this function is executed in process messages, it is always safe to execute. 00192 if ( !f ) 00193 return false; 00194 int nbr = f_queue->size(); 00195 while (nbr != 0) { 00196 ExecutableInterface* foo = 0; 00197 if ( !f_queue->dequeue(foo) ) 00198 return false; 00199 if ( f == foo) { 00200 return true; 00201 } 00202 f_queue->enqueue(foo); 00203 --nbr; 00204 } 00205 return true; 00206 } 00207 00208 bool ExecutionEngine::initialize() { 00209 // nop 00210 return true; 00211 } 00212 00213 bool ExecutionEngine::hasWork() 00214 { 00215 return !mqueue->isEmpty(); 00216 } 00217 00218 void ExecutionEngine::processMessages() 00219 { 00220 // execute all commands from the AtomicQueue. 00221 // msg_lock may not be held when entering this function ! 00222 DisposableInterface* com(0); 00223 { 00224 while ( mqueue->dequeue(com) ) { 00225 assert( com ); 00226 com->executeAndDispose(); 00227 } 00228 // there's no need to hold the lock during 00229 // emptying the queue. But we must hold the 00230 // lock once between excuteAndDispose and the 00231 // broadcast to avoid the race condition in 00232 // waitForMessages(). 00233 // This allows us to recurse into processMessages. 00234 MutexLock locker( msg_lock ); 00235 } 00236 if ( com ) 00237 msg_cond.broadcast(); // required for waitForMessages() (3rd party thread) 00238 } 00239 00240 bool ExecutionEngine::process( DisposableInterface* c ) 00241 { 00242 if ( c && this->getActivity() ) { 00243 // We only reject running functions when we're in the FatalError state. 00244 if (taskc && taskc->mTaskState == TaskCore::FatalError ) 00245 return false; 00246 bool result = mqueue->enqueue( c ); 00247 this->getActivity()->trigger(); 00248 msg_cond.broadcast(); // required for waitAndProcessMessages() (EE thread) 00249 return result; 00250 } 00251 return false; 00252 } 00253 00254 void ExecutionEngine::waitForMessages(const boost::function<bool(void)>& pred) 00255 { 00256 if (this->getActivity()->thread()->isSelf()) 00257 waitAndProcessMessages(pred); 00258 else 00259 waitForMessagesInternal(pred); 00260 } 00261 00262 00263 void ExecutionEngine::waitForFunctions(const boost::function<bool(void)>& pred) 00264 { 00265 if (this->getActivity()->thread()->isSelf()) 00266 waitAndProcessFunctions(pred); 00267 else 00268 waitForMessagesInternal(pred); // same as for messages. 00269 } 00270 00271 00272 void ExecutionEngine::waitForMessagesInternal(boost::function<bool(void)> const& pred) 00273 { 00274 if ( pred() ) 00275 return; 00276 // only to be called from the thread not executing step(). 00277 os::MutexLock lock(msg_lock); 00278 while (!pred()) { // the mutex guards that processMessages can not run between !pred and the wait(). 00279 msg_cond.wait(msg_lock); // now processMessages may run. 00280 } 00281 } 00282 00283 00284 void ExecutionEngine::waitAndProcessMessages(boost::function<bool(void)> const& pred) 00285 { 00286 while ( !pred() ){ 00287 // may not be called while holding the msg_lock !!! 00288 this->processMessages(); 00289 { 00290 // only to be called from the thread executing step(). 00291 // We must lock because the cond variable will unlock msg_lock. 00292 os::MutexLock lock(msg_lock); 00293 if (!pred()) { 00294 msg_cond.wait(msg_lock); // now processMessages may run. 00295 } else { 00296 return; // do not process messages when pred() == true; 00297 } 00298 } 00299 } 00300 } 00301 00302 void ExecutionEngine::waitAndProcessFunctions(boost::function<bool(void)> const& pred) 00303 { 00304 while ( !pred() ){ 00305 // may not be called while holding the msg_lock !!! 00306 this->processFunctions(); 00307 { 00308 // only to be called from the thread executing step(). 00309 // We must lock because the cond variable will unlock msg_lock. 00310 os::MutexLock lock(msg_lock); 00311 if (!pred()) { 00312 msg_cond.wait(msg_lock); // now processMessages may run. 00313 } else { 00314 return; // do not process messages when pred() == true; 00315 } 00316 } 00317 } 00318 } 00319 00320 void ExecutionEngine::step() { 00321 processMessages(); 00322 processFunctions(); 00323 processChildren(); // aren't these ExecutableInterfaces ie functions ? 00324 } 00325 00326 void ExecutionEngine::processChildren() { 00327 // only call updateHook in the Running state. 00328 if ( taskc ) { 00329 // A trigger() in startHook() will be ignored, we trigger in TaskCore after startHook finishes. 00330 if ( taskc->mTaskState == TaskCore::Running && taskc->mTargetState == TaskCore::Running ) { 00331 try { 00332 taskc->prepareUpdateHook(); 00333 taskc->updateHook(); 00334 } catch(std::exception const& e) { 00335 log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog(); 00336 log(Error) << " " << e.what() << endlog(); 00337 taskc->exception(); 00338 } catch(...){ 00339 log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog(); 00340 taskc->exception(); // calls stopHook,cleanupHook 00341 } 00342 } 00343 // in case start() or updateHook() called error(), this will be called: 00344 if ( taskc->mTaskState == TaskCore::RunTimeError ) { 00345 try { 00346 taskc->errorHook(); 00347 } catch(std::exception const& e) { 00348 log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog(); 00349 log(Error) << " " << e.what() << endlog(); 00350 taskc->exception(); 00351 } catch(...){ 00352 log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog(); 00353 taskc->exception(); // calls stopHook,cleanupHook 00354 } 00355 } 00356 } 00357 if ( !this->getActivity() || ! this->getActivity()->isRunning() ) return; 00358 00359 // call all children as well. 00360 for (std::vector<TaskCore*>::iterator it = children.begin(); it != children.end();++it) { 00361 if ( (*it)->mTaskState == TaskCore::Running && (*it)->mTargetState == TaskCore::Running ) 00362 try { 00363 (*it)->prepareUpdateHook(); 00364 (*it)->updateHook(); 00365 } catch(std::exception const& e) { 00366 log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog(); 00367 log(Error) << " " << e.what() << endlog(); 00368 (*it)->exception(); 00369 } catch(...){ 00370 log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog(); 00371 (*it)->exception(); // calls stopHook,cleanupHook 00372 } 00373 if ( (*it)->mTaskState == TaskCore::RunTimeError ) 00374 try { 00375 (*it)->errorHook(); 00376 } catch(std::exception const& e) { 00377 log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog(); 00378 log(Error) << " " << e.what() << endlog(); 00379 (*it)->exception(); 00380 } catch(...){ 00381 log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog(); 00382 (*it)->exception(); // calls stopHook,cleanupHook 00383 } 00384 if ( !this->getActivity() || ! this->getActivity()->isRunning() ) return; 00385 } 00386 } 00387 00388 bool ExecutionEngine::breakLoop() { 00389 bool ok = true; 00390 if (taskc) 00391 ok = taskc->breakUpdateHook(); 00392 for (std::vector<TaskCore*>::iterator it = children.begin(); it != children.end();++it) { 00393 ok = (*it)->breakUpdateHook() && ok; 00394 } 00395 return ok; 00396 } 00397 00398 bool ExecutionEngine::stopTask(TaskCore* task) { 00399 // stop and start where former will call breakLoop() in case of non-periodic. 00400 // this is a forced synchronization point, since stop() will only return when 00401 // step() returned. 00402 if ( getActivity() && this->getActivity()->stop() ) { 00403 this->getActivity()->start(); 00404 return true; 00405 } 00406 return false; 00407 } 00408 00409 void ExecutionEngine::setExceptionTask() { 00410 std::string name; 00411 TaskContext* tc = dynamic_cast<TaskContext*>(taskc); 00412 if (tc) 00413 name = tc->getName(); 00414 else if (taskc) 00415 name = "TaskCore"; 00416 else 00417 name = "GlobalEngine"; 00418 log(Error) << "in "<<name<<": unhandled exception in sent operation." << endlog(); 00419 if(taskc) 00420 taskc->exception(); 00421 } 00422 00423 00424 void ExecutionEngine::finalize() { 00425 // nop 00426 } 00427 00428 } 00429