$search
00001 /*************************************************************************** 00002 tag: Peter Soetens Tue Dec 21 22:43:08 CET 2004 TaskContext.cxx 00003 00004 TaskContext.cxx - description 00005 ------------------- 00006 begin : Tue December 21 2004 00007 copyright : (C) 2004 Peter Soetens 00008 email : peter.soetens@mech.kuleuven.ac.be 00009 00010 *************************************************************************** 00011 * This library is free software; you can redistribute it and/or * 00012 * modify it under the terms of the GNU General Public * 00013 * License as published by the Free Software Foundation; * 00014 * version 2 of the License. * 00015 * * 00016 * As a special exception, you may use this file as part of a free * 00017 * software library without restriction. Specifically, if other files * 00018 * instantiate templates or use macros or inline functions from this * 00019 * file, or you compile this file and link it with other files to * 00020 * produce an executable, this file does not by itself cause the * 00021 * resulting executable to be covered by the GNU General Public * 00022 * License. This exception does not however invalidate any other * 00023 * reasons why the executable file might be covered by the GNU General * 00024 * Public License. * 00025 * * 00026 * This library is distributed in the hope that it will be useful, * 00027 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 00028 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * 00029 * Lesser General Public License for more details. * 00030 * * 00031 * You should have received a copy of the GNU General Public * 00032 * License along with this library; if not, write to the Free Software * 00033 * Foundation, Inc., 59 Temple Place, * 00034 * Suite 330, Boston, MA 02111-1307 USA * 00035 * * 00036 ***************************************************************************/ 00037 00038 00039 00040 #include "TaskContext.hpp" 00041 #include "base/ActionInterface.hpp" 00042 #include "plugin/PluginLoader.hpp" 00043 00044 #include <string> 00045 #include <algorithm> 00046 #include <functional> 00047 #include <boost/bind.hpp> 00048 #include <boost/mem_fn.hpp> 00049 00050 #include "internal/DataSource.hpp" 00051 #include "internal/mystd.hpp" 00052 #include "internal/MWSRQueue.hpp" 00053 #include "OperationCaller.hpp" 00054 00055 #include "rtt-config.h" 00056 00057 #if defined(ORO_ACT_DEFAULT_SEQUENTIAL) 00058 #include "extras/SequentialActivity.hpp" 00059 #elif defined(ORO_ACT_DEFAULT_ACTIVITY) 00060 #include "Activity.hpp" 00061 #endif 00062 00063 namespace RTT 00064 { 00065 00066 using namespace boost; 00067 using namespace std; 00068 using namespace detail; 00069 00070 TaskContext::TaskContext(const std::string& name, TaskState initial_state /*= Stopped*/) 00071 : TaskCore( initial_state) 00072 ,portqueue( new MWSRQueue<PortInterface*>(64) ) 00073 ,tcservice(new Service(name,this) ), tcrequests( new ServiceRequester(name,this) ) 00074 #if defined(ORO_ACT_DEFAULT_SEQUENTIAL) 00075 ,our_act( new SequentialActivity( this->engine() ) ) 00076 #elif defined(ORO_ACT_DEFAULT_ACTIVITY) 00077 ,our_act( new Activity( this->engine(), name ) ) 00078 #endif 00079 { 00080 this->setup(); 00081 } 00082 00083 TaskContext::TaskContext(const std::string& name, ExecutionEngine* parent, TaskState initial_state /*= Stopped*/ ) 00084 : TaskCore(parent, initial_state) 00085 ,portqueue( new MWSRQueue<PortInterface*>(64) ) 00086 ,tcservice(new Service(name,this) ), tcrequests( new ServiceRequester(name,this) ) 00087 #if defined(ORO_ACT_DEFAULT_SEQUENTIAL) 00088 ,our_act( parent ? 0 : new SequentialActivity( this->engine() ) ) 00089 #elif defined(ORO_ACT_DEFAULT_ACTIVITY) 00090 ,our_act( parent ? 0 : new Activity( this->engine(), name ) ) 00091 #endif 00092 { 00093 this->setup(); 00094 } 00095 00096 void TaskContext::setup() 00097 { 00098 tcservice->setOwner(this); 00099 // from Service 00100 provides()->doc("The interface of this TaskContext."); 00101 00102 this->addOperation("configure", &TaskContext::configure, this, ClientThread).doc("Configure this TaskContext (= configureHook() )."); 00103 this->addOperation("isConfigured", &TaskContext::isConfigured, this, ClientThread).doc("Is this TaskContext configured ?"); 00104 this->addOperation("start", &TaskContext::start, this, ClientThread).doc("Start this TaskContext (= startHook() + updateHook() )."); 00105 this->addOperation("activate", &TaskContext::activate, this, ClientThread).doc("Activate the Execution Engine of this TaskContext."); 00106 this->addOperation("stop", &TaskContext::stop, this, ClientThread).doc("Stop this TaskContext (= stopHook() )."); 00107 this->addOperation("isRunning", &TaskContext::isRunning, this, ClientThread).doc("Is this TaskContext started ?"); 00108 this->addOperation("getPeriod", &TaskContext::getPeriod, this, ClientThread).doc("Get the configured execution period. -1.0: no thread associated, 0.0: non periodic, > 0.0: the period."); 00109 this->addOperation("setPeriod", &TaskContext::setPeriod, this, ClientThread).doc("Set the execution period in seconds.").arg("s", "Period in seconds."); 00110 this->addOperation("getCpuAffinity", &TaskContext::getCpuAffinity, this, ClientThread).doc("Get the configured cpu affinity."); 00111 this->addOperation("setCpuAffinity", &TaskContext::setCpuAffinity, this, ClientThread).doc("Set the cpu affinity.").arg("cpu", "Cpu mask."); 00112 this->addOperation("isActive", &TaskContext::isActive, this, ClientThread).doc("Is the Execution Engine of this TaskContext active ?"); 00113 this->addOperation("inFatalError", &TaskContext::inFatalError, this, ClientThread).doc("Check if this TaskContext is in the FatalError state."); 00114 this->addOperation("error", &TaskContext::error, this, ClientThread).doc("Enter the RunTimeError state (= errorHook() )."); 00115 this->addOperation("inRunTimeError", &TaskContext::inRunTimeError, this, ClientThread).doc("Check if this TaskContext is in the RunTimeError state."); 00116 this->addOperation("cleanup", &TaskContext::cleanup, this, ClientThread).doc("Reset this TaskContext to the PreOperational state ( =cleanupHook() )."); 00117 this->addOperation("update", &TaskContext::update, this, ClientThread).doc("Execute (call) the update method directly.\n Only succeeds if the task isRunning() and allowed by the Activity executing this task."); 00118 00119 this->addOperation("trigger", &TaskContext::trigger, this, ClientThread).doc("Trigger the update method for execution in the thread of this task.\n Only succeeds if the task isRunning() and allowed by the Activity executing this task."); 00120 this->addOperation("loadService", &TaskContext::loadService, this, ClientThread).doc("Loads a service known to RTT into this component.").arg("service_name","The name with which the service is registered by in the PluginLoader."); 00121 // activity runs from the start. 00122 if (our_act) 00123 our_act->start(); 00124 } 00125 00126 TaskContext::~TaskContext() 00127 { 00128 if (our_act) 00129 our_act->stop(); 00130 // We don't call stop() or cleanup() here since this is 00131 // the responsibility of the subclass. Calling these functions 00132 // here would only lead to calling invalid virtual functions. 00133 // [Rule no 1: Don't call virtual functions in a destructor.] 00134 // [Rule no 2: Don't call virtual functions in a constructor.] 00135 tcservice->clear(); 00136 00137 delete tcrequests; 00138 00139 // remove from all users. 00140 while( !musers.empty() ) { 00141 musers.front()->removePeer(this); 00142 } 00143 // since we are destroyed, be sure that the peer no longer 00144 // has a 'user' pointer to us. 00145 while ( !_task_map.empty() ) { 00146 _task_map.begin()->second->removeUser(this); 00147 _task_map.erase( _task_map.begin() ); 00148 } 00149 // Do not call this->disconnect() !!! 00150 // Ports are probably already destructed by user code. 00151 delete portqueue; 00152 } 00153 00154 bool TaskContext::connectPorts( TaskContext* peer ) 00155 { 00156 bool failure = false; 00157 const std::string& location = this->getName(); 00158 Logger::In in( location.c_str() ); 00159 00160 DataFlowInterface::Ports myports = this->ports()->getPorts(); 00161 for (DataFlowInterface::Ports::iterator it = myports.begin(); 00162 it != myports.end(); 00163 ++it) { 00164 00165 // Then try to get the peer port's connection 00166 PortInterface* peerport = peer->ports()->getPort( (*it)->getName() ); 00167 if ( !peerport ) { 00168 log(Debug)<< "Peer Task "<<peer->getName() <<" has no Port " << (*it)->getName() << endlog(); 00169 continue; 00170 } 00171 00172 // Skip if they have the same type 00173 if((dynamic_cast<OutputPortInterface*>(*it) && dynamic_cast<OutputPortInterface*>(peerport)) || 00174 (dynamic_cast<InputPortInterface*>(*it) && dynamic_cast<InputPortInterface*>(peerport))) 00175 { 00176 log(Debug)<< (*it)->getName() << " and " << peerport->getName() << " have the same type" << endlog(); 00177 continue; 00178 } 00179 00180 // Try to find a way to connect them 00181 if ( !(*it)->connectTo( peerport ) ) { 00182 log(Debug)<< "Data flow incompatible between ports " 00183 << getName() << "." << (*it)->getName() << " and " 00184 << peer->getName() << "." << (*it)->getName() << endlog(); 00185 failure = true; 00186 } 00187 } 00188 return !failure; 00189 } 00190 00191 bool TaskContext::connectServices( TaskContext* peer ) 00192 { 00193 bool failure = false; 00194 const std::string& location = this->getName(); 00195 Logger::In in( location.c_str() ); 00196 00197 vector<string> myreqs = this->requires()->getRequesterNames(); 00198 vector<string> peerreqs = peer->requires()->getRequesterNames(); 00199 00200 this->requires()->connectTo( peer->provides() ); 00201 for (vector<string>::iterator it = myreqs.begin(); 00202 it != myreqs.end(); 00203 ++it) { 00204 ServiceRequester* sr = this->requires(*it); 00205 if ( !sr->ready() ) { 00206 if (peer->provides()->hasService( *it )) 00207 sr->connectTo( peer->provides(*it) ); 00208 else { 00209 log(Debug)<< "Peer Task "<<peer->getName() <<" provides no Service " << *it << endlog(); 00210 } 00211 } 00212 } 00213 00214 peer->requires()->connectTo( this->provides() ); 00215 for (vector<string>::iterator it = peerreqs.begin(); 00216 it != peerreqs.end(); 00217 ++it) { 00218 ServiceRequester* sr = peer->requires(*it); 00219 if ( !sr->ready() ) { 00220 if (this->provides()->hasService(*it)) 00221 sr->connectTo( this->provides(*it) ); 00222 else 00223 log(Debug)<< "This Task provides no Service " << *it << " for peer Task "<<peer->getName() <<"."<< endlog(); 00224 } 00225 } 00226 return !failure; 00227 } 00228 00229 bool TaskContext::prepareProvide(const std::string& name) { 00230 return tcservice->hasService(name) || plugin::PluginLoader::Instance()->loadService(name, this); 00231 } 00232 00233 bool TaskContext::loadService(const std::string& service_name) { 00234 if ( provides()->hasService(service_name)) 00235 return true; 00236 return PluginLoader::Instance()->loadService(service_name, this); 00237 } 00238 00239 void TaskContext::addUser( TaskContext* peer ) 00240 { 00241 if (peer) 00242 musers.push_back(peer); 00243 } 00244 00245 void TaskContext::removeUser( TaskContext* peer ) 00246 { 00247 Users::iterator it = find(musers.begin(), musers.end(), peer); 00248 if ( it != musers.end() ) 00249 musers.erase(it); 00250 } 00251 00252 bool TaskContext::addPeer( TaskContext* peer, std::string alias ) 00253 { 00254 if ( alias.empty() ) 00255 alias = peer->getName(); 00256 if ( !peer || _task_map.count( alias ) != 0 ) 00257 return false; 00258 _task_map[ alias ] = peer; 00259 peer->addUser( this ); 00260 return true; 00261 } 00262 00263 void TaskContext::removePeer( const std::string& name ) 00264 { 00265 PeerMap::iterator it = _task_map.find( name ); 00266 if ( _task_map.end() != it ) { 00267 it->second->removeUser( this ); 00268 _task_map.erase( _task_map.find( name ) ); 00269 } 00270 } 00271 00272 void TaskContext::removePeer( TaskContext* peer ) 00273 { 00274 for( PeerMap::iterator it = _task_map.begin(); it != _task_map.end(); ++it) 00275 if ( it->second == peer ) { 00276 peer->removeUser( this ); 00277 _task_map.erase( it ); 00278 return; 00279 } 00280 } 00281 00282 bool TaskContext::connectPeers( TaskContext* peer ) 00283 { 00284 if ( _task_map.count( peer->getName() ) != 0 00285 || peer->hasPeer( this->getName() ) ) 00286 return false; 00287 this->addPeer ( peer ); 00288 peer->addPeer ( this ); 00289 return true; 00290 } 00291 00292 void TaskContext::disconnect() { 00293 Logger::In in( this->getName().c_str() ); 00294 // disconnect all our ports 00295 DataFlowInterface::Ports myports = this->ports()->getPorts(); 00296 for (DataFlowInterface::Ports::iterator it = myports.begin(); 00297 it != myports.end(); 00298 ++it) { 00299 (*it)->disconnect(); 00300 } 00301 00302 // remove from all users. 00303 while( !musers.empty() ) { 00304 musers.front()->removePeer(this); 00305 } 00306 00307 while ( !_task_map.empty() ) { 00308 _task_map.begin()->second->removeUser(this); 00309 _task_map.erase( _task_map.begin() ); 00310 } 00311 } 00312 00313 void TaskContext::disconnectPeers( const std::string& name ) 00314 { 00315 if ( _task_map.end() != _task_map.find( name ) ) { 00316 TaskContext* peer = _task_map.find(name)->second; 00317 this->removePeer(peer); 00318 peer->removePeer(this); 00319 } 00320 } 00321 00322 std::vector<std::string> TaskContext::getPeerList() const 00323 { 00324 std::vector<std::string> res; 00325 std::transform(_task_map.begin(), _task_map.end(), 00326 std::back_inserter( res ), 00327 select1st<PeerMap::value_type>() ); 00328 return res; 00329 } 00330 00331 bool TaskContext::hasPeer( const std::string& peer_name ) const 00332 { 00333 return _task_map.count( peer_name ) == 1; 00334 } 00335 00336 TaskContext* TaskContext::getPeer(const std::string& peer_name ) const 00337 { 00338 if (this->hasPeer( peer_name ) ) 00339 return _task_map.find(peer_name)->second; 00340 return 0; 00341 } 00342 00343 bool TaskContext::setActivity(ActivityInterface* new_act) 00344 { 00345 if (this->isRunning()) 00346 return false; 00347 if ( new_act == 0) { 00348 #if defined(ORO_ACT_DEFAULT_SEQUENTIAL) 00349 new_act = new SequentialActivity(); 00350 #elseif defined(ORO_ACT_DEFAULT_ACTIVITY) 00351 new_act = new Activity(); 00352 #endif 00353 } 00354 new_act->stop(); 00355 our_act->stop(); 00356 new_act->run( this->engine() ); 00357 our_act = ActivityInterface::shared_ptr( new_act ); 00358 our_act->start(); 00359 return true; 00360 } 00361 00362 void TaskContext::forceActivity(ActivityInterface* new_act) 00363 { 00364 if (!new_act) 00365 return; 00366 new_act->stop(); 00367 our_act->stop(); 00368 our_act.reset( new_act ); 00369 our_act->run( this->engine() ); 00370 our_act->start(); 00371 } 00372 00373 ActivityInterface* TaskContext::getActivity() 00374 { 00375 if (this->engine()->getActivity() != our_act.get() ) 00376 return this->engine()->getActivity(); 00377 return our_act.get(); 00378 } 00379 00380 void TaskContext::clear() 00381 { 00382 tcservice->clear(); 00383 } 00384 00385 bool TaskContext::ready() 00386 { 00387 return true; 00388 } 00389 00390 bool connectPorts(TaskContext* A, TaskContext* B) { 00391 return A->connectPorts(B); 00392 } 00393 00394 bool connectPeers(TaskContext* A, TaskContext* B) { 00395 return A->connectPeers(B); 00396 } 00397 00398 bool TaskContext::start() 00399 { 00400 if ( this->isRunning() ) 00401 return false; 00402 #ifdef ORO_SIGNALLING_PORTS 00403 ports()->setupHandles(); 00404 #endif 00405 return TaskCore::start(); // calls startHook() 00406 } 00407 00408 bool TaskContext::stop() 00409 { 00410 if ( !this->isRunning() ) 00411 return false; 00412 if (TaskCore::stop()) { // calls stopHook() 00413 #ifdef ORO_SIGNALLING_PORTS 00414 ports()->cleanupHandles(); 00415 #endif 00416 return true; 00417 } 00418 return false; 00419 } 00420 00421 void TaskContext::dataOnPort(PortInterface* port) 00422 { 00423 portqueue->enqueue( port ); 00424 this->getActivity()->trigger(); 00425 } 00426 00427 void TaskContext::dataOnPortCallback(InputPortInterface* port, TaskContext::SlotFunction callback) { 00428 // user_callbacks will only be emitted from updateHook(). 00429 MutexLock lock(mportlock); 00430 user_callbacks[port] = callback; 00431 } 00432 00433 void TaskContext::dataOnPortRemoved(PortInterface* port) { 00434 MutexLock lock(mportlock); 00435 UserCallbacks::iterator it = user_callbacks.find(port); 00436 if (it != user_callbacks.end() ) { 00437 user_callbacks.erase(it); 00438 } 00439 } 00440 00441 void TaskContext::prepareUpdateHook() 00442 { 00443 MutexLock lock(mportlock); 00444 PortInterface* port = 0; 00445 while ( portqueue->dequeue( port ) == true ) { 00446 UserCallbacks::iterator it = user_callbacks.find(port); 00447 if (it != user_callbacks.end() ) 00448 it->second(port); // fire the user callback 00449 } 00450 } 00451 } 00452