00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040 #include "TaskContext.hpp"
00041 #include "base/ActionInterface.hpp"
00042 #include "plugin/PluginLoader.hpp"
00043
00044 #include <string>
00045 #include <algorithm>
00046 #include <functional>
00047 #include <boost/bind.hpp>
00048 #include <boost/mem_fn.hpp>
00049
00050 #include "internal/DataSource.hpp"
00051 #include "internal/mystd.hpp"
00052 #include "internal/MWSRQueue.hpp"
00053 #include "OperationCaller.hpp"
00054
00055 #include "rtt-config.h"
00056
00057 #if defined(ORO_ACT_DEFAULT_SEQUENTIAL)
00058 #include "extras/SequentialActivity.hpp"
00059 #elif defined(ORO_ACT_DEFAULT_ACTIVITY)
00060 #include "Activity.hpp"
00061 #endif
00062
00063 namespace RTT
00064 {
00065
00066 using namespace boost;
00067 using namespace std;
00068 using namespace detail;
00069
00070 TaskContext::TaskContext(const std::string& name, TaskState initial_state )
00071 : TaskCore( initial_state)
00072 ,portqueue( new MWSRQueue<PortInterface*>(64) )
00073 ,tcservice(new Service(name,this) ), tcrequests( new ServiceRequester(name,this) )
00074 #if defined(ORO_ACT_DEFAULT_SEQUENTIAL)
00075 ,our_act( new SequentialActivity( this->engine() ) )
00076 #elif defined(ORO_ACT_DEFAULT_ACTIVITY)
00077 ,our_act( new Activity( this->engine(), name ) )
00078 #endif
00079 {
00080 this->setup();
00081 }
00082
00083 TaskContext::TaskContext(const std::string& name, ExecutionEngine* parent, TaskState initial_state )
00084 : TaskCore(parent, initial_state)
00085 ,portqueue( new MWSRQueue<PortInterface*>(64) )
00086 ,tcservice(new Service(name,this) ), tcrequests( new ServiceRequester(name,this) )
00087 #if defined(ORO_ACT_DEFAULT_SEQUENTIAL)
00088 ,our_act( parent ? 0 : new SequentialActivity( this->engine() ) )
00089 #elif defined(ORO_ACT_DEFAULT_ACTIVITY)
00090 ,our_act( parent ? 0 : new Activity( this->engine(), name ) )
00091 #endif
00092 {
00093 this->setup();
00094 }
00095
00096 void TaskContext::setup()
00097 {
00098 tcservice->setOwner(this);
00099
00100 provides()->doc("The interface of this TaskContext.");
00101
00102 this->addOperation("configure", &TaskContext::configure, this, ClientThread).doc("Configure this TaskContext (= configureHook() ).");
00103 this->addOperation("isConfigured", &TaskContext::isConfigured, this, ClientThread).doc("Is this TaskContext configured ?");
00104 this->addOperation("start", &TaskContext::start, this, ClientThread).doc("Start this TaskContext (= startHook() + updateHook() ).");
00105 this->addOperation("activate", &TaskContext::activate, this, ClientThread).doc("Activate the Execution Engine of this TaskContext.");
00106 this->addOperation("stop", &TaskContext::stop, this, ClientThread).doc("Stop this TaskContext (= stopHook() ).");
00107 this->addOperation("isRunning", &TaskContext::isRunning, this, ClientThread).doc("Is this TaskContext started ?");
00108 this->addOperation("getPeriod", &TaskContext::getPeriod, this, ClientThread).doc("Get the configured execution period. -1.0: no thread associated, 0.0: non periodic, > 0.0: the period.");
00109 this->addOperation("setPeriod", &TaskContext::setPeriod, this, ClientThread).doc("Set the execution period in seconds.").arg("s", "Period in seconds.");
00110 this->addOperation("getCpuAffinity", &TaskContext::getCpuAffinity, this, ClientThread).doc("Get the configured cpu affinity.");
00111 this->addOperation("setCpuAffinity", &TaskContext::setCpuAffinity, this, ClientThread).doc("Set the cpu affinity.").arg("cpu", "Cpu mask.");
00112 this->addOperation("isActive", &TaskContext::isActive, this, ClientThread).doc("Is the Execution Engine of this TaskContext active ?");
00113 this->addOperation("inFatalError", &TaskContext::inFatalError, this, ClientThread).doc("Check if this TaskContext is in the FatalError state.");
00114 this->addOperation("error", &TaskContext::error, this, ClientThread).doc("Enter the RunTimeError state (= errorHook() ).");
00115 this->addOperation("inRunTimeError", &TaskContext::inRunTimeError, this, ClientThread).doc("Check if this TaskContext is in the RunTimeError state.");
00116 this->addOperation("cleanup", &TaskContext::cleanup, this, ClientThread).doc("Reset this TaskContext to the PreOperational state ( =cleanupHook() ).");
00117 this->addOperation("update", &TaskContext::update, this, ClientThread).doc("Execute (call) the update method directly.\n Only succeeds if the task isRunning() and allowed by the Activity executing this task.");
00118
00119 this->addOperation("trigger", &TaskContext::trigger, this, ClientThread).doc("Trigger the update method for execution in the thread of this task.\n Only succeeds if the task isRunning() and allowed by the Activity executing this task.");
00120 this->addOperation("loadService", &TaskContext::loadService, this, ClientThread).doc("Loads a service known to RTT into this component.").arg("service_name","The name with which the service is registered by in the PluginLoader.");
00121
00122 if (our_act)
00123 our_act->start();
00124 }
00125
00126 TaskContext::~TaskContext()
00127 {
00128 if (our_act)
00129 our_act->stop();
00130
00131
00132
00133
00134
00135 tcservice->clear();
00136
00137 tcrequests->clear();
00138
00139
00140 while( !musers.empty() ) {
00141 musers.front()->removePeer(this);
00142 }
00143
00144
00145 while ( !_task_map.empty() ) {
00146 _task_map.begin()->second->removeUser(this);
00147 _task_map.erase( _task_map.begin() );
00148 }
00149
00150
00151 delete portqueue;
00152 }
00153
00154 bool TaskContext::connectPorts( TaskContext* peer )
00155 {
00156 bool failure = false;
00157 const std::string& location = this->getName();
00158 Logger::In in( location.c_str() );
00159
00160 DataFlowInterface::Ports myports = this->ports()->getPorts();
00161 for (DataFlowInterface::Ports::iterator it = myports.begin();
00162 it != myports.end();
00163 ++it) {
00164
00165
00166 PortInterface* peerport = peer->ports()->getPort( (*it)->getName() );
00167 if ( !peerport ) {
00168 log(Debug)<< "Peer Task "<<peer->getName() <<" has no Port " << (*it)->getName() << endlog();
00169 continue;
00170 }
00171
00172
00173 if((dynamic_cast<OutputPortInterface*>(*it) && dynamic_cast<OutputPortInterface*>(peerport)) ||
00174 (dynamic_cast<InputPortInterface*>(*it) && dynamic_cast<InputPortInterface*>(peerport)))
00175 {
00176 log(Debug)<< (*it)->getName() << " and " << peerport->getName() << " have the same type" << endlog();
00177 continue;
00178 }
00179
00180
00181 if ( !(*it)->connectTo( peerport ) ) {
00182 log(Debug)<< "Data flow incompatible between ports "
00183 << getName() << "." << (*it)->getName() << " and "
00184 << peer->getName() << "." << (*it)->getName() << endlog();
00185 failure = true;
00186 }
00187 }
00188 return !failure;
00189 }
00190
00191 bool TaskContext::connectServices( TaskContext* peer )
00192 {
00193 bool success = true;
00194 const std::string& location = this->getName();
00195 Logger::In in( location.c_str() );
00196
00197 vector<string> myreqs = this->requires()->getRequesterNames();
00198 vector<string> peerreqs = peer->requires()->getRequesterNames();
00199
00200 this->requires()->connectTo( peer->provides() );
00201 for (vector<string>::iterator it = myreqs.begin();
00202 it != myreqs.end();
00203 ++it) {
00204 ServiceRequester::shared_ptr sr = this->requires(*it);
00205 if ( !sr->ready() ) {
00206 if (peer->provides()->hasService( *it ))
00207 success = sr->connectTo( peer->provides(*it) ) && success;
00208 else {
00209 log(Debug)<< "Peer Task "<<peer->getName() <<" provides no Service " << *it << endlog();
00210 }
00211 }
00212 }
00213
00214 peer->requires()->connectTo( this->provides() );
00215 for (vector<string>::iterator it = peerreqs.begin();
00216 it != peerreqs.end();
00217 ++it) {
00218 ServiceRequester::shared_ptr sr = peer->requires(*it);
00219 if ( !sr->ready() ) {
00220 if (this->provides()->hasService(*it))
00221 success = sr->connectTo( this->provides(*it) ) && success;
00222 else
00223 log(Debug)<< "This Task provides no Service " << *it << " for peer Task "<<peer->getName() <<"."<< endlog();
00224 }
00225 }
00226 return success;
00227 }
00228
00229 bool TaskContext::prepareProvide(const std::string& name) {
00230 return tcservice->hasService(name) || plugin::PluginLoader::Instance()->loadService(name, this);
00231 }
00232
00233 bool TaskContext::loadService(const std::string& service_name) {
00234 if ( provides()->hasService(service_name))
00235 return true;
00236 return PluginLoader::Instance()->loadService(service_name, this);
00237 }
00238
00239 void TaskContext::addUser( TaskContext* peer )
00240 {
00241 if (peer)
00242 musers.push_back(peer);
00243 }
00244
00245 void TaskContext::removeUser( TaskContext* peer )
00246 {
00247 Users::iterator it = find(musers.begin(), musers.end(), peer);
00248 if ( it != musers.end() )
00249 musers.erase(it);
00250 }
00251
00252 bool TaskContext::addPeer( TaskContext* peer, std::string alias )
00253 {
00254 if ( alias.empty() )
00255 alias = peer->getName();
00256 if ( !peer || _task_map.count( alias ) != 0 )
00257 return false;
00258 _task_map[ alias ] = peer;
00259 peer->addUser( this );
00260 return true;
00261 }
00262
00263 void TaskContext::removePeer( const std::string& name )
00264 {
00265 PeerMap::iterator it = _task_map.find( name );
00266 if ( _task_map.end() != it ) {
00267 it->second->removeUser( this );
00268 _task_map.erase( _task_map.find( name ) );
00269 }
00270 }
00271
00272 void TaskContext::removePeer( TaskContext* peer )
00273 {
00274 for( PeerMap::iterator it = _task_map.begin(); it != _task_map.end(); ++it)
00275 if ( it->second == peer ) {
00276 peer->removeUser( this );
00277 _task_map.erase( it );
00278 return;
00279 }
00280 }
00281
00282 bool TaskContext::connectPeers( TaskContext* peer )
00283 {
00284 if ( _task_map.count( peer->getName() ) != 0
00285 || peer->hasPeer( this->getName() ) )
00286 return false;
00287 this->addPeer ( peer );
00288 peer->addPeer ( this );
00289 return true;
00290 }
00291
00292 void TaskContext::disconnect() {
00293 Logger::In in( this->getName().c_str() );
00294
00295 DataFlowInterface::Ports myports = this->ports()->getPorts();
00296 for (DataFlowInterface::Ports::iterator it = myports.begin();
00297 it != myports.end();
00298 ++it) {
00299 (*it)->disconnect();
00300 }
00301
00302
00303 while( !musers.empty() ) {
00304 musers.front()->removePeer(this);
00305 }
00306
00307 while ( !_task_map.empty() ) {
00308 _task_map.begin()->second->removeUser(this);
00309 _task_map.erase( _task_map.begin() );
00310 }
00311 }
00312
00313 void TaskContext::disconnectPeers( const std::string& name )
00314 {
00315 if ( _task_map.end() != _task_map.find( name ) ) {
00316 TaskContext* peer = _task_map.find(name)->second;
00317 this->removePeer(peer);
00318 peer->removePeer(this);
00319 }
00320 }
00321
00322 std::vector<std::string> TaskContext::getPeerList() const
00323 {
00324 std::vector<std::string> res;
00325 std::transform(_task_map.begin(), _task_map.end(),
00326 std::back_inserter( res ),
00327 select1st<PeerMap::value_type>() );
00328 return res;
00329 }
00330
00331 bool TaskContext::hasPeer( const std::string& peer_name ) const
00332 {
00333 return _task_map.count( peer_name ) == 1;
00334 }
00335
00336 TaskContext* TaskContext::getPeer(const std::string& peer_name ) const
00337 {
00338 if (this->hasPeer( peer_name ) )
00339 return _task_map.find(peer_name)->second;
00340 return 0;
00341 }
00342
00343 bool TaskContext::setActivity(ActivityInterface* new_act)
00344 {
00345 if (this->isRunning())
00346 return false;
00347 if ( new_act == 0) {
00348 #if defined(ORO_ACT_DEFAULT_SEQUENTIAL)
00349 new_act = new SequentialActivity();
00350 #elif defined(ORO_ACT_DEFAULT_ACTIVITY)
00351 new_act = new Activity();
00352 #endif
00353 }
00354 new_act->stop();
00355 if(our_act){
00356 our_act->stop();
00357 }
00358 new_act->run( this->engine() );
00359 our_act = ActivityInterface::shared_ptr( new_act );
00360 our_act->start();
00361 return true;
00362 }
00363
00364 void TaskContext::forceActivity(ActivityInterface* new_act)
00365 {
00366 if (!new_act)
00367 return;
00368 new_act->stop();
00369 if(our_act){
00370 our_act->stop();
00371 }
00372 our_act.reset( new_act );
00373 our_act->run( this->engine() );
00374 our_act->start();
00375 }
00376
00377 ActivityInterface* TaskContext::getActivity()
00378 {
00379 if (this->engine()->getActivity() != our_act.get() )
00380 return this->engine()->getActivity();
00381 return our_act.get();
00382 }
00383
00384 void TaskContext::clear()
00385 {
00386 tcservice->clear();
00387 tcrequests->clear();
00388 }
00389
00390 bool TaskContext::ready()
00391 {
00392 return true;
00393 }
00394
00395 bool connectPorts(TaskContext* A, TaskContext* B) {
00396 return A->connectPorts(B);
00397 }
00398
00399 bool connectPeers(TaskContext* A, TaskContext* B) {
00400 return A->connectPeers(B);
00401 }
00402
00403 bool TaskContext::start()
00404 {
00405 if ( this->isRunning() )
00406 return false;
00407 #ifdef ORO_SIGNALLING_PORTS
00408 ports()->setupHandles();
00409 #endif
00410 return TaskCore::start();
00411 }
00412
00413 bool TaskContext::stop()
00414 {
00415 if ( !this->isRunning() )
00416 return false;
00417 if (TaskCore::stop()) {
00418 #ifdef ORO_SIGNALLING_PORTS
00419 ports()->cleanupHandles();
00420 #endif
00421 return true;
00422 }
00423 return false;
00424 }
00425
00426 void TaskContext::dataOnPort(PortInterface* port)
00427 {
00428 if ( this->dataOnPortHook(port) ) {
00429 portqueue->enqueue( port );
00430 this->getActivity()->trigger();
00431 }
00432 }
00433
00434 bool TaskContext::dataOnPortHook( base::PortInterface* ) {
00435 return this->isRunning();
00436 }
00437
00438 void TaskContext::dataOnPortCallback(InputPortInterface* port, TaskContext::SlotFunction callback) {
00439
00440 MutexLock lock(mportlock);
00441 user_callbacks[port] = callback;
00442 }
00443
00444 void TaskContext::dataOnPortRemoved(PortInterface* port) {
00445 MutexLock lock(mportlock);
00446 UserCallbacks::iterator it = user_callbacks.find(port);
00447 if (it != user_callbacks.end() ) {
00448 user_callbacks.erase(it);
00449 }
00450 }
00451
00452 void TaskContext::prepareUpdateHook()
00453 {
00454 MutexLock lock(mportlock);
00455 PortInterface* port = 0;
00456 while ( portqueue->dequeue( port ) == true ) {
00457 UserCallbacks::iterator it = user_callbacks.find(port);
00458 if (it != user_callbacks.end() )
00459 it->second(port);
00460 }
00461 }
00462 }
00463