00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040 #include "TaskContext.hpp"
00041 #include "base/ActionInterface.hpp"
00042 #include "plugin/PluginLoader.hpp"
00043
00044 #include <string>
00045 #include <algorithm>
00046 #include <functional>
00047 #include <boost/bind.hpp>
00048 #include <boost/mem_fn.hpp>
00049
00050 #include "internal/DataSource.hpp"
00051 #include "internal/mystd.hpp"
00052 #include "internal/MWSRQueue.hpp"
00053 #include "OperationCaller.hpp"
00054
00055 #include "rtt-config.h"
00056
00057 #if defined(ORO_ACT_DEFAULT_SEQUENTIAL)
00058 #include "extras/SequentialActivity.hpp"
00059 #elif defined(ORO_ACT_DEFAULT_ACTIVITY)
00060 #include "Activity.hpp"
00061 #endif
00062
00063 namespace RTT
00064 {
00065
00066 using namespace boost;
00067 using namespace std;
00068 using namespace detail;
00069
00070 TaskContext::TaskContext(const std::string& name, TaskState initial_state )
00071 : TaskCore( initial_state)
00072 ,portqueue( new MWSRQueue<PortInterface*>(64) )
00073 ,tcservice(new Service(name,this) ), tcrequests( new ServiceRequester(name,this) )
00074 #if defined(ORO_ACT_DEFAULT_SEQUENTIAL)
00075 ,our_act( new SequentialActivity( this->engine() ) )
00076 #elif defined(ORO_ACT_DEFAULT_ACTIVITY)
00077 ,our_act( new Activity( this->engine(), name ) )
00078 #endif
00079 {
00080 this->setup();
00081 }
00082
00083 TaskContext::TaskContext(const std::string& name, ExecutionEngine* parent, TaskState initial_state )
00084 : TaskCore(parent, initial_state)
00085 ,portqueue( new MWSRQueue<PortInterface*>(64) )
00086 ,tcservice(new Service(name,this) ), tcrequests( new ServiceRequester(name,this) )
00087 #if defined(ORO_ACT_DEFAULT_SEQUENTIAL)
00088 ,our_act( parent ? 0 : new SequentialActivity( this->engine() ) )
00089 #elif defined(ORO_ACT_DEFAULT_ACTIVITY)
00090 ,our_act( parent ? 0 : new Activity( this->engine(), name ) )
00091 #endif
00092 {
00093 this->setup();
00094 }
00095
00096 void TaskContext::setup()
00097 {
00098 tcservice->setOwner(this);
00099
00100 provides()->doc("The interface of this TaskContext.");
00101
00102 this->addOperation("configure", &TaskContext::configure, this, ClientThread).doc("Configure this TaskContext (= configureHook() ).");
00103 this->addOperation("isConfigured", &TaskContext::isConfigured, this, ClientThread).doc("Is this TaskContext configured ?");
00104 this->addOperation("start", &TaskContext::start, this, ClientThread).doc("Start this TaskContext (= startHook() + updateHook() ).");
00105 this->addOperation("activate", &TaskContext::activate, this, ClientThread).doc("Activate the Execution Engine of this TaskContext.");
00106 this->addOperation("stop", &TaskContext::stop, this, ClientThread).doc("Stop this TaskContext (= stopHook() ).");
00107 this->addOperation("isRunning", &TaskContext::isRunning, this, ClientThread).doc("Is this TaskContext started ?");
00108 this->addOperation("getPeriod", &TaskContext::getPeriod, this, ClientThread).doc("Get the configured execution period. -1.0: no thread associated, 0.0: non periodic, > 0.0: the period.");
00109 this->addOperation("setPeriod", &TaskContext::setPeriod, this, ClientThread).doc("Set the execution period in seconds.").arg("s", "Period in seconds.");
00110 this->addOperation("getCpuAffinity", &TaskContext::getCpuAffinity, this, ClientThread).doc("Get the configured cpu affinity.");
00111 this->addOperation("setCpuAffinity", &TaskContext::setCpuAffinity, this, ClientThread).doc("Set the cpu affinity.").arg("cpu", "Cpu mask.");
00112 this->addOperation("isActive", &TaskContext::isActive, this, ClientThread).doc("Is the Execution Engine of this TaskContext active ?");
00113 this->addOperation("inFatalError", &TaskContext::inFatalError, this, ClientThread).doc("Check if this TaskContext is in the FatalError state.");
00114 this->addOperation("error", &TaskContext::error, this, ClientThread).doc("Enter the RunTimeError state (= errorHook() ).");
00115 this->addOperation("inRunTimeError", &TaskContext::inRunTimeError, this, ClientThread).doc("Check if this TaskContext is in the RunTimeError state.");
00116 this->addOperation("cleanup", &TaskContext::cleanup, this, ClientThread).doc("Reset this TaskContext to the PreOperational state ( =cleanupHook() ).");
00117 this->addOperation("update", &TaskContext::update, this, ClientThread).doc("Execute (call) the update method directly.\n Only succeeds if the task isRunning() and allowed by the Activity executing this task.");
00118
00119 this->addOperation("trigger", &TaskContext::trigger, this, ClientThread).doc("Trigger the update method for execution in the thread of this task.\n Only succeeds if the task isRunning() and allowed by the Activity executing this task.");
00120 this->addOperation("loadService", &TaskContext::loadService, this, ClientThread).doc("Loads a service known to RTT into this component.").arg("service_name","The name with which the service is registered by in the PluginLoader.");
00121
00122 if (our_act)
00123 our_act->start();
00124 }
00125
00126 TaskContext::~TaskContext()
00127 {
00128 if (our_act)
00129 our_act->stop();
00130
00131
00132
00133
00134
00135 tcservice->clear();
00136
00137 delete tcrequests;
00138
00139
00140 while( !musers.empty() ) {
00141 musers.front()->removePeer(this);
00142 }
00143
00144
00145 while ( !_task_map.empty() ) {
00146 _task_map.begin()->second->removeUser(this);
00147 _task_map.erase( _task_map.begin() );
00148 }
00149
00150
00151 delete portqueue;
00152 }
00153
00154 bool TaskContext::connectPorts( TaskContext* peer )
00155 {
00156 bool failure = false;
00157 const std::string& location = this->getName();
00158 Logger::In in( location.c_str() );
00159
00160 DataFlowInterface::Ports myports = this->ports()->getPorts();
00161 for (DataFlowInterface::Ports::iterator it = myports.begin();
00162 it != myports.end();
00163 ++it) {
00164
00165
00166 PortInterface* peerport = peer->ports()->getPort( (*it)->getName() );
00167 if ( !peerport ) {
00168 log(Debug)<< "Peer Task "<<peer->getName() <<" has no Port " << (*it)->getName() << endlog();
00169 continue;
00170 }
00171
00172
00173 if((dynamic_cast<OutputPortInterface*>(*it) && dynamic_cast<OutputPortInterface*>(peerport)) ||
00174 (dynamic_cast<InputPortInterface*>(*it) && dynamic_cast<InputPortInterface*>(peerport)))
00175 {
00176 log(Debug)<< (*it)->getName() << " and " << peerport->getName() << " have the same type" << endlog();
00177 continue;
00178 }
00179
00180
00181 if ( !(*it)->connectTo( peerport ) ) {
00182 log(Debug)<< "Data flow incompatible between ports "
00183 << getName() << "." << (*it)->getName() << " and "
00184 << peer->getName() << "." << (*it)->getName() << endlog();
00185 failure = true;
00186 }
00187 }
00188 return !failure;
00189 }
00190
00191 bool TaskContext::connectServices( TaskContext* peer )
00192 {
00193 bool failure = false;
00194 const std::string& location = this->getName();
00195 Logger::In in( location.c_str() );
00196
00197 vector<string> myreqs = this->requires()->getRequesterNames();
00198 vector<string> peerreqs = peer->requires()->getRequesterNames();
00199
00200 this->requires()->connectTo( peer->provides() );
00201 for (vector<string>::iterator it = myreqs.begin();
00202 it != myreqs.end();
00203 ++it) {
00204 ServiceRequester* sr = this->requires(*it);
00205 if ( !sr->ready() ) {
00206 if (peer->provides()->hasService( *it ))
00207 sr->connectTo( peer->provides(*it) );
00208 else {
00209 log(Debug)<< "Peer Task "<<peer->getName() <<" provides no Service " << *it << endlog();
00210 }
00211 }
00212 }
00213
00214 peer->requires()->connectTo( this->provides() );
00215 for (vector<string>::iterator it = peerreqs.begin();
00216 it != peerreqs.end();
00217 ++it) {
00218 ServiceRequester* sr = peer->requires(*it);
00219 if ( !sr->ready() ) {
00220 if (this->provides()->hasService(*it))
00221 sr->connectTo( this->provides(*it) );
00222 else
00223 log(Debug)<< "This Task provides no Service " << *it << " for peer Task "<<peer->getName() <<"."<< endlog();
00224 }
00225 }
00226 return !failure;
00227 }
00228
00229 bool TaskContext::prepareProvide(const std::string& name) {
00230 return tcservice->hasService(name) || plugin::PluginLoader::Instance()->loadService(name, this);
00231 }
00232
00233 bool TaskContext::loadService(const std::string& service_name) {
00234 if ( provides()->hasService(service_name))
00235 return true;
00236 return PluginLoader::Instance()->loadService(service_name, this);
00237 }
00238
00239 void TaskContext::addUser( TaskContext* peer )
00240 {
00241 if (peer)
00242 musers.push_back(peer);
00243 }
00244
00245 void TaskContext::removeUser( TaskContext* peer )
00246 {
00247 Users::iterator it = find(musers.begin(), musers.end(), peer);
00248 if ( it != musers.end() )
00249 musers.erase(it);
00250 }
00251
00252 bool TaskContext::addPeer( TaskContext* peer, std::string alias )
00253 {
00254 if ( alias.empty() )
00255 alias = peer->getName();
00256 if ( !peer || _task_map.count( alias ) != 0 )
00257 return false;
00258 _task_map[ alias ] = peer;
00259 peer->addUser( this );
00260 return true;
00261 }
00262
00263 void TaskContext::removePeer( const std::string& name )
00264 {
00265 PeerMap::iterator it = _task_map.find( name );
00266 if ( _task_map.end() != it ) {
00267 it->second->removeUser( this );
00268 _task_map.erase( _task_map.find( name ) );
00269 }
00270 }
00271
00272 void TaskContext::removePeer( TaskContext* peer )
00273 {
00274 for( PeerMap::iterator it = _task_map.begin(); it != _task_map.end(); ++it)
00275 if ( it->second == peer ) {
00276 peer->removeUser( this );
00277 _task_map.erase( it );
00278 return;
00279 }
00280 }
00281
00282 bool TaskContext::connectPeers( TaskContext* peer )
00283 {
00284 if ( _task_map.count( peer->getName() ) != 0
00285 || peer->hasPeer( this->getName() ) )
00286 return false;
00287 this->addPeer ( peer );
00288 peer->addPeer ( this );
00289 return true;
00290 }
00291
00292 void TaskContext::disconnect() {
00293 Logger::In in( this->getName().c_str() );
00294
00295 DataFlowInterface::Ports myports = this->ports()->getPorts();
00296 for (DataFlowInterface::Ports::iterator it = myports.begin();
00297 it != myports.end();
00298 ++it) {
00299 (*it)->disconnect();
00300 }
00301
00302
00303 while( !musers.empty() ) {
00304 musers.front()->removePeer(this);
00305 }
00306
00307 while ( !_task_map.empty() ) {
00308 _task_map.begin()->second->removeUser(this);
00309 _task_map.erase( _task_map.begin() );
00310 }
00311 }
00312
00313 void TaskContext::disconnectPeers( const std::string& name )
00314 {
00315 if ( _task_map.end() != _task_map.find( name ) ) {
00316 TaskContext* peer = _task_map.find(name)->second;
00317 this->removePeer(peer);
00318 peer->removePeer(this);
00319 }
00320 }
00321
00322 std::vector<std::string> TaskContext::getPeerList() const
00323 {
00324 std::vector<std::string> res;
00325 std::transform(_task_map.begin(), _task_map.end(),
00326 std::back_inserter( res ),
00327 select1st<PeerMap::value_type>() );
00328 return res;
00329 }
00330
00331 bool TaskContext::hasPeer( const std::string& peer_name ) const
00332 {
00333 return _task_map.count( peer_name ) == 1;
00334 }
00335
00336 TaskContext* TaskContext::getPeer(const std::string& peer_name ) const
00337 {
00338 if (this->hasPeer( peer_name ) )
00339 return _task_map.find(peer_name)->second;
00340 return 0;
00341 }
00342
00343 bool TaskContext::setActivity(ActivityInterface* new_act)
00344 {
00345 if (this->isRunning())
00346 return false;
00347 if ( new_act == 0) {
00348 #if defined(ORO_ACT_DEFAULT_SEQUENTIAL)
00349 new_act = new SequentialActivity();
00350 #elseif defined(ORO_ACT_DEFAULT_ACTIVITY)
00351 new_act = new Activity();
00352 #endif
00353 }
00354 new_act->stop();
00355 our_act->stop();
00356 new_act->run( this->engine() );
00357 our_act = ActivityInterface::shared_ptr( new_act );
00358 our_act->start();
00359 return true;
00360 }
00361
00362 void TaskContext::forceActivity(ActivityInterface* new_act)
00363 {
00364 if (!new_act)
00365 return;
00366 new_act->stop();
00367 our_act->stop();
00368 our_act.reset( new_act );
00369 our_act->run( this->engine() );
00370 our_act->start();
00371 }
00372
00373 ActivityInterface* TaskContext::getActivity()
00374 {
00375 if (this->engine()->getActivity() != our_act.get() )
00376 return this->engine()->getActivity();
00377 return our_act.get();
00378 }
00379
00380 void TaskContext::clear()
00381 {
00382 tcservice->clear();
00383 }
00384
00385 bool TaskContext::ready()
00386 {
00387 return true;
00388 }
00389
00390 bool connectPorts(TaskContext* A, TaskContext* B) {
00391 return A->connectPorts(B);
00392 }
00393
00394 bool connectPeers(TaskContext* A, TaskContext* B) {
00395 return A->connectPeers(B);
00396 }
00397
00398 bool TaskContext::start()
00399 {
00400 if ( this->isRunning() )
00401 return false;
00402 #ifdef ORO_SIGNALLING_PORTS
00403 ports()->setupHandles();
00404 #endif
00405 return TaskCore::start();
00406 }
00407
00408 bool TaskContext::stop()
00409 {
00410 if ( !this->isRunning() )
00411 return false;
00412 if (TaskCore::stop()) {
00413 #ifdef ORO_SIGNALLING_PORTS
00414 ports()->cleanupHandles();
00415 #endif
00416 return true;
00417 }
00418 return false;
00419 }
00420
00421 void TaskContext::dataOnPort(PortInterface* port)
00422 {
00423 portqueue->enqueue( port );
00424 this->getActivity()->trigger();
00425 }
00426
00427 void TaskContext::dataOnPortCallback(InputPortInterface* port, TaskContext::SlotFunction callback) {
00428
00429 MutexLock lock(mportlock);
00430 user_callbacks[port] = callback;
00431 }
00432
00433 void TaskContext::dataOnPortRemoved(PortInterface* port) {
00434 MutexLock lock(mportlock);
00435 UserCallbacks::iterator it = user_callbacks.find(port);
00436 if (it != user_callbacks.end() ) {
00437 user_callbacks.erase(it);
00438 }
00439 }
00440
00441 void TaskContext::prepareUpdateHook()
00442 {
00443 MutexLock lock(mportlock);
00444 PortInterface* port = 0;
00445 while ( portqueue->dequeue( port ) == true ) {
00446 UserCallbacks::iterator it = user_callbacks.find(port);
00447 if (it != user_callbacks.end() )
00448 it->second(port);
00449 }
00450 }
00451 }
00452