ExecutionEngine.cpp
Go to the documentation of this file.
00001 /***************************************************************************
00002   tag: Peter Soetens  Wed Jan 18 14:11:40 CET 2006  ExecutionEngine.cxx
00003 
00004                         ExecutionEngine.cxx -  description
00005                            -------------------
00006     begin                : Wed January 18 2006
00007     copyright            : (C) 2006 Peter Soetens
00008     email                : peter.soetens@mech.kuleuven.be
00009 
00010  ***************************************************************************
00011  *   This library is free software; you can redistribute it and/or         *
00012  *   modify it under the terms of the GNU General Public                   *
00013  *   License as published by the Free Software Foundation;                 *
00014  *   version 2 of the License.                                             *
00015  *                                                                         *
00016  *   As a special exception, you may use this file as part of a free       *
00017  *   software library without restriction.  Specifically, if other files   *
00018  *   instantiate templates or use macros or inline functions from this     *
00019  *   file, or you compile this file and link it with other files to        *
00020  *   produce an executable, this file does not by itself cause the         *
00021  *   resulting executable to be covered by the GNU General Public          *
00022  *   License.  This exception does not however invalidate any other        *
00023  *   reasons why the executable file might be covered by the GNU General   *
00024  *   Public License.                                                       *
00025  *                                                                         *
00026  *   This library is distributed in the hope that it will be useful,       *
00027  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00028  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU     *
00029  *   Lesser General Public License for more details.                       *
00030  *                                                                         *
00031  *   You should have received a copy of the GNU General Public             *
00032  *   License along with this library; if not, write to the Free Software   *
00033  *   Foundation, Inc., 59 Temple Place,                                    *
00034  *   Suite 330, Boston, MA  02111-1307  USA                                *
00035  *                                                                         *
00036  ***************************************************************************/
00037 
00038 
00039 
00040 #include "Logger.hpp"
00041 #include "ExecutionEngine.hpp"
00042 #include "base/TaskCore.hpp"
00043 #include "rtt-fwd.hpp"
00044 #include "os/MutexLock.hpp"
00045 #include "internal/MWSRQueue.hpp"
00046 #include "TaskContext.hpp"
00047 #include "internal/CatchConfig.hpp"
00048 
00049 #include <boost/bind.hpp>
00050 #include <boost/ref.hpp>
00051 #include <boost/lambda/lambda.hpp>
00052 #include <boost/lambda/bind.hpp>
00053 #include <functional>
00054 #include <algorithm>
00055 
00056 #define ORONUM_EE_MQUEUE_SIZE 100
00057 
00058 namespace RTT
00059 {
00067     using namespace std;
00068     using namespace detail;
00069     using namespace boost;
00070 
00071     ExecutionEngine::ExecutionEngine( TaskCore* owner )
00072         : taskc(owner),
00073           mqueue(new MWSRQueue<DisposableInterface*>(ORONUM_EE_MQUEUE_SIZE) ),
00074           f_queue( new MWSRQueue<ExecutableInterface*>(ORONUM_EE_MQUEUE_SIZE) )
00075     {
00076     }
00077 
00078     ExecutionEngine::~ExecutionEngine()
00079     {
00080         Logger::In in("~ExecutionEngine");
00081 
00082         // make a copy to avoid call-back troubles:
00083         std::vector<TaskCore*> copy = children;
00084         for (std::vector<TaskCore*>::iterator it = copy.begin(); it != copy.end();++it){
00085             (*it)->setExecutionEngine( 0 );
00086         }
00087         assert( children.empty() );
00088 
00089         ExecutableInterface* foo;
00090         while ( f_queue->dequeue( foo ) )
00091             foo->unloaded();
00092 
00093         DisposableInterface* dis;
00094         while ( mqueue->dequeue( dis ) )
00095             dis->dispose();
00096 
00097         delete f_queue;
00098         delete mqueue;
00099     }
00100 
00101     TaskCore* ExecutionEngine::getParent() {
00102         return taskc;
00103     }
00104 
00105     void ExecutionEngine::addChild(TaskCore* tc) {
00106         children.push_back( tc );
00107     }
00108 
00109     void ExecutionEngine::removeChild(TaskCore* tc) {
00110         vector<TaskCore*>::iterator it = find (children.begin(), children.end(), tc );
00111         if ( it != children.end() )
00112             children.erase(it);
00113     }
00114 
00115     void ExecutionEngine::processFunctions()
00116     {
00117         // Execute all loaded Functions :
00118         ExecutableInterface* foo = 0;
00119         int nbr = f_queue->size(); // nbr to process.
00120         // 1. Fetch new ones from queue.
00121         while ( f_queue->dequeue(foo) ) {
00122             assert(foo);
00123             if ( foo->execute() == false ){
00124                 foo->unloaded();
00125                 msg_cond.broadcast(); // required for waitForFunctions() (3rd party thread)
00126             } else {
00127                 f_queue->enqueue( foo );
00128             }
00129             if ( --nbr == 0) // we did a round-trip
00130                 break;
00131         }
00132     }
00133 
00134     bool ExecutionEngine::runFunction( ExecutableInterface* f )
00135     {
00136         if (this->getActivity() && f) {
00137             // We only reject running functions when we're in the FatalError state.
00138             if (taskc && taskc->mTaskState == TaskCore::FatalError )
00139                 return false;
00140             f->loaded(this);
00141             bool result = f_queue->enqueue( f );
00142             // signal work is to be done:
00143             this->getActivity()->trigger();
00144             return result;
00145         }
00146         return false;
00147     }
00148 
00149     struct RemoveMsg : public DisposableInterface {
00150         ExecutableInterface* mf;
00151         ExecutionEngine* mee;
00152         bool found;
00153         RemoveMsg(ExecutableInterface* f, ExecutionEngine* ee)
00154         : mf(f),mee(ee), found(false) {}
00155         virtual void executeAndDispose() {
00156             mee->removeSelfFunction( mf );
00157             found = true; // always true in order to be able to quit waitForMessages.
00158         }
00159         virtual void dispose() {}
00160         virtual bool isError() const { return false;}
00161 
00162     };
00163 
00164     bool ExecutionEngine::removeFunction( ExecutableInterface* f )
00165     {
00166         // Remove from the queue.
00167         if ( !f )
00168             return false;
00169 
00170         if ( !f->isLoaded() )
00171             return true;
00172 
00173         // When not running, just remove.
00174         if ( getActivity() == 0 || !this->getActivity()->isActive() ) {
00175             if ( removeSelfFunction( f ) == false )
00176                 return false;
00177         } else {
00178             // Running: create message on stack.
00179             RemoveMsg rmsg(f,this);
00180             if ( this->process(&rmsg) )
00181                 this->waitForMessages( ! lambda::bind(&ExecutableInterface::isLoaded, f) || lambda::bind(&RemoveMsg::found,boost::ref(rmsg)) );
00182             if (!rmsg.found)
00183                 return false;
00184         }
00185         // unloading was succesful, now notify unloading:
00186         f->unloaded();
00187         return true;
00188     }
00189 
00190     bool ExecutionEngine::removeSelfFunction(ExecutableInterface* f  )
00191     {
00192         // since this function is executed in process messages, it is always safe to execute.
00193         if ( !f )
00194             return false;
00195         int nbr = f_queue->size();
00196         while (nbr != 0) {
00197             ExecutableInterface* foo = 0;
00198             if ( !f_queue->dequeue(foo) )
00199                 return false;
00200             if ( f  == foo) {
00201                 return true;
00202             }
00203             f_queue->enqueue(foo);
00204             --nbr;
00205         }
00206         return true;
00207     }
00208 
00209     bool ExecutionEngine::initialize() {
00210         // nop
00211         return true;
00212     }
00213 
00214     bool ExecutionEngine::hasWork()
00215     {
00216         return !mqueue->isEmpty();
00217     }
00218 
00219     void ExecutionEngine::processMessages()
00220     {
00221         // execute all commands from the AtomicQueue.
00222         // msg_lock may not be held when entering this function !
00223         DisposableInterface* com(0);
00224         {
00225             while ( mqueue->dequeue(com) ) {
00226                 assert( com );
00227                 com->executeAndDispose();
00228             }
00229             // there's no need to hold the lock during
00230             // emptying the queue. But we must hold the
00231             // lock once between excuteAndDispose and the
00232             // broadcast to avoid the race condition in
00233             // waitForMessages().
00234             // This allows us to recurse into processMessages.
00235             MutexLock locker( msg_lock );
00236         }
00237         if ( com )
00238             msg_cond.broadcast(); // required for waitForMessages() (3rd party thread)
00239     }
00240 
00241     bool ExecutionEngine::process( DisposableInterface* c )
00242     {
00243         if ( c && this->getActivity() ) {
00244             // We only reject running functions when we're in the FatalError state.
00245             if (taskc && taskc->mTaskState == TaskCore::FatalError )
00246                 return false;
00247             bool result = mqueue->enqueue( c );
00248             this->getActivity()->trigger();
00249             msg_cond.broadcast(); // required for waitAndProcessMessages() (EE thread)
00250             return result;
00251         }
00252         return false;
00253     }
00254 
00255     void ExecutionEngine::waitForMessages(const boost::function<bool(void)>& pred)
00256     {
00257         if (this->getActivity()->thread()->isSelf())
00258             waitAndProcessMessages(pred);
00259         else
00260             waitForMessagesInternal(pred);
00261     }
00262 
00263 
00264     void ExecutionEngine::waitForFunctions(const boost::function<bool(void)>& pred)
00265     {
00266         if (this->getActivity()->thread()->isSelf())
00267             waitAndProcessFunctions(pred);
00268         else
00269             waitForMessagesInternal(pred); // same as for messages.
00270     }
00271 
00272 
00273     void ExecutionEngine::waitForMessagesInternal(boost::function<bool(void)> const& pred)
00274     {
00275         if ( pred() )
00276             return;
00277         // only to be called from the thread not executing step().
00278         os::MutexLock lock(msg_lock);
00279         while (!pred()) { // the mutex guards that processMessages can not run between !pred and the wait().
00280             msg_cond.wait(msg_lock); // now processMessages may run.
00281         }
00282     }
00283 
00284 
00285     void ExecutionEngine::waitAndProcessMessages(boost::function<bool(void)> const& pred)
00286     {
00287         while ( !pred() ){
00288             // may not be called while holding the msg_lock !!!
00289             this->processMessages();
00290             {
00291                 // only to be called from the thread executing step().
00292                 // We must lock because the cond variable will unlock msg_lock.
00293                 os::MutexLock lock(msg_lock);
00294                 if (!pred()) {
00295                     msg_cond.wait(msg_lock); // now processMessages may run.
00296                 } else {
00297                     return; // do not process messages when pred() == true;
00298                 }
00299             }
00300         }
00301     }
00302 
00303     void ExecutionEngine::waitAndProcessFunctions(boost::function<bool(void)> const& pred)
00304     {
00305         while ( !pred() ){
00306             // may not be called while holding the msg_lock !!!
00307             this->processFunctions();
00308             {
00309                 // only to be called from the thread executing step().
00310                 // We must lock because the cond variable will unlock msg_lock.
00311                 os::MutexLock lock(msg_lock);
00312                 if (!pred()) {
00313                     msg_cond.wait(msg_lock); // now processMessages may run.
00314                 } else {
00315                     return; // do not process messages when pred() == true;
00316                 }
00317             }
00318         }
00319     }
00320 
00321     void ExecutionEngine::step() {
00322         processMessages();
00323         processFunctions();
00324         processChildren(); // aren't these ExecutableInterfaces ie functions ?
00325     }
00326 
00327     void ExecutionEngine::processChildren() {
00328         // only call updateHook in the Running state.
00329         if ( taskc ) {
00330             // A trigger() in startHook() will be ignored, we trigger in TaskCore after startHook finishes.
00331             if ( taskc->mTaskState == TaskCore::Running && taskc->mTargetState == TaskCore::Running ) {
00332                 TRY (
00333                     taskc->prepareUpdateHook();
00334                     taskc->updateHook();
00335                 ) CATCH(std::exception const& e,
00336                     log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
00337                     log(Error) << "  " << e.what() << endlog();
00338                     taskc->exception();
00339                ) CATCH_ALL (
00340                     log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
00341                     taskc->exception(); // calls stopHook,cleanupHook
00342                 )
00343             }
00344             // in case start() or updateHook() called error(), this will be called:
00345             if (  taskc->mTaskState == TaskCore::RunTimeError ) {
00346                 TRY (
00347                     taskc->errorHook();
00348                 ) CATCH(std::exception const& e,
00349                     log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
00350                     log(Error) << "  " << e.what() << endlog();
00351                     taskc->exception();
00352                ) CATCH_ALL (
00353                     log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
00354                     taskc->exception(); // calls stopHook,cleanupHook
00355                 )
00356             }
00357         }
00358         if ( !this->getActivity() || ! this->getActivity()->isRunning() ) return;
00359 
00360         // call all children as well.
00361         for (std::vector<TaskCore*>::iterator it = children.begin(); it != children.end();++it) {
00362             if ( (*it)->mTaskState == TaskCore::Running  && (*it)->mTargetState == TaskCore::Running  ){
00363                 TRY (
00364                     (*it)->prepareUpdateHook();
00365                     (*it)->updateHook();
00366                 ) CATCH(std::exception const& e,
00367                     log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
00368                     log(Error) << "  " << e.what() << endlog();
00369                     (*it)->exception();
00370                ) CATCH_ALL (
00371                     log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
00372                     (*it)->exception(); // calls stopHook,cleanupHook
00373                 )
00374             }
00375             if (  (*it)->mTaskState == TaskCore::RunTimeError ){
00376                 TRY (
00377                     (*it)->errorHook();
00378                 ) CATCH(std::exception const& e,
00379                     log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
00380                     log(Error) << "  " << e.what() << endlog();
00381                     (*it)->exception();
00382                ) CATCH_ALL (
00383                     log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
00384                     (*it)->exception(); // calls stopHook,cleanupHook
00385                 )
00386             }
00387             if ( !this->getActivity() || ! this->getActivity()->isRunning() ) return;
00388         }
00389     }
00390 
00391     bool ExecutionEngine::breakLoop() {
00392         bool ok = true;
00393         if (taskc)
00394             ok = taskc->breakUpdateHook();
00395         for (std::vector<TaskCore*>::iterator it = children.begin(); it != children.end();++it) {
00396             ok = (*it)->breakUpdateHook() && ok;
00397             }
00398         return ok;
00399     }
00400 
00401     bool ExecutionEngine::stopTask(TaskCore* task) {
00402         // stop and start where former will call breakLoop() in case of non-periodic.
00403         // this is a forced synchronization point, since stop() will only return when
00404         // step() returned.
00405         if ( getActivity() && this->getActivity()->stop() ) {
00406             this->getActivity()->start();
00407             return true;
00408         }
00409         return false;
00410     }
00411 
00412     void ExecutionEngine::setExceptionTask() {
00413         std::string name;
00414         TaskContext* tc = dynamic_cast<TaskContext*>(taskc);
00415         if (tc)
00416             name = tc->getName();
00417         else if (taskc)
00418             name = "TaskCore";
00419         else
00420             name = "GlobalEngine";
00421         log(Error) << "in "<<name<<": unhandled exception in sent operation." << endlog();
00422         if(taskc)
00423             taskc->exception();
00424     }
00425             
00426 
00427     void ExecutionEngine::finalize() {
00428         // nop
00429     }
00430 
00431 }
00432 


rtt
Author(s): RTT Developers
autogenerated on Thu Jan 2 2014 11:35:21