ExecutionEngine.cpp
Go to the documentation of this file.
00001 /***************************************************************************
00002   tag: Peter Soetens  Wed Jan 18 14:11:40 CET 2006  ExecutionEngine.cxx
00003 
00004                         ExecutionEngine.cxx -  description
00005                            -------------------
00006     begin                : Wed January 18 2006
00007     copyright            : (C) 2006 Peter Soetens
00008     email                : peter.soetens@mech.kuleuven.be
00009 
00010  ***************************************************************************
00011  *   This library is free software; you can redistribute it and/or         *
00012  *   modify it under the terms of the GNU General Public                   *
00013  *   License as published by the Free Software Foundation;                 *
00014  *   version 2 of the License.                                             *
00015  *                                                                         *
00016  *   As a special exception, you may use this file as part of a free       *
00017  *   software library without restriction.  Specifically, if other files   *
00018  *   instantiate templates or use macros or inline functions from this     *
00019  *   file, or you compile this file and link it with other files to        *
00020  *   produce an executable, this file does not by itself cause the         *
00021  *   resulting executable to be covered by the GNU General Public          *
00022  *   License.  This exception does not however invalidate any other        *
00023  *   reasons why the executable file might be covered by the GNU General   *
00024  *   Public License.                                                       *
00025  *                                                                         *
00026  *   This library is distributed in the hope that it will be useful,       *
00027  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00028  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU     *
00029  *   Lesser General Public License for more details.                       *
00030  *                                                                         *
00031  *   You should have received a copy of the GNU General Public             *
00032  *   License along with this library; if not, write to the Free Software   *
00033  *   Foundation, Inc., 59 Temple Place,                                    *
00034  *   Suite 330, Boston, MA  02111-1307  USA                                *
00035  *                                                                         *
00036  ***************************************************************************/
00037 
00038 
00039 
00040 #include "Logger.hpp"
00041 #include "ExecutionEngine.hpp"
00042 #include "base/TaskCore.hpp"
00043 #include "rtt-fwd.hpp"
00044 #include "os/MutexLock.hpp"
00045 #include "internal/MWSRQueue.hpp"
00046 #include "TaskContext.hpp"
00047 #include "internal/CatchConfig.hpp"
00048 #include "extras/SlaveActivity.hpp"
00049 
00050 #include <boost/bind.hpp>
00051 #include <boost/ref.hpp>
00052 #ifndef USE_CPP11
00053 #include <boost/lambda/lambda.hpp>
00054 #include <boost/lambda/bind.hpp>
00055 #endif
00056 #include <functional>
00057 #include <algorithm>
00058 
00059 #define ORONUM_EE_MQUEUE_SIZE 100
00060 
00061 namespace RTT
00062 {
00070     using namespace std;
00071     using namespace detail;
00072     using namespace boost;
00073 
00074     ExecutionEngine::ExecutionEngine( TaskCore* owner )
00075         : taskc(owner),
00076           mqueue(new MWSRQueue<DisposableInterface*>(ORONUM_EE_MQUEUE_SIZE) ),
00077           f_queue( new MWSRQueue<ExecutableInterface*>(ORONUM_EE_MQUEUE_SIZE) ),
00078           mmaster(0)
00079     {
00080     }
00081 
00082     ExecutionEngine::~ExecutionEngine()
00083     {
00084         Logger::In in("~ExecutionEngine");
00085 
00086         // make a copy to avoid call-back troubles:
00087         std::vector<TaskCore*> copy = children;
00088         for (std::vector<TaskCore*>::iterator it = copy.begin(); it != copy.end();++it){
00089             (*it)->setExecutionEngine( 0 );
00090         }
00091         assert( children.empty() );
00092 
00093         ExecutableInterface* foo;
00094         while ( f_queue->dequeue( foo ) )
00095             foo->unloaded();
00096 
00097         DisposableInterface* dis;
00098         while ( mqueue->dequeue( dis ) )
00099             dis->dispose();
00100 
00101         delete f_queue;
00102         delete mqueue;
00103     }
00104 
00105     TaskCore* ExecutionEngine::getParent() {
00106         return taskc;
00107     }
00108 
00109     void ExecutionEngine::addChild(TaskCore* tc) {
00110         children.push_back( tc );
00111     }
00112 
00113     void ExecutionEngine::removeChild(TaskCore* tc) {
00114         vector<TaskCore*>::iterator it = find (children.begin(), children.end(), tc );
00115         if ( it != children.end() )
00116             children.erase(it);
00117     }
00118 
00119     void ExecutionEngine::processFunctions()
00120     {
00121         // Execute all loaded Functions :
00122         ExecutableInterface* foo = 0;
00123         int nbr = f_queue->size(); // nbr to process.
00124         // 1. Fetch new ones from queue.
00125         while ( f_queue->dequeue(foo) ) {
00126             assert(foo);
00127             if ( foo->execute() == false ){
00128                 foo->unloaded();
00129                 msg_cond.broadcast(); // required for waitForFunctions() (3rd party thread)
00130             } else {
00131                 f_queue->enqueue( foo );
00132             }
00133             if ( --nbr == 0) // we did a round-trip
00134                 break;
00135         }
00136     }
00137 
00138     bool ExecutionEngine::runFunction( ExecutableInterface* f )
00139     {
00140         if (this->getActivity() && f) {
00141             // We only reject running functions when we're in the FatalError state.
00142             if (taskc && taskc->mTaskState == TaskCore::FatalError )
00143                 return false;
00144             f->loaded(this);
00145             bool result = f_queue->enqueue( f );
00146             // signal work is to be done:
00147             this->getActivity()->trigger();
00148             return result;
00149         }
00150         return false;
00151     }
00152 
00153     struct RemoveMsg : public DisposableInterface {
00154         ExecutableInterface* mf;
00155         ExecutionEngine* mee;
00156         bool found;
00157         RemoveMsg(ExecutableInterface* f, ExecutionEngine* ee)
00158         : mf(f),mee(ee), found(false) {}
00159         virtual void executeAndDispose() {
00160             mee->removeSelfFunction( mf );
00161             found = true; // always true in order to be able to quit waitForMessages.
00162         }
00163         virtual void dispose() {}
00164         virtual bool isError() const { return false;}
00165 
00166     };
00167 
00168     bool ExecutionEngine::removeFunction( ExecutableInterface* f )
00169     {
00170         // Remove from the queue.
00171         if ( !f )
00172             return false;
00173 
00174         if ( !f->isLoaded() )
00175             return true;
00176 
00177         // When not running, just remove.
00178         if ( getActivity() == 0 || !this->getActivity()->isActive() ) {
00179             if ( removeSelfFunction( f ) == false )
00180                 return false;
00181         } else {
00182             // Running: create message on stack.
00183             RemoveMsg rmsg(f,this);
00184             if ( this->process(&rmsg) )
00185 #ifdef USE_CPP11
00186                 this->waitForMessages( ! bind(&ExecutableInterface::isLoaded, f) || bind(&RemoveMsg::found,boost::ref(rmsg)) );
00187 #else
00188                 this->waitForMessages( ! lambda::bind(&ExecutableInterface::isLoaded, f) || lambda::bind(&RemoveMsg::found,boost::ref(rmsg)) );
00189 #endif
00190             if (!rmsg.found)
00191                 return false;
00192         }
00193         // unloading was succesful, now notify unloading:
00194         f->unloaded();
00195         return true;
00196     }
00197 
00198     bool ExecutionEngine::removeSelfFunction(ExecutableInterface* f  )
00199     {
00200         // since this function is executed in process messages, it is always safe to execute.
00201         if ( !f )
00202             return false;
00203         int nbr = f_queue->size();
00204         while (nbr != 0) {
00205             ExecutableInterface* foo = 0;
00206             if ( !f_queue->dequeue(foo) )
00207                 return false;
00208             if ( f  == foo) {
00209                 return true;
00210             }
00211             f_queue->enqueue(foo);
00212             --nbr;
00213         }
00214         return true;
00215     }
00216 
00217     bool ExecutionEngine::initialize() {
00218         // nop
00219         return true;
00220     }
00221 
00222     bool ExecutionEngine::hasWork()
00223     {
00224         return !mqueue->isEmpty();
00225     }
00226 
00227     void ExecutionEngine::processMessages()
00228     {
00229         // execute all commands from the AtomicQueue.
00230         // msg_lock may not be held when entering this function !
00231         DisposableInterface* com(0);
00232         {
00233             while ( mqueue->dequeue(com) ) {
00234                 assert( com );
00235                 com->executeAndDispose();
00236             }
00237             // there's no need to hold the lock during
00238             // emptying the queue. But we must hold the
00239             // lock once between excuteAndDispose and the
00240             // broadcast to avoid the race condition in
00241             // waitForMessages().
00242             // This allows us to recurse into processMessages.
00243             MutexLock locker( msg_lock );
00244         }
00245         if ( com )
00246             msg_cond.broadcast(); // required for waitForMessages() (3rd party thread)
00247     }
00248 
00249     bool ExecutionEngine::process( DisposableInterface* c )
00250     {
00251         // forward message to master ExecutionEngine if available
00252         if (mmaster) {
00253             return mmaster->process(c);
00254         }
00255 
00256         if ( c && this->getActivity() ) {
00257             // We only reject running functions when we're in the FatalError state.
00258             if (taskc && taskc->mTaskState == TaskCore::FatalError )
00259                 return false;
00260 
00261             bool result = mqueue->enqueue( c );
00262             this->getActivity()->trigger();
00263             msg_cond.broadcast(); // required for waitAndProcessMessages() (EE thread)
00264             return result;
00265         }
00266         return false;
00267     }
00268 
00269     void ExecutionEngine::waitForMessages(const boost::function<bool(void)>& pred)
00270     {
00271         // forward the call to the master ExecutionEngine which is processing messages for us...
00272         if (mmaster) {
00273             mmaster->waitForMessages(pred);
00274             return;
00275         }
00276 
00277         if (this->getActivity()->thread()->isSelf())
00278             waitAndProcessMessages(pred);
00279         else
00280             waitForMessagesInternal(pred);
00281     }
00282 
00283 
00284     void ExecutionEngine::waitForFunctions(const boost::function<bool(void)>& pred)
00285     {
00286         if (this->getActivity()->thread()->isSelf())
00287             waitAndProcessFunctions(pred);
00288         else
00289             waitForMessagesInternal(pred); // same as for messages.
00290     }
00291 
00292     void ExecutionEngine::setMaster(ExecutionEngine *master)
00293     {
00294         mmaster = master;
00295     }
00296 
00297     void ExecutionEngine::setActivity( base::ActivityInterface* task )
00298     {
00299         extras::SlaveActivity *slave_activity = dynamic_cast<extras::SlaveActivity *>(task);
00300         if (slave_activity && slave_activity->getMaster()) {
00301             ExecutionEngine *master = dynamic_cast<ExecutionEngine *>(slave_activity->getMaster()->getRunner());
00302             setMaster(master);
00303         } else {
00304             setMaster(0);
00305         }
00306         RTT::base::RunnableInterface::setActivity(task);
00307     }
00308 
00309     void ExecutionEngine::waitForMessagesInternal(boost::function<bool(void)> const& pred)
00310     {
00311         if ( pred() )
00312             return;
00313         // only to be called from the thread not executing step().
00314         os::MutexLock lock(msg_lock);
00315         while (!pred()) { // the mutex guards that processMessages can not run between !pred and the wait().
00316             msg_cond.wait(msg_lock); // now processMessages may run.
00317         }
00318     }
00319 
00320 
00321     void ExecutionEngine::waitAndProcessMessages(boost::function<bool(void)> const& pred)
00322     {
00323         while ( !pred() ){
00324             // may not be called while holding the msg_lock !!!
00325             this->processMessages();
00326             {
00327                 // only to be called from the thread executing step().
00328                 // We must lock because the cond variable will unlock msg_lock.
00329                 os::MutexLock lock(msg_lock);
00330                 if (!pred()) {
00331                     msg_cond.wait(msg_lock); // now processMessages may run.
00332                 } else {
00333                     return; // do not process messages when pred() == true;
00334                 }
00335             }
00336         }
00337     }
00338 
00339     void ExecutionEngine::waitAndProcessFunctions(boost::function<bool(void)> const& pred)
00340     {
00341         while ( !pred() ){
00342             // may not be called while holding the msg_lock !!!
00343             this->processFunctions();
00344             {
00345                 // only to be called from the thread executing step().
00346                 // We must lock because the cond variable will unlock msg_lock.
00347                 os::MutexLock lock(msg_lock);
00348                 if (!pred()) {
00349                     msg_cond.wait(msg_lock); // now processMessages may run.
00350                 } else {
00351                     return; // do not process messages when pred() == true;
00352                 }
00353             }
00354         }
00355     }
00356 
00357     void ExecutionEngine::step() {
00358         processMessages();
00359         processFunctions();
00360         processChildren(); // aren't these ExecutableInterfaces ie functions ?
00361     }
00362 
00363     void ExecutionEngine::processChildren() {
00364         // only call updateHook in the Running state.
00365         if ( taskc ) {
00366             // A trigger() in startHook() will be ignored, we trigger in TaskCore after startHook finishes.
00367             if ( taskc->mTaskState == TaskCore::Running && taskc->mTargetState == TaskCore::Running ) {
00368                 TRY (
00369                     taskc->prepareUpdateHook();
00370                     taskc->updateHook();
00371                 ) CATCH(std::exception const& e,
00372                     log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
00373                     log(Error) << "  " << e.what() << endlog();
00374                     taskc->exception();
00375                ) CATCH_ALL (
00376                     log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
00377                     taskc->exception(); // calls stopHook,cleanupHook
00378                 )
00379             }
00380             // in case start() or updateHook() called error(), this will be called:
00381             if (  taskc->mTaskState == TaskCore::RunTimeError ) {
00382                 TRY (
00383                     taskc->errorHook();
00384                 ) CATCH(std::exception const& e,
00385                     log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
00386                     log(Error) << "  " << e.what() << endlog();
00387                     taskc->exception();
00388                ) CATCH_ALL (
00389                     log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
00390                     taskc->exception(); // calls stopHook,cleanupHook
00391                 )
00392             }
00393         }
00394         if ( !this->getActivity() || ! this->getActivity()->isRunning() ) return;
00395 
00396         // call all children as well.
00397         for (std::vector<TaskCore*>::iterator it = children.begin(); it != children.end();++it) {
00398             if ( (*it)->mTaskState == TaskCore::Running  && (*it)->mTargetState == TaskCore::Running  ){
00399                 TRY (
00400                     (*it)->prepareUpdateHook();
00401                     (*it)->updateHook();
00402                 ) CATCH(std::exception const& e,
00403                     log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
00404                     log(Error) << "  " << e.what() << endlog();
00405                     (*it)->exception();
00406                ) CATCH_ALL (
00407                     log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
00408                     (*it)->exception(); // calls stopHook,cleanupHook
00409                 )
00410             }
00411             if (  (*it)->mTaskState == TaskCore::RunTimeError ){
00412                 TRY (
00413                     (*it)->errorHook();
00414                 ) CATCH(std::exception const& e,
00415                     log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
00416                     log(Error) << "  " << e.what() << endlog();
00417                     (*it)->exception();
00418                ) CATCH_ALL (
00419                     log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
00420                     (*it)->exception(); // calls stopHook,cleanupHook
00421                 )
00422             }
00423             if ( !this->getActivity() || ! this->getActivity()->isRunning() ) return;
00424         }
00425     }
00426 
00427     bool ExecutionEngine::breakLoop() {
00428         bool ok = true;
00429         if (taskc)
00430             ok = taskc->breakUpdateHook();
00431         for (std::vector<TaskCore*>::iterator it = children.begin(); it != children.end();++it) {
00432             ok = (*it)->breakUpdateHook() && ok;
00433             }
00434         return ok;
00435     }
00436 
00437     bool ExecutionEngine::stopTask(TaskCore* task) {
00438         // stop and start where former will call breakLoop() in case of non-periodic.
00439         // this is a forced synchronization point, since stop() will only return when
00440         // step() returned.
00441         if ( getActivity() && this->getActivity()->stop() ) {
00442             this->getActivity()->start();
00443             return true;
00444         }
00445         return false;
00446     }
00447 
00448     void ExecutionEngine::setExceptionTask() {
00449         std::string name;
00450         TaskContext* tc = dynamic_cast<TaskContext*>(taskc);
00451         if (tc)
00452             name = tc->getName();
00453         else if (taskc)
00454             name = "TaskCore";
00455         else
00456             name = "GlobalEngine";
00457         log(Error) << "in "<<name<<": unhandled exception in sent operation." << endlog();
00458         if(taskc)
00459             taskc->exception();
00460     }
00461             
00462 
00463     void ExecutionEngine::finalize() {
00464         // nop
00465     }
00466 
00467 }
00468 


rtt
Author(s): RTT Developers
autogenerated on Sat Jun 8 2019 18:46:09