ExecutionEngine.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: Peter Soetens Wed Jan 18 14:11:40 CET 2006 ExecutionEngine.cxx
3 
4  ExecutionEngine.cxx - description
5  -------------------
6  begin : Wed January 18 2006
7  copyright : (C) 2006 Peter Soetens
8  email : peter.soetens@mech.kuleuven.be
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 
39 
40 #include "Logger.hpp"
41 #include "ExecutionEngine.hpp"
42 #include "base/TaskCore.hpp"
43 #include "rtt-fwd.hpp"
44 #include "os/MutexLock.hpp"
45 #include "internal/MWSRQueue.hpp"
46 #include "TaskContext.hpp"
47 #include "internal/CatchConfig.hpp"
48 #include "extras/SlaveActivity.hpp"
49 #include "os/traces.h"
50 
51 #include <boost/bind.hpp>
52 #include <algorithm>
53 
54 #define ORONUM_EE_MQUEUE_SIZE 100
55 
56 namespace RTT
57 {
65  using namespace std;
66  using namespace detail;
67  using namespace boost;
68 
70  : taskc(owner),
72  port_queue(new MWSRQueue<PortInterface*>(ORONUM_EE_MQUEUE_SIZE) ),
74  {
75  }
76 
78  {
79  Logger::In in("~ExecutionEngine");
80 
82  while ( f_queue->dequeue( foo ) )
83  foo->unloaded();
84 
86  while ( mqueue->dequeue( dis ) )
87  dis->dispose();
88 
89  delete f_queue;
90  delete port_queue;
91  delete mqueue;
92  }
93 
95  return taskc;
96  }
97 
99  {
100  // Execute all loaded Functions :
102  int nbr = f_queue->size(); // nbr to process.
103  // 1. Fetch new ones from queue.
104  while ( f_queue->dequeue(foo) ) {
105  assert(foo);
106  if ( foo->execute() == false ){
107  foo->unloaded();
108  {
109  // There's no need to hold the lock while
110  // processing the queue. But we must hold the
111  // lock once between foo->execute() and the
112  // broadcast to avoid the race condition in
113  // waitForMessagesInternal().
114  MutexLock locker( msg_lock );
115  }
116  msg_cond.broadcast(); // required for waitForFunctions() (3rd party thread)
117  } else {
118  f_queue->enqueue( foo );
119  }
120  if ( --nbr == 0) // we did a round-trip
121  break;
122  }
123  }
124 
126  {
127  if ( f && this->getActivity() ) {
128  // We only reject running functions when we're in the FatalError state.
130  return false;
131  f->loaded(this);
132  bool result = f_queue->enqueue( f );
133  return result;
134  }
135  return false;
136  }
137 
138  struct RemoveMsg : public DisposableInterface {
141  bool found;
143  : mf(f),mee(ee), found(false) {}
144  virtual void executeAndDispose() {
145  mee->removeSelfFunction( mf );
146  found = true; // always true in order to be able to quit waitForMessages.
147  }
148  virtual void dispose() {}
149  virtual bool isError() const { return false; }
150  bool done() const { return !mf->isLoaded() || found; }
151  };
152 
154  {
155  // Remove from the queue.
156  if ( !f )
157  return false;
158 
159  if ( !f->isLoaded() )
160  return true;
161 
162  // When not running, just remove.
163  if ( !this->getActivity()->isActive() ) {
164  if ( removeSelfFunction( f ) == false )
165  return false;
166  } else {
167  // Running: create message on stack.
168  RemoveMsg rmsg(f,this);
169  if ( this->process(&rmsg) )
170  this->waitForMessages( boost::bind(&RemoveMsg::done, &rmsg) );
171  if (!rmsg.found)
172  return false;
173  }
174  // unloading was succesful, now notify unloading:
175  f->unloaded();
176  return true;
177  }
178 
180  {
181  // since this function is executed in process messages, it is always safe to execute.
182  if ( !f )
183  return false;
184  int nbr = f_queue->size();
185  while (nbr != 0) {
187  if ( !f_queue->dequeue(foo) )
188  return false;
189  if ( f == foo) {
190  return true;
191  }
192  f_queue->enqueue(foo);
193  --nbr;
194  }
195  return true;
196  }
197 
199  return true;
200  }
201 
203  {
204  return !mqueue->isEmpty();
205  }
206 
208  {
209  // Fast bail-out :
210  if ( mqueue->isEmpty() )
211  return;
212  // execute all commands from the AtomicQueue.
213  // msg_lock may not be held when entering this function !
214  DisposableInterface* com(0);
215  {
216  while ( mqueue->dequeue(com) ) {
217  assert( com );
218  com->executeAndDispose();
219  }
220  // there's no need to hold the lock during
221  // emptying the queue. But we must hold the
222  // lock once between excuteAndDispose and the
223  // broadcast to avoid the race condition in
224  // waitForMessages().
225  // This allows us to recurse into processMessages.
226  MutexLock locker( msg_lock );
227  }
228  if ( com )
229  msg_cond.broadcast(); // required for waitForMessages() (3rd party thread)
230  }
231 
233  {
234  // Fast bail-out :
235  if (port_queue->isEmpty())
236  return;
237 
238  TaskContext* tc = dynamic_cast<TaskContext*>(taskc);
239  if (tc) {
240  PortInterface* port(0);
241  {
242  while ( port_queue->dequeue(port) ) {
243  assert( port );
244  tc->dataOnPortCallback(port);
245  }
246  }
247  }
248  }
249 
251  {
252  // We only reject running functions when we're in the FatalError state.
254  return false;
255 
256  if ( c && this->getActivity() ) {
257  bool result = mqueue->enqueue( c );
258  this->getActivity()->trigger();
259  msg_cond.broadcast(); // required for waitAndProcessMessages() (EE thread)
260  return result;
261  }
262  return false;
263  }
264 
266  {
267  // We only reject running port callbacks when we're in the FatalError state.
269  return false;
270 
271  if ( port && this->getActivity() ) {
272  bool result = port_queue->enqueue( port );
273  this->getActivity()->trigger();
274  return result;
275  }
276  return false;
277  }
278 
279  void ExecutionEngine::waitForMessages(const boost::function<bool(void)>& pred)
280  {
281  if (isSelf())
283  else
285  }
286 
287  bool ExecutionEngine::isSelf() const {
288  os::ThreadInterface *thread = this->getThread();
289  return (thread && thread->isSelf());
290  }
291 
292  void ExecutionEngine::waitForMessagesInternal(boost::function<bool(void)> const& pred)
293  {
294  if ( pred() )
295  return;
296  // only to be called from the thread not executing step().
297  os::MutexLock lock(msg_lock);
298  while (!pred()) { // the mutex guards that processMessages can not run between !pred and the wait().
299  msg_cond.wait(msg_lock); // now processMessages may run.
300  }
301  }
302 
303 
304  void ExecutionEngine::waitAndProcessMessages(boost::function<bool(void)> const& pred)
305  {
306  // optimization for the case the predicate is already true
307  if ( pred() )
308  return;
309 
310  while ( true ) {
311  // may not be called while holding the msg_lock !!!
312  this->processMessages();
313  {
314  // only to be called from the thread executing step().
315  // We must lock because the cond variable will unlock msg_lock.
316  os::MutexLock lock(msg_lock);
317  if (!pred()) {
318  msg_cond.wait(msg_lock); // now processMessages may run.
319  } else {
320  return; // do not process messages when pred() == true;
321  }
322  }
323  }
324  }
325 
327  // we use work() now
328  }
329 
331  // Interprete work before calling into user code such that we are consistent at all times.
332  if (taskc) {
333  ++taskc->mCycleCounter;
334  switch(reason) {
337  break;
340  break;
342  ++taskc->mIOCounter;
343  break;
344  default:
345  break;
346  }
347  }
348  if (reason == RunnableInterface::Trigger) {
349  /* Callback step */
350  processMessages();
352  } else if (reason == RunnableInterface::TimeOut || reason == RunnableInterface::IOReady) {
353  /* Update step */
354  processMessages();
357  processHooks();
358  }
359  }
361  // only call updateHook in the Running state.
362  if ( taskc ) {
363  // A trigger() in startHook() will be ignored, we trigger in TaskCore after startHook finishes.
365  TRY (
366  { tracepoint_context(orocos_rtt, TaskContext_updateHook, taskc->mName.c_str());
367  taskc->updateHook(); }
368  ) CATCH(std::exception const& e,
369  log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
370  log(Error) << " " << e.what() << endlog();
371  taskc->exception();
372  ) CATCH_ALL (
373  log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
374  taskc->exception(); // calls stopHook,cleanupHook
375  )
376  }
377  // in case start() or updateHook() called error(), this will be called:
379  TRY (
380  { tracepoint_context(orocos_rtt, TaskContext_errorHook, taskc->mName.c_str());
381  taskc->errorHook(); }
382  ) CATCH(std::exception const& e,
383  log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
384  log(Error) << " " << e.what() << endlog();
385  taskc->exception();
386  ) CATCH_ALL (
387  log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
388  taskc->exception(); // calls stopHook,cleanupHook
389  )
390  }
391  }
392  }
393 
395  bool ok = true;
396  if (taskc)
397  ok = taskc->breakUpdateHook();
398  return ok;
399  }
400 
402  // stop and start where former will call breakLoop() in case of non-periodic.
403  // this is a forced synchronization point, since stop() will only return when
404  // step() returned.
405  if ( this->getActivity() && this->getActivity()->stop() ) {
406  this->getActivity()->start();
407  return true;
408  }
409  return false;
410  }
411 
413  std::string name;
414  TaskContext* tc = dynamic_cast<TaskContext*>(taskc);
415  if (tc)
416  name = tc->getName();
417  else if (taskc)
418  name = "TaskCore";
419  else
420  name = "GlobalEngine";
421  log(Error) << "in "<<name<<": unhandled exception in sent operation." << endlog();
422  if(taskc)
423  taskc->exception();
424  }
425 
426 
428  // nop
429  }
430 
431 }
432 
virtual os::ThreadInterface * getThread() const
#define TRY(C)
Contains static global configuration variables and cached entries.
Definition: CatchConfig.hpp:56
void loaded(ExecutionEngine *ee)
ActivityInterface * getActivity() const
Query for the task this interface is run in.
virtual bool removeFunction(base::ExecutableInterface *f)
virtual void dispose()
virtual void updateHook()
Definition: TaskCore.cpp:323
Definition: mystd.hpp:163
void waitForMessages(const boost::function< bool(void)> &pred)
virtual bool isActive() const =0
RemoveMsg(ExecutableInterface *f, ExecutionEngine *ee)
TaskState mTaskState
Definition: TaskCore.hpp:446
internal::MWSRQueue< base::PortInterface * > * port_queue
The state indicating that a run-time error has occured [red] and needs attention. ...
Definition: TaskCore.hpp:106
virtual bool removeSelfFunction(base::ExecutableInterface *f)
internal::MWSRQueue< base::ExecutableInterface * > * f_queue
unsigned int mCycleCounter
Definition: TaskCore.hpp:476
void waitForMessagesInternal(boost::function< bool(void)> const &pred)
unsigned int mIOCounter
Definition: TaskCore.hpp:480
internal::MWSRQueue< base::DisposableInterface * > * mqueue
virtual void executeAndDispose()=0
unsigned int mTimeOutCounter
Definition: TaskCore.hpp:484
#define CATCH(T, C)
Definition: CatchConfig.hpp:57
virtual bool isError() const
virtual void errorHook()
Definition: TaskCore.cpp:320
bool stopTask(base::TaskCore *task)
virtual bool initialize()
The state indicating the component encountered a fatal error and is unable to execute.
Definition: TaskCore.hpp:102
virtual void exception()
Definition: TaskCore.cpp:163
std::string mName
Definition: TaskCore.hpp:493
#define CATCH_ALL(C)
Definition: CatchConfig.hpp:58
base::TaskCore * getParent()
virtual bool breakUpdateHook()
Definition: TaskCore.cpp:327
An object that is executable and is freed after execution.
void waitAndProcessMessages(boost::function< bool(void)> const &pred)
The state indicating the component is running [green].
Definition: TaskCore.hpp:105
virtual void work(RunnableInterface::WorkReason reason)
base::TaskCore * taskc
#define tracepoint_context(provider, event, name)
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:53
ExecutableInterface * mf
#define ORONUM_EE_MQUEUE_SIZE
virtual void dataOnPortCallback(base::PortInterface *port)
TaskState mTargetState
Definition: TaskCore.hpp:458
virtual bool process(base::DisposableInterface *c)
virtual bool runFunction(base::ExecutableInterface *f)
virtual void executeAndDispose()
static Logger & log()
Definition: Logger.hpp:350
ExecutionEngine(base::TaskCore *owner=0)
bool done() const
unsigned int mTriggerCounter
Definition: TaskCore.hpp:488
static Logger::LogFunction endlog()
Definition: Logger.hpp:362
virtual const std::string & getName() const
virtual bool breakLoop()
void foo(double d)
ExecutionEngine * mee
MutexLock is a scope based Monitor, protecting critical sections with a Mutex object through locking ...
Definition: MutexLock.hpp:51
bool wait(Mutex &m)
Definition: Condition.hpp:90


rtt
Author(s): RTT Developers
autogenerated on Fri Oct 25 2019 03:59:33