ExecutionEngine.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: Peter Soetens Wed Jan 18 14:11:40 CET 2006 ExecutionEngine.cxx
3 
4  ExecutionEngine.cxx - description
5  -------------------
6  begin : Wed January 18 2006
7  copyright : (C) 2006 Peter Soetens
8  email : peter.soetens@mech.kuleuven.be
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 
39 
40 #include "Logger.hpp"
41 #include "ExecutionEngine.hpp"
42 #include "base/TaskCore.hpp"
43 #include "rtt-fwd.hpp"
44 #include "os/MutexLock.hpp"
45 #include "internal/MWSRQueue.hpp"
46 #include "TaskContext.hpp"
47 #include "internal/CatchConfig.hpp"
48 #include "extras/SlaveActivity.hpp"
49 #include "os/traces.h"
50 
51 #include <boost/bind.hpp>
52 #include <algorithm>
53 
54 #define ORONUM_EE_MQUEUE_SIZE 100
55 
56 namespace RTT
57 {
65  using namespace std;
66  using namespace detail;
67  using namespace boost;
68 
70  : taskc(owner),
72  port_queue(new MWSRQueue<PortInterface*>(ORONUM_EE_MQUEUE_SIZE) ),
74  {
75  }
76 
78  {
79  Logger::In in("~ExecutionEngine");
80 
82  while ( f_queue->dequeue( foo ) )
83  foo->unloaded();
84 
86  while ( mqueue->dequeue( dis ) )
87  dis->dispose();
88 
89  delete f_queue;
90  delete port_queue;
91  delete mqueue;
92  }
93 
95  return taskc;
96  }
97 
99  {
100  // Execute all loaded Functions :
102  int nbr = f_queue->size(); // nbr to process.
103  // 1. Fetch new ones from queue.
104  while ( f_queue->dequeue(foo) ) {
105  assert(foo);
106  if ( foo->execute() == false ){
107  foo->unloaded();
108  msg_cond.broadcast(); // required for waitForFunctions() (3rd party thread)
109  } else {
110  f_queue->enqueue( foo );
111  }
112  if ( --nbr == 0) // we did a round-trip
113  break;
114  }
115  }
116 
118  {
119  if ( f && this->getActivity() ) {
120  // We only reject running functions when we're in the FatalError state.
122  return false;
123  f->loaded(this);
124  bool result = f_queue->enqueue( f );
125  return result;
126  }
127  return false;
128  }
129 
130  struct RemoveMsg : public DisposableInterface {
133  bool found;
135  : mf(f),mee(ee), found(false) {}
136  virtual void executeAndDispose() {
137  mee->removeSelfFunction( mf );
138  found = true; // always true in order to be able to quit waitForMessages.
139  }
140  virtual void dispose() {}
141  virtual bool isError() const { return false; }
142  bool done() const { return !mf->isLoaded() || found; }
143  };
144 
146  {
147  // Remove from the queue.
148  if ( !f )
149  return false;
150 
151  if ( !f->isLoaded() )
152  return true;
153 
154  // When not running, just remove.
155  if ( !this->getActivity()->isActive() ) {
156  if ( removeSelfFunction( f ) == false )
157  return false;
158  } else {
159  // Running: create message on stack.
160  RemoveMsg rmsg(f,this);
161  if ( this->process(&rmsg) )
162  this->waitForMessages( boost::bind(&RemoveMsg::done, &rmsg) );
163  if (!rmsg.found)
164  return false;
165  }
166  // unloading was succesful, now notify unloading:
167  f->unloaded();
168  return true;
169  }
170 
172  {
173  // since this function is executed in process messages, it is always safe to execute.
174  if ( !f )
175  return false;
176  int nbr = f_queue->size();
177  while (nbr != 0) {
179  if ( !f_queue->dequeue(foo) )
180  return false;
181  if ( f == foo) {
182  return true;
183  }
184  f_queue->enqueue(foo);
185  --nbr;
186  }
187  return true;
188  }
189 
191  return true;
192  }
193 
195  {
196  return !mqueue->isEmpty();
197  }
198 
200  {
201  // Fast bail-out :
202  if ( mqueue->isEmpty() )
203  return;
204  // execute all commands from the AtomicQueue.
205  // msg_lock may not be held when entering this function !
206  DisposableInterface* com(0);
207  {
208  while ( mqueue->dequeue(com) ) {
209  assert( com );
210  com->executeAndDispose();
211  }
212  // there's no need to hold the lock during
213  // emptying the queue. But we must hold the
214  // lock once between excuteAndDispose and the
215  // broadcast to avoid the race condition in
216  // waitForMessages().
217  // This allows us to recurse into processMessages.
218  MutexLock locker( msg_lock );
219  }
220  if ( com )
221  msg_cond.broadcast(); // required for waitForMessages() (3rd party thread)
222  }
223 
225  {
226  // Fast bail-out :
227  if (port_queue->isEmpty())
228  return;
229 
230  TaskContext* tc = dynamic_cast<TaskContext*>(taskc);
231  if (tc) {
232  PortInterface* port(0);
233  {
234  while ( port_queue->dequeue(port) ) {
235  assert( port );
236  tc->dataOnPortCallback(port);
237  }
238  }
239  }
240  }
241 
243  {
244  // We only reject running functions when we're in the FatalError state.
246  return false;
247 
248  if ( c && this->getActivity() ) {
249  bool result = mqueue->enqueue( c );
250  this->getActivity()->trigger();
251  msg_cond.broadcast(); // required for waitAndProcessMessages() (EE thread)
252  return result;
253  }
254  return false;
255  }
256 
258  {
259  // We only reject running port callbacks when we're in the FatalError state.
261  return false;
262 
263  if ( port && this->getActivity() ) {
264  bool result = port_queue->enqueue( port );
265  this->getActivity()->trigger();
266  return result;
267  }
268  return false;
269  }
270 
271  void ExecutionEngine::waitForMessages(const boost::function<bool(void)>& pred)
272  {
273  if (isSelf())
275  else
277  }
278 
279  bool ExecutionEngine::isSelf() const {
280  os::ThreadInterface *thread = this->getThread();
281  return (thread && thread->isSelf());
282  }
283 
284  void ExecutionEngine::waitForMessagesInternal(boost::function<bool(void)> const& pred)
285  {
286  if ( pred() )
287  return;
288  // only to be called from the thread not executing step().
289  os::MutexLock lock(msg_lock);
290  while (!pred()) { // the mutex guards that processMessages can not run between !pred and the wait().
291  msg_cond.wait(msg_lock); // now processMessages may run.
292  }
293  }
294 
295 
296  void ExecutionEngine::waitAndProcessMessages(boost::function<bool(void)> const& pred)
297  {
298  // optimization for the case the predicate is already true
299  if ( pred() )
300  return;
301 
302  while ( true ) {
303  // may not be called while holding the msg_lock !!!
304  this->processMessages();
305  {
306  // only to be called from the thread executing step().
307  // We must lock because the cond variable will unlock msg_lock.
308  os::MutexLock lock(msg_lock);
309  if (!pred()) {
310  msg_cond.wait(msg_lock); // now processMessages may run.
311  } else {
312  return; // do not process messages when pred() == true;
313  }
314  }
315  }
316  }
317 
319  // we use work() now
320  }
321 
323  // Interprete work before calling into user code such that we are consistent at all times.
324  if (taskc) {
325  ++taskc->mCycleCounter;
326  switch(reason) {
329  break;
332  break;
334  ++taskc->mIOCounter;
335  break;
336  default:
337  break;
338  }
339  }
340  if (reason == RunnableInterface::Trigger) {
341  /* Callback step */
342  processMessages();
344  } else if (reason == RunnableInterface::TimeOut || reason == RunnableInterface::IOReady) {
345  /* Update step */
346  processMessages();
349  processHooks();
350  }
351  }
353  // only call updateHook in the Running state.
354  if ( taskc ) {
355  // A trigger() in startHook() will be ignored, we trigger in TaskCore after startHook finishes.
357  TRY (
358  { tracepoint_context(orocos_rtt, TaskContext_updateHook, taskc->mName.c_str());
359  taskc->updateHook(); }
360  ) CATCH(std::exception const& e,
361  log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
362  log(Error) << " " << e.what() << endlog();
363  taskc->exception();
364  ) CATCH_ALL (
365  log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
366  taskc->exception(); // calls stopHook,cleanupHook
367  )
368  }
369  // in case start() or updateHook() called error(), this will be called:
371  TRY (
372  { tracepoint_context(orocos_rtt, TaskContext_errorHook, taskc->mName.c_str());
373  taskc->errorHook(); }
374  ) CATCH(std::exception const& e,
375  log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
376  log(Error) << " " << e.what() << endlog();
377  taskc->exception();
378  ) CATCH_ALL (
379  log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
380  taskc->exception(); // calls stopHook,cleanupHook
381  )
382  }
383  }
384  }
385 
387  bool ok = true;
388  if (taskc)
389  ok = taskc->breakUpdateHook();
390  return ok;
391  }
392 
394  // stop and start where former will call breakLoop() in case of non-periodic.
395  // this is a forced synchronization point, since stop() will only return when
396  // step() returned.
397  if ( this->getActivity() && this->getActivity()->stop() ) {
398  this->getActivity()->start();
399  return true;
400  }
401  return false;
402  }
403 
405  std::string name;
406  TaskContext* tc = dynamic_cast<TaskContext*>(taskc);
407  if (tc)
408  name = tc->getName();
409  else if (taskc)
410  name = "TaskCore";
411  else
412  name = "GlobalEngine";
413  log(Error) << "in "<<name<<": unhandled exception in sent operation." << endlog();
414  if(taskc)
415  taskc->exception();
416  }
417 
418 
420  // nop
421  }
422 
423 }
424 
virtual os::ThreadInterface * getThread() const
#define TRY(C)
Contains static global configuration variables and cached entries.
Definition: CatchConfig.hpp:56
void loaded(ExecutionEngine *ee)
ActivityInterface * getActivity() const
Query for the task this interface is run in.
virtual bool removeFunction(base::ExecutableInterface *f)
virtual void dispose()
virtual void updateHook()
Definition: TaskCore.cpp:323
Definition: mystd.hpp:163
void waitForMessages(const boost::function< bool(void)> &pred)
virtual bool isActive() const =0
RemoveMsg(ExecutableInterface *f, ExecutionEngine *ee)
TaskState mTaskState
Definition: TaskCore.hpp:446
internal::MWSRQueue< base::PortInterface * > * port_queue
The state indicating that a run-time error has occured [red] and needs attention. ...
Definition: TaskCore.hpp:106
virtual bool removeSelfFunction(base::ExecutableInterface *f)
internal::MWSRQueue< base::ExecutableInterface * > * f_queue
unsigned int mCycleCounter
Definition: TaskCore.hpp:476
void waitForMessagesInternal(boost::function< bool(void)> const &pred)
unsigned int mIOCounter
Definition: TaskCore.hpp:480
internal::MWSRQueue< base::DisposableInterface * > * mqueue
virtual void executeAndDispose()=0
unsigned int mTimeOutCounter
Definition: TaskCore.hpp:484
#define CATCH(T, C)
Definition: CatchConfig.hpp:57
virtual bool isError() const
virtual void errorHook()
Definition: TaskCore.cpp:320
bool stopTask(base::TaskCore *task)
virtual bool initialize()
The state indicating the component encountered a fatal error and is unable to execute.
Definition: TaskCore.hpp:102
virtual void exception()
Definition: TaskCore.cpp:163
std::string mName
Definition: TaskCore.hpp:493
#define CATCH_ALL(C)
Definition: CatchConfig.hpp:58
base::TaskCore * getParent()
virtual bool breakUpdateHook()
Definition: TaskCore.cpp:327
An object that is executable and is freed after execution.
void waitAndProcessMessages(boost::function< bool(void)> const &pred)
The state indicating the component is running [green].
Definition: TaskCore.hpp:105
virtual void work(RunnableInterface::WorkReason reason)
base::TaskCore * taskc
#define tracepoint_context(provider, event, name)
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:53
ExecutableInterface * mf
#define ORONUM_EE_MQUEUE_SIZE
virtual void dataOnPortCallback(base::PortInterface *port)
TaskState mTargetState
Definition: TaskCore.hpp:458
virtual bool process(base::DisposableInterface *c)
virtual bool runFunction(base::ExecutableInterface *f)
virtual void executeAndDispose()
static Logger & log()
Definition: Logger.hpp:350
ExecutionEngine(base::TaskCore *owner=0)
bool done() const
unsigned int mTriggerCounter
Definition: TaskCore.hpp:488
static Logger::LogFunction endlog()
Definition: Logger.hpp:362
virtual const std::string & getName() const
virtual bool breakLoop()
void foo(double d)
ExecutionEngine * mee
MutexLock is a scope based Monitor, protecting critical sections with a Mutex object through locking ...
Definition: MutexLock.hpp:51
bool wait(Mutex &m)
Definition: Condition.hpp:90


rtt
Author(s): RTT Developers
autogenerated on Tue Jun 25 2019 19:33:24