TaskContext.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: Peter Soetens Tue Dec 21 22:43:08 CET 2004 TaskContext.cxx
3 
4  TaskContext.cxx - description
5  -------------------
6  begin : Tue December 21 2004
7  copyright : (C) 2004 Peter Soetens
8  email : peter.soetens@mech.kuleuven.ac.be
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 
39 
40 #include "TaskContext.hpp"
41 #include "base/ActionInterface.hpp"
42 #include "plugin/PluginLoader.hpp"
43 
44 #include <string>
45 #include <algorithm>
46 #include <functional>
47 #include <boost/bind.hpp>
48 #include <boost/mem_fn.hpp>
49 
50 #include "internal/DataSource.hpp"
51 #include "internal/mystd.hpp"
52 #include "internal/MWSRQueue.hpp"
53 #include "OperationCaller.hpp"
54 
55 #include "rtt-config.h"
56 
57 #if defined(ORO_ACT_DEFAULT_SEQUENTIAL)
59 #elif defined(ORO_ACT_DEFAULT_ACTIVITY)
60 #include "Activity.hpp"
61 #endif
62 
63 namespace RTT
64 {
65 
66  using namespace boost;
67  using namespace std;
68  using namespace detail;
69 
70  TaskContext::TaskContext(const std::string& name, TaskState initial_state /*= Stopped*/)
71  : TaskCore( initial_state, name )
72  ,tcservice(new Service(name,this) ), tcrequests( new ServiceRequester(name,this) )
73 #if defined(ORO_ACT_DEFAULT_SEQUENTIAL)
74  ,our_act( new SequentialActivity( this->engine() ) )
75 #elif defined(ORO_ACT_DEFAULT_ACTIVITY)
76  ,our_act( new Activity( this->engine(), name ) )
77 #endif
78  {
79  this->setup();
80  }
81 
83  {
84  tcservice->setOwner(this);
85  // from Service
86  provides()->doc("The interface of this TaskContext.");
87 
88  this->addOperation("configure", &TaskContext::configure, this, ClientThread).doc("Configure this TaskContext (= configureHook() ).");
89  this->addOperation("isConfigured", &TaskContext::isConfigured, this, ClientThread).doc("Is this TaskContext configured ?");
90  this->addOperation("start", &TaskContext::start, this, ClientThread).doc("Start this TaskContext (= startHook() + updateHook() ).");
91  this->addOperation("activate", &TaskContext::activate, this, ClientThread).doc("Activate the Execution Engine of this TaskContext.");
92  this->addOperation("stop", &TaskContext::stop, this, ClientThread).doc("Stop this TaskContext (= stopHook() ).");
93  this->addOperation("isRunning", &TaskContext::isRunning, this, ClientThread).doc("Is this TaskContext started ?");
94  this->addOperation("getPeriod", &TaskContext::getPeriod, this, ClientThread).doc("Get the configured execution period. -1.0: no thread associated, 0.0: non periodic, > 0.0: the period.");
95  this->addOperation("setPeriod", &TaskContext::setPeriod, this, ClientThread).doc("Set the execution period in seconds.").arg("s", "Period in seconds.");
96  this->addOperation("getCpuAffinity", &TaskContext::getCpuAffinity, this, ClientThread).doc("Get the configured cpu affinity.");
97  this->addOperation("setCpuAffinity", &TaskContext::setCpuAffinity, this, ClientThread).doc("Set the cpu affinity.").arg("cpu", "Cpu mask.");
98  this->addOperation("isActive", &TaskContext::isActive, this, ClientThread).doc("Is the Execution Engine of this TaskContext active ?");
99  this->addOperation("inFatalError", &TaskContext::inFatalError, this, ClientThread).doc("Check if this TaskContext is in the FatalError state.");
100  this->addOperation("error", &TaskContext::error, this, ClientThread).doc("Enter the RunTimeError state (= errorHook() ).");
101  this->addOperation("inRunTimeError", &TaskContext::inRunTimeError, this, ClientThread).doc("Check if this TaskContext is in the RunTimeError state.");
102  this->addOperation("inException", &TaskContext::inException, this, ClientThread).doc("Check if this TaskContext is in the Exception state.");
103  this->addOperation("cleanup", &TaskContext::cleanup, this, ClientThread).doc("Reset this TaskContext to the PreOperational state ( =cleanupHook() ).");
104  this->addOperation("update", &TaskContext::update, this, ClientThread).doc("Execute (call) the update method directly.\n Only succeeds if the task isRunning() and allowed by the Activity executing this task.");
105 
106  this->addOperation("trigger", &TaskContext::trigger, this, ClientThread).doc("Trigger the update method for execution in the thread of this task.\n Only succeeds if the task isRunning() and allowed by the Activity executing this task.");
107  this->addOperation("loadService", &TaskContext::loadService, this, ClientThread).doc("Loads a service known to RTT into this component.").arg("service_name","The name with which the service is registered by in the PluginLoader.");
108 
109  this->addAttribute("TriggerOnStart",mTriggerOnStart);
110  this->addAttribute("CycleCounter",mCycleCounter);
111  this->addAttribute("IOCounter",mIOCounter);
112  this->addAttribute("TimeOutCounter",mTimeOutCounter);
113  this->addAttribute("TriggerCounter",mTriggerCounter);
114  // activity runs from the start.
115  if (our_act)
116  our_act->start();
117  }
118 
120  {
121  if (our_act)
122  our_act->stop();
123  // We don't call stop() or cleanup() here since this is
124  // the responsibility of the subclass. Calling these functions
125  // here would only lead to calling invalid virtual functions.
126  // [Rule no 1: Don't call virtual functions in a destructor.]
127  // [Rule no 2: Don't call virtual functions in a constructor.]
128  this->clear();
129 
130  // these need to be freed before we cleanup the EE:
131  localservs.clear();
132  tcservice->setOwner(0);
133  tcservice.reset();
134  tcrequests->setOwner(0);
135  tcrequests.reset();
136 
137  // remove from all users.
138  while( !musers.empty() ) {
139  musers.front()->removePeer(this);
140  }
141  // since we are destroyed, be sure that the peer no longer
142  // has a 'user' pointer to us.
143  while ( !_task_map.empty() ) {
144  _task_map.begin()->second->removeUser(this);
145  _task_map.erase( _task_map.begin() );
146  }
147  // Do not call this->disconnect() !!!
148  // Ports are probably already destructed by user code.
149  }
150 
152  {
153  bool failure = false;
154  const std::string& location = this->getName();
155  Logger::In in( location.c_str() );
156 
157  DataFlowInterface::Ports myports = this->ports()->getPorts();
158  for (DataFlowInterface::Ports::iterator it = myports.begin();
159  it != myports.end();
160  ++it) {
161 
162  // Then try to get the peer port's connection
163  PortInterface* peerport = peer->ports()->getPort( (*it)->getName() );
164  if ( !peerport ) {
165  log(Debug)<< "Peer Task "<<peer->getName() <<" has no Port " << (*it)->getName() << endlog();
166  continue;
167  }
168 
169  // Skip if they have the same type
170  if((dynamic_cast<OutputPortInterface*>(*it) && dynamic_cast<OutputPortInterface*>(peerport)) ||
171  (dynamic_cast<InputPortInterface*>(*it) && dynamic_cast<InputPortInterface*>(peerport)))
172  {
173  log(Debug)<< (*it)->getName() << " and " << peerport->getName() << " have the same type" << endlog();
174  continue;
175  }
176 
177  // Try to find a way to connect them
178  if ( !(*it)->connectTo( peerport ) ) {
179  log(Debug)<< "Data flow incompatible between ports "
180  << getName() << "." << (*it)->getName() << " and "
181  << peer->getName() << "." << (*it)->getName() << endlog();
182  failure = true;
183  }
184  }
185  return !failure;
186  }
187 
189  {
190  bool success = true;
191  const std::string& location = this->getName();
192  Logger::In in( location.c_str() );
193 
194  vector<string> myreqs = this->requires()->getRequesterNames();
195  vector<string> peerreqs = peer->requires()->getRequesterNames();
196 
197  this->requires()->connectTo( peer->provides() );
198  for (vector<string>::iterator it = myreqs.begin();
199  it != myreqs.end();
200  ++it) {
201  ServiceRequester::shared_ptr sr = this->requires(*it);
202  if ( !sr->ready() ) {
203  if (peer->provides()->hasService( *it ))
204  success = sr->connectTo( peer->provides(*it) ) && success;
205  else {
206  log(Debug)<< "Peer Task "<<peer->getName() <<" provides no Service " << *it << endlog();
207  }
208  }
209  }
210 
211  peer->requires()->connectTo( this->provides() );
212  for (vector<string>::iterator it = peerreqs.begin();
213  it != peerreqs.end();
214  ++it) {
215  ServiceRequester::shared_ptr sr = peer->requires(*it);
216  if ( !sr->ready() ) {
217  if (this->provides()->hasService(*it))
218  success = sr->connectTo( this->provides(*it) ) && success;
219  else
220  log(Debug)<< "This Task provides no Service " << *it << " for peer Task "<<peer->getName() <<"."<< endlog();
221  }
222  }
223  return success;
224  }
225 
226  bool TaskContext::prepareProvide(const std::string& name) {
227  return tcservice->hasService(name) || plugin::PluginLoader::Instance()->loadService(name, this);
228  }
229 
230  bool TaskContext::loadService(const std::string& service_name) {
231  if ( provides()->hasService(service_name))
232  return true;
233  return PluginLoader::Instance()->loadService(service_name, this);
234  }
235 
237  {
238  if (peer)
239  musers.push_back(peer);
240  }
241 
243  {
244  Users::iterator it = find(musers.begin(), musers.end(), peer);
245  if ( it != musers.end() )
246  musers.erase(it);
247  }
248 
249  bool TaskContext::addPeer( TaskContext* peer, std::string alias )
250  {
251  if ( alias.empty() )
252  alias = peer->getName();
253  if ( !peer || _task_map.count( alias ) != 0 )
254  return false;
255  _task_map[ alias ] = peer;
256  peer->addUser( this );
257  return true;
258  }
259 
260  void TaskContext::removePeer( const std::string& name )
261  {
262  PeerMap::iterator it = _task_map.find( name );
263  if ( _task_map.end() != it ) {
264  it->second->removeUser( this );
265  _task_map.erase( _task_map.find( name ) );
266  }
267  }
268 
270  {
271  for( PeerMap::iterator it = _task_map.begin(); it != _task_map.end(); ++it)
272  if ( it->second == peer ) {
273  peer->removeUser( this );
274  _task_map.erase( it );
275  return;
276  }
277  }
278 
280  {
281  if ( _task_map.count( peer->getName() ) != 0
282  || peer->hasPeer( this->getName() ) )
283  return false;
284  this->addPeer ( peer );
285  peer->addPeer ( this );
286  return true;
287  }
288 
290  Logger::In in( this->getName().c_str() );
291  // disconnect all our ports
292  DataFlowInterface::Ports myports = this->ports()->getPorts();
293  for (DataFlowInterface::Ports::iterator it = myports.begin();
294  it != myports.end();
295  ++it) {
296  (*it)->disconnect();
297  }
298 
299  // remove from all users.
300  while( !musers.empty() ) {
301  musers.front()->removePeer(this);
302  }
303 
304  while ( !_task_map.empty() ) {
305  _task_map.begin()->second->removeUser(this);
306  _task_map.erase( _task_map.begin() );
307  }
308  }
309 
310  void TaskContext::disconnectPeers( const std::string& name )
311  {
312  if ( _task_map.end() != _task_map.find( name ) ) {
313  TaskContext* peer = _task_map.find(name)->second;
314  this->removePeer(peer);
315  peer->removePeer(this);
316  }
317  }
318 
319  std::vector<std::string> TaskContext::getPeerList() const
320  {
321  std::vector<std::string> res;
322  std::transform(_task_map.begin(), _task_map.end(),
323  std::back_inserter( res ),
325  return res;
326  }
327 
328  bool TaskContext::hasPeer( const std::string& peer_name ) const
329  {
330  return _task_map.count( peer_name ) == 1;
331  }
332 
333  TaskContext* TaskContext::getPeer(const std::string& peer_name ) const
334  {
335  if (this->hasPeer( peer_name ) )
336  return _task_map.find(peer_name)->second;
337  return 0;
338  }
339 
341  {
342  if (this->isRunning())
343  return false;
344 
345  // refuse to setActivity from our own active thread
346  if (our_act) {
347  if (our_act->isActive() && our_act->thread() && our_act->thread()->isSelf()) {
348  log(Error) << "Cannot set the activity of TaskContext "
349  << this->getName() << " from its own thread." << endlog();
350  return false;
351  }
352  }
353 
354  if (!new_act) {
355 #if defined(ORO_ACT_DEFAULT_SEQUENTIAL)
356  new_act = new SequentialActivity();
357 #elif defined(ORO_ACT_DEFAULT_ACTIVITY)
358  new_act = new Activity();
359 #endif
360  } else {
361  new_act->stop();
362  }
363  if (our_act) {
364  our_act->stop();
365  our_act.reset();
366  }
367  if (new_act) {
368  new_act->run( this->engine() );
370  our_act->start();
371  } else {
372  our_act.reset();
373  }
374  return true;
375  }
376 
378  {
379  if (!new_act)
380  return;
381  new_act->stop();
382  if(our_act){
383  our_act->stop();
384  }
385  our_act.reset( new_act );
386  our_act->run( this->engine() );
387  our_act->start();
388  }
389 
391  {
392  if (this->engine()->getActivity() != our_act.get() )
393  return this->engine()->getActivity();
394  return our_act.get();
395  }
396 
398  {
399  tcservice->clear();
400  tcrequests->clear();
401  }
402 
404  {
405  return true;
406  }
407 
409  return A->connectPorts(B);
410  }
411 
413  return A->connectPeers(B);
414  }
415 
417  {
418  if ( this->isRunning() )
419  return false;
420 #ifdef ORO_SIGNALLING_PORTS
421  ports()->setupHandles();
422 #endif
423  return TaskCore::start(); // calls startHook()
424  }
425 
427  {
428  if ( !this->isRunning() )
429  return false;
430  if (TaskCore::stop()) { // calls stopHook()
431 #ifdef ORO_SIGNALLING_PORTS
432  ports()->cleanupHandles();
433 #endif
434  return true;
435  }
436  return false;
437  }
438 
440  {
441  if ( this->dataOnPortHook(port) ) {
442  this->engine()->process(port);
443  }
444  }
445 
447  return this->isRunning();
448  }
449 
451  UserCallbacks::iterator it = user_callbacks.find(port);
452  if (it != user_callbacks.end() )
453  it->second(port); // fire the user callback
454  }
455 
457  // user_callbacks will only be emitted from updateHook().
458  MutexLock lock(mportlock);
459  user_callbacks[port] = callback;
460  }
461 
463  MutexLock lock(mportlock);
464  UserCallbacks::iterator it = user_callbacks.find(port);
465  if (it != user_callbacks.end() ) {
466  user_callbacks.erase(it);
467  }
468  }
469 }
TaskContext(const std::string &name, TaskState initial_state=Stopped)
Definition: TaskContext.cpp:70
virtual ~TaskContext()
PeerMap _task_map
map of the tasks we are using
UserCallbacks user_callbacks
Service::shared_ptr provides()
ActivityInterface * getActivity() const
Query for the task this interface is run in.
virtual bool trigger()
Definition: TaskCore.cpp:91
The default, thread-less activity for any newly created TaskContext.
virtual bool stop()
Users musers
map of the tasks that are using us.
bool loadService(const std::string &service_name)
void addUser(TaskContext *user)
Definition: mystd.hpp:163
void forceActivity(base::ActivityInterface *new_act)
boost::shared_ptr< ServiceRequester > shared_ptr
const std::string & getName() const
virtual bool connectPeers(TaskContext *peer)
virtual void removePeer(const std::string &name)
virtual bool isConfigured() const
Definition: TaskCore.cpp:270
virtual bool activate()
Definition: TaskCore.cpp:258
virtual bool setCpuAffinity(unsigned cpu)
Definition: TaskCore.cpp:306
virtual bool isRunning() const
Definition: TaskCore.cpp:266
virtual bool configure()
Definition: TaskCore.cpp:96
bool setActivity(base::ActivityInterface *new_act)
void dataOnPort(base::PortInterface *port)
void setDataOnPortCallback(base::InputPortInterface *port, SlotFunction callback)
unsigned int mCycleCounter
Definition: TaskCore.hpp:476
unsigned int mIOCounter
Definition: TaskCore.hpp:480
virtual TaskContext * getPeer(const std::string &peer_name) const
virtual void error()
Definition: TaskCore.cpp:156
virtual Seconds getPeriod() const
Definition: TaskCore.cpp:291
virtual bool run(RunnableInterface *r)
virtual bool inException() const
Definition: TaskCore.cpp:278
virtual bool hasPeer(const std::string &peer_name) const
ServiceRequester::shared_ptr requires()
unsigned int mTimeOutCounter
Definition: TaskCore.hpp:484
LocalServices localservs
boost::function< void(base::PortInterface *)> SlotFunction
virtual void disconnect()
os::Mutex mportlock
DataFlowInterface * ports()
virtual bool inFatalError() const
Definition: TaskCore.cpp:274
base::PortInterface * getPort(const std::string &name) const
virtual PeerList getPeerList() const
boost::shared_ptr< ActivityInterface > shared_ptr
virtual bool update()
Definition: TaskCore.cpp:86
bool prepareProvide(const std::string &name)
Interface to start/stop and query a Activity.
Service::shared_ptr tcservice
ServiceRequester::shared_ptr tcrequests
virtual void disconnectPeers(const std::string &name)
bool addAttribute(const std::string &name, T &attr)
std::vector< base::PortInterface * > Ports
void removeUser(TaskContext *user)
An Activity executes a RunnableInterface object in a (periodic) thread.
Definition: Activity.hpp:70
virtual bool isActive() const
Definition: TaskCore.cpp:286
virtual bool inRunTimeError() const
Definition: TaskCore.cpp:282
virtual unsigned getCpuAffinity() const
Definition: TaskCore.cpp:301
base::ActivityInterface::shared_ptr our_act
virtual bool connectPorts(TaskContext *peer)
static boost::shared_ptr< PluginLoader > Instance()
void removeDataOnPortCallback(base::PortInterface *port)
virtual bool connectServices(TaskContext *peer)
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:53
virtual bool stop()
Definition: TaskCore.cpp:232
virtual void dataOnPortCallback(base::PortInterface *port)
virtual bool process(base::DisposableInterface *c)
Operation< Signature > & addOperation(Operation< Signature > &op)
static Logger & log()
Definition: Logger.hpp:350
virtual bool setPeriod(Seconds s)
Definition: TaskCore.cpp:296
unsigned int mTriggerCounter
Definition: TaskCore.hpp:488
const ExecutionEngine * engine() const
Definition: TaskCore.hpp:306
static Logger::LogFunction endlog()
Definition: Logger.hpp:362
virtual bool start()
virtual bool addPeer(TaskContext *peer, std::string alias="")
base::ActivityInterface * getActivity()
virtual bool cleanup()
Definition: TaskCore.cpp:130
virtual const std::string & getName() const
virtual void clear()
MutexLock is a scope based Monitor, protecting critical sections with a Mutex object through locking ...
Definition: MutexLock.hpp:51
virtual bool start()
Definition: TaskCore.cpp:198
virtual bool ready()
virtual bool dataOnPortHook(base::PortInterface *port)


rtt
Author(s): RTT Developers
autogenerated on Fri Oct 25 2019 03:59:44