Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040 #include "TaskContextServer.hpp"
00041 #include "TaskContextProxy.hpp"
00042 #include "corba.h"
00043 #ifdef CORBA_IS_TAO
00044 #include "TaskContextS.h"
00045 #include <orbsvcs/CosNamingC.h>
00046
00047 #include <ace/SString.h>
00048 #include "tao/TimeBaseC.h"
00049 #include "tao/Messaging/Messaging.h"
00050 #include "tao/Messaging/Messaging_RT_PolicyC.h"
00051 #else
00052 #include <omniORB4/Naming.hh>
00053 #endif
00054 #include "TaskContextC.h"
00055 #include "TaskContextI.h"
00056 #include "DataFlowI.h"
00057 #include "POAUtility.h"
00058 #include <iostream>
00059 #include <fstream>
00060
00061 #include "../../os/threads.hpp"
00062 #include "../../Activity.hpp"
00063
00064 namespace RTT
00065 {namespace corba
00066 {
00067 using namespace std;
00068
00069 std::map<TaskContext*, TaskContextServer*> TaskContextServer::servers;
00070
00071 base::ActivityInterface* TaskContextServer::orbrunner = 0;
00072
00073 bool TaskContextServer::is_shutdown = false;
00074
00075 std::map<TaskContext*, std::string> TaskContextServer::iors;
00076
00077 TaskContextServer::~TaskContextServer()
00078 {
00079 Logger::In in("~TaskContextServer()");
00080 servers.erase(mtaskcontext);
00081
00082
00083 iors.erase(mtaskcontext);
00084
00085 PortableServer::ObjectId_var oid = mpoa->servant_to_id(mtask_i.in());
00086 mpoa->deactivate_object(oid);
00087
00088 if (muse_naming) {
00089 try {
00090 CORBA::Object_var rootObj = orb->resolve_initial_references("NameService");
00091 CosNaming::NamingContext_var rootNC = CosNaming::NamingContext::_narrow(rootObj.in());
00092
00093 if (CORBA::is_nil( rootNC.in() ) ) {
00094 log(Warning) << "CTaskContext '"<< mtaskcontext->getName() << "' could not find CORBA Naming Service."<<endlog();
00095 } else {
00096
00097 CosNaming::Name name;
00098 name.length(2);
00099 name[0].id = CORBA::string_dup("TaskContexts");
00100 name[1].id = CORBA::string_dup( mtaskcontext->getName().c_str() );
00101 try {
00102 rootNC->unbind(name);
00103 log(Info) << "Successfully removed CTaskContext '"<<mtaskcontext->getName()<<"' from CORBA Naming Service."<<endlog();
00104 }
00105 catch( CosNaming::NamingContext::NotFound ) {
00106 log(Info) << "CTaskContext '"<< mtaskcontext->getName() << "' task was already unbound."<<endlog();
00107 }
00108 catch( ... ) {
00109 log(Warning) << "CTaskContext '"<< mtaskcontext->getName() << "' unbinding failed."<<endlog();
00110 }
00111 }
00112 } catch (...) {
00113 log(Warning) << "CTaskContext '"<< mtaskcontext->getName() << "' unbinding failed from CORBA Naming Service."<<endlog();
00114 }
00115 }
00116 }
00117
00118
00119
00120
00121 TaskContextServer::TaskContextServer(TaskContext* taskc, bool use_naming, bool require_name_service)
00122 : mtaskcontext(taskc), muse_naming(use_naming)
00123 {
00124 Logger::In in("TaskContextServer()");
00125 servers[taskc] = this;
00126 try {
00127
00128
00129 CORBA::Object_var poa_object =
00130 orb->resolve_initial_references ("RootPOA");
00131 mpoa = PortableServer::POA::_narrow(poa_object);
00132 PortableServer::POAManager_var poa_manager =
00133 mpoa->the_POAManager ();
00134
00135
00136
00137
00138
00139
00140
00141
00142
00143
00144
00145
00146
00147 RTT_corba_CTaskContext_i* serv;
00148 mtask_i = serv = new RTT_corba_CTaskContext_i( taskc, mpoa );
00149 mtask = serv->activate_this();
00150
00151
00152 CORBA::String_var ior = orb->object_to_string( mtask.in() );
00153 iors[taskc] = std::string( ior.in() );
00154
00155 if ( use_naming ) {
00156 CORBA::Object_var rootObj;
00157 CosNaming::NamingContext_var rootNC;
00158 try {
00159 rootObj = orb->resolve_initial_references("NameService");
00160 rootNC = CosNaming::NamingContext::_narrow(rootObj);
00161 } catch (...) {}
00162
00163 if (CORBA::is_nil( rootNC ) ) {
00164 std::string err("CTaskContext '" + taskc->getName() + "' could not find CORBA Naming Service.");
00165 if (require_name_service) {
00166 servers.erase(taskc);
00167 log(Error) << err << endlog();
00168 servers.erase(taskc);
00169 throw IllegalServer(err);
00170 }
00171 else
00172 {
00173 log(Warning) << err << endlog();
00174 #ifndef ORO_NO_EMIT_CORBA_IOR
00175 log() <<"Writing IOR to 'std::cerr' and file '" << taskc->getName() <<".ior'"<<endlog();
00176
00177
00178 CORBA::String_var ior = orb->object_to_string( mtask.in() );
00179 std::cerr << ior.in() <<std::endl;
00180 {
00181
00182 std::string iorname( taskc->getName());
00183 iorname += ".ior";
00184 std::ofstream file_ior( iorname.c_str() );
00185 file_ior << ior.in() <<std::endl;
00186 }
00187 #endif
00188 return;
00189 }
00190 }
00191 log(Info) << "CTaskContext '"<< taskc->getName() << "' found CORBA Naming Service."<<endlog();
00192
00193 CosNaming::Name name;
00194 name.length(1);
00195 name[0].id = CORBA::string_dup("TaskContexts");
00196 CosNaming::NamingContext_var controlNC;
00197 try {
00198 controlNC = rootNC->bind_new_context(name);
00199 }
00200 catch( CosNaming::NamingContext::AlreadyBound&) {
00201 log(Debug) << "NamingContext 'TaskContexts' already bound to CORBA Naming Service."<<endlog();
00202
00203 }
00204
00205 name.length(2);
00206 name[1].id = CORBA::string_dup( taskc->getName().c_str() );
00207 try {
00208 rootNC->bind(name, mtask );
00209 log(Info) << "Successfully added CTaskContext '"<<taskc->getName()<<"' to CORBA Naming Service."<<endlog();
00210 }
00211 catch( CosNaming::NamingContext::AlreadyBound&) {
00212 log(Warning) << "CTaskContext '"<< taskc->getName() << "' already bound to CORBA Naming Service."<<endlog();
00213 log() <<"Trying to rebind...";
00214 try {
00215 rootNC->rebind(name, mtask);
00216 } catch( ... ) {
00217 log() << " failed!"<<endlog();
00218 return;
00219 }
00220 log() << " done. New CTaskContext bound to Naming Service."<<endlog();
00221 }
00222 }
00223 else {
00224 log(Info) <<"CTaskContext '"<< taskc->getName() << "' is not using the CORBA Naming Service."<<endlog();
00225 #ifndef ORO_NO_EMIT_CORBA_IOR
00226 log() <<"Writing IOR to 'std::cerr' and file '" << taskc->getName() <<".ior'"<<endlog();
00227
00228
00229 CORBA::String_var ior = orb->object_to_string( mtask.in() );
00230 std::cerr << ior.in() <<std::endl;
00231 {
00232
00233 std::string iorname( taskc->getName());
00234 iorname += ".ior";
00235 std::ofstream file_ior( iorname.c_str() );
00236 file_ior << ior.in() <<std::endl;
00237 }
00238 #endif
00239 return;
00240 }
00241 }
00242 catch (CORBA::Exception &e) {
00243 log(Error) << "CORBA exception raised!" << endlog();
00244 log() << CORBA_EXCEPTION_INFO(e) << endlog();
00245 }
00246
00247 }
00248
00249 void TaskContextServer::CleanupServers() {
00250 if ( !CORBA::is_nil(orb) && !is_shutdown) {
00251 log(Info) << "Cleaning up TaskContextServers..."<<endlog();
00252 while ( !servers.empty() ){
00253 delete servers.begin()->second;
00254
00255 }
00256 CDataFlowInterface_i::clearServants();
00257 log() << "Cleanup done."<<endlog();
00258 }
00259 }
00260
00261 void TaskContextServer::CleanupServer(TaskContext* c) {
00262 if ( !CORBA::is_nil(orb) ) {
00263 ServerMap::iterator it = servers.find(c);
00264 if ( it != servers.end() ){
00265 log(Info) << "Cleaning up TaskContextServer for "<< c->getName()<<endlog();
00266 CDataFlowInterface_i::deregisterServant(c->provides().get());
00267 delete it->second;
00268
00269 }
00270 }
00271 }
00272
00273 void TaskContextServer::ShutdownOrb(bool wait_for_completion)
00274 {
00275 Logger::In in("ShutdownOrb");
00276 DoShutdownOrb(wait_for_completion);
00277 }
00278
00279 void TaskContextServer::DoShutdownOrb(bool wait_for_completion)
00280 {
00281 if (is_shutdown) {
00282 log(Info) << "Orb already down..."<<endlog();
00283 return;
00284 }
00285 if ( CORBA::is_nil(orb) ) {
00286 log(Error) << "Orb Shutdown...failed! Orb is nil." << endlog();
00287 return;
00288 }
00289
00290 try {
00291 CleanupServers();
00292 log(Info) << "Orb Shutdown...";
00293 is_shutdown = true;
00294 if (wait_for_completion)
00295 log(Info)<<"waiting..."<<endlog();
00296 orb->shutdown( wait_for_completion );
00297 log(Info) << "done." << endlog();
00298 }
00299 catch (CORBA::Exception &e) {
00300 log(Error) << "Orb Shutdown...failed! CORBA exception raised." << endlog();
00301 log() << CORBA_EXCEPTION_INFO(e) << endlog();
00302 return;
00303 }
00304 }
00305
00306
00307 void TaskContextServer::RunOrb()
00308 {
00309 if ( CORBA::is_nil(orb) ) {
00310 log(Error) << "RunOrb...failed! Orb is nil." << endlog();
00311 return;
00312 }
00313 try {
00314 log(Info) <<"Entering orb->run()."<<endlog();
00315 orb->run();
00316 log(Info) <<"Breaking out of orb->run()."<<endlog();
00317 }
00318 catch (CORBA::Exception &e) {
00319 log(Error) << "Orb Run : CORBA exception raised!" << endlog();
00320 log() << CORBA_EXCEPTION_INFO(e) << endlog();
00321 }
00322 }
00323
00327 class OrbRunner
00328 : public Activity
00329 {
00330 public:
00331 OrbRunner(int scheduler, int priority, unsigned cpu_affinity)
00332 : Activity(scheduler, priority, cpu_affinity)
00333 {}
00334 void loop()
00335 {
00336 Logger::In in("OrbRunner");
00337 TaskContextServer::RunOrb();
00338 }
00339
00340 bool breakLoop()
00341 {
00342 return true;
00343 }
00344
00345 void finalize()
00346 {
00347 Logger::In in("OrbRunner");
00348 log(Info) <<"Safely stopped."<<endlog();
00349 }
00350 };
00351
00352 void TaskContextServer::ThreadOrb() { return ThreadOrb(ORO_SCHED_RT); }
00353 void TaskContextServer::ThreadOrb(int scheduler, int priority, unsigned cpu_affinity)
00354 {
00355 Logger::In in("ThreadOrb");
00356 if ( CORBA::is_nil(orb) ) {
00357 log(Error) << "ThreadOrb...failed! Orb is nil." << endlog();
00358 return;
00359 }
00360 if (orbrunner != 0) {
00361 log(Error) <<"Orb already running in a thread."<<endlog();
00362 } else {
00363 log(Info) <<"Starting Orb in a thread."<<endlog();
00364 orbrunner = new OrbRunner(scheduler, priority, cpu_affinity);
00365 orbrunner->start();
00366 }
00367 }
00368
00369 void TaskContextServer::DestroyOrb()
00370 {
00371 Logger::In in("DestroyOrb");
00372 if ( CORBA::is_nil(orb) ) {
00373 log(Error) << "DestroyOrb...failed! Orb is nil." << endlog();
00374 return;
00375 }
00376
00377 if (orbrunner) {
00378 orbrunner->stop();
00379 delete orbrunner;
00380 orbrunner = 0;
00381 }
00382
00383 try {
00384
00385
00386 CleanupServers();
00387 orb->destroy();
00388 rootPOA = 0;
00389 orb = 0;
00390 log(Info) <<"Orb destroyed."<<endlog();
00391 }
00392 catch (CORBA::Exception &e) {
00393 log(Error) << "Orb Destroy : CORBA exception raised!" << endlog();
00394 log() << CORBA_EXCEPTION_INFO(e) << endlog();
00395 }
00396
00397 }
00398
00399 TaskContextServer* TaskContextServer::Create(TaskContext* tc, bool use_naming, bool require_name_service) {
00400 if ( CORBA::is_nil(orb) )
00401 return 0;
00402
00403 if ( servers.count(tc) ) {
00404 log(Debug) << "Returning existing TaskContextServer for "<<tc->getName()<<endlog();
00405 return servers.find(tc)->second;
00406 }
00407
00408
00409 log(Info) << "Creating new TaskContextServer for "<<tc->getName()<<endlog();
00410 try {
00411 TaskContextServer* cts = new TaskContextServer(tc, use_naming, require_name_service);
00412 return cts;
00413 }
00414 catch( IllegalServer& is ) {
00415 cerr << is.what() << endl;
00416 }
00417 return 0;
00418 }
00419
00420 CTaskContext_ptr TaskContextServer::CreateServer(TaskContext* tc, bool use_naming, bool require_name_service) {
00421 if ( CORBA::is_nil(orb) )
00422 return CTaskContext::_nil();
00423
00424 if ( servers.count(tc) ) {
00425 log(Debug) << "Returning existing TaskContextServer for "<<tc->getName()<<endlog();
00426 return CTaskContext::_duplicate( servers.find(tc)->second->server() );
00427 }
00428
00429 for (TaskContextProxy::PMap::iterator it = TaskContextProxy::proxies.begin(); it != TaskContextProxy::proxies.end(); ++it)
00430 if ( (it->first) == tc ) {
00431 log(Debug) << "Returning server of Proxy for "<<tc->getName()<<endlog();
00432 return CTaskContext::_duplicate(it->second);
00433 }
00434
00435
00436 log(Info) << "Creating new TaskContextServer for "<<tc->getName()<<endlog();
00437 try {
00438 TaskContextServer* cts = new TaskContextServer(tc, use_naming, require_name_service);
00439 return CTaskContext::_duplicate( cts->server() );
00440 }
00441 catch( IllegalServer& is ) {
00442 cerr << is.what() << endl;
00443 }
00444 return CTaskContext::_nil();
00445 }
00446
00447
00448 corba::CTaskContext_ptr TaskContextServer::server() const
00449 {
00450
00451 return mtask.in();
00452 }
00453
00454 std::string TaskContextServer::getIOR(TaskContext* tc)
00455 {
00456 IorMap::const_iterator it = iors.find(tc);
00457 if (it != iors.end())
00458 return it->second;
00459
00460 return std::string("");
00461 }
00462
00463 }}