Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040 #include "TaskContextServer.hpp"
00041 #include "TaskContextProxy.hpp"
00042 #include "corba.h"
00043 #ifdef CORBA_IS_TAO
00044 #include "TaskContextS.h"
00045 #include <orbsvcs/CosNamingC.h>
00046
00047 #include <ace/SString.h>
00048 #include "tao/TimeBaseC.h"
00049 #include "tao/Messaging/Messaging.h"
00050 #include "tao/Messaging/Messaging_RT_PolicyC.h"
00051 #else
00052 #include <omniORB4/Naming.hh>
00053 #endif
00054 #include "TaskContextC.h"
00055 #include "TaskContextI.h"
00056 #include "DataFlowI.h"
00057 #include "POAUtility.h"
00058 #include <iostream>
00059 #include <fstream>
00060
00061 #include "../../os/threads.hpp"
00062 #include "../../Activity.hpp"
00063
00064 namespace RTT
00065 {namespace corba
00066 {
00067 using namespace std;
00068
00069 std::map<TaskContext*, TaskContextServer*> TaskContextServer::servers;
00070
00071 base::ActivityInterface* TaskContextServer::orbrunner = 0;
00072
00073 bool TaskContextServer::is_shutdown = false;
00074
00075 std::map<TaskContext*, std::string> TaskContextServer::iors;
00076
00077 TaskContextServer::~TaskContextServer()
00078 {
00079 Logger::In in("~TaskContextServer()");
00080 servers.erase(mtaskcontext);
00081
00082
00083 iors.erase(mtaskcontext);
00084
00085 PortableServer::ObjectId_var oid = mpoa->servant_to_id(mtask_i.in());
00086 mpoa->deactivate_object(oid);
00087
00088 if (muse_naming) {
00089 try {
00090 CORBA::Object_var rootObj = orb->resolve_initial_references("NameService");
00091 CosNaming::NamingContext_var rootNC = CosNaming::NamingContext::_narrow(rootObj.in());
00092
00093 if (CORBA::is_nil( rootNC.in() ) ) {
00094 log(Warning) << "CTaskContext '"<< mtaskcontext->getName() << "' could not find CORBA Naming Service."<<endlog();
00095 } else {
00096
00097 CosNaming::Name name;
00098 name.length(2);
00099 name[0].id = CORBA::string_dup("TaskContexts");
00100 name[1].id = CORBA::string_dup( mtaskcontext->getName().c_str() );
00101 try {
00102 rootNC->unbind(name);
00103 log(Info) << "Successfully removed CTaskContext '"<<mtaskcontext->getName()<<"' from CORBA Naming Service."<<endlog();
00104 }
00105 catch( CosNaming::NamingContext::NotFound ) {
00106 log(Info) << "CTaskContext '"<< mtaskcontext->getName() << "' task was already unbound."<<endlog();
00107 }
00108 catch( ... ) {
00109 log(Warning) << "CTaskContext '"<< mtaskcontext->getName() << "' unbinding failed."<<endlog();
00110 }
00111 }
00112 } catch (...) {
00113 log(Warning) << "CTaskContext '"<< mtaskcontext->getName() << "' unbinding failed from CORBA Naming Service."<<endlog();
00114 }
00115 }
00116 }
00117
00118
00119
00120
00121 TaskContextServer::TaskContextServer(TaskContext* taskc, bool use_naming, bool require_name_service)
00122 : mtaskcontext(taskc), muse_naming(use_naming)
00123 {
00124 Logger::In in("TaskContextServer()");
00125 servers[taskc] = this;
00126 try {
00127
00128
00129 CORBA::Object_var poa_object =
00130 orb->resolve_initial_references ("RootPOA");
00131 mpoa = PortableServer::POA::_narrow(poa_object);
00132 PortableServer::POAManager_var poa_manager =
00133 mpoa->the_POAManager ();
00134
00135
00136
00137
00138
00139
00140
00141
00142
00143
00144
00145
00146
00147 RTT_corba_CTaskContext_i* serv;
00148 mtask_i = serv = new RTT_corba_CTaskContext_i( taskc, mpoa );
00149 mtask = serv->activate_this();
00150
00151
00152 CORBA::String_var ior = orb->object_to_string( mtask.in() );
00153 iors[taskc] = std::string( ior.in() );
00154
00155 if ( use_naming ) {
00156 CORBA::Object_var rootObj;
00157 CosNaming::NamingContext_var rootNC;
00158 try {
00159 rootObj = orb->resolve_initial_references("NameService");
00160 rootNC = CosNaming::NamingContext::_narrow(rootObj);
00161 } catch (...) {}
00162
00163 if (CORBA::is_nil( rootNC ) ) {
00164 std::string err("CTaskContext '" + taskc->getName() + "' could not find CORBA Naming Service.");
00165 if (require_name_service) {
00166 servers.erase(taskc);
00167 log(Error) << err << endlog();
00168 servers.erase(taskc);
00169 throw IllegalServer(err);
00170 }
00171 else
00172 {
00173 log(Warning) << err << endlog();
00174 log() <<"Writing IOR to 'std::cerr' and file '" << taskc->getName() <<".ior'"<<endlog();
00175
00176
00177 CORBA::String_var ior = orb->object_to_string( mtask.in() );
00178 std::cerr << ior.in() <<std::endl;
00179 {
00180
00181 std::string iorname( taskc->getName());
00182 iorname += ".ior";
00183 std::ofstream file_ior( iorname.c_str() );
00184 file_ior << ior.in() <<std::endl;
00185 }
00186 return;
00187 }
00188 }
00189 log(Info) << "CTaskContext '"<< taskc->getName() << "' found CORBA Naming Service."<<endlog();
00190
00191 CosNaming::Name name;
00192 name.length(1);
00193 name[0].id = CORBA::string_dup("TaskContexts");
00194 CosNaming::NamingContext_var controlNC;
00195 try {
00196 controlNC = rootNC->bind_new_context(name);
00197 }
00198 catch( CosNaming::NamingContext::AlreadyBound&) {
00199 log(Debug) << "NamingContext 'TaskContexts' already bound to CORBA Naming Service."<<endlog();
00200
00201 }
00202
00203 name.length(2);
00204 name[1].id = CORBA::string_dup( taskc->getName().c_str() );
00205 try {
00206 rootNC->bind(name, mtask );
00207 log(Info) << "Successfully added CTaskContext '"<<taskc->getName()<<"' to CORBA Naming Service."<<endlog();
00208 }
00209 catch( CosNaming::NamingContext::AlreadyBound&) {
00210 log(Warning) << "CTaskContext '"<< taskc->getName() << "' already bound to CORBA Naming Service."<<endlog();
00211 log() <<"Trying to rebind...";
00212 try {
00213 rootNC->rebind(name, mtask);
00214 } catch( ... ) {
00215 log() << " failed!"<<endlog();
00216 return;
00217 }
00218 log() << " done. New CTaskContext bound to Naming Service."<<endlog();
00219 }
00220 }
00221 else {
00222 log(Info) <<"CTaskContext '"<< taskc->getName() << "' is not using the CORBA Naming Service."<<endlog();
00223 log() <<"Writing IOR to 'std::cerr' and file '" << taskc->getName() <<".ior'"<<endlog();
00224
00225
00226 CORBA::String_var ior = orb->object_to_string( mtask.in() );
00227 std::cerr << ior.in() <<std::endl;
00228 {
00229
00230 std::string iorname( taskc->getName());
00231 iorname += ".ior";
00232 std::ofstream file_ior( iorname.c_str() );
00233 file_ior << ior.in() <<std::endl;
00234 }
00235 return;
00236 }
00237 }
00238 catch (CORBA::Exception &e) {
00239 log(Error) << "CORBA exception raised!" << endlog();
00240 log() << CORBA_EXCEPTION_INFO(e) << endlog();
00241 }
00242
00243 }
00244
00245 void TaskContextServer::CleanupServers() {
00246 if ( !CORBA::is_nil(orb) && !is_shutdown) {
00247 log(Info) << "Cleaning up TaskContextServers..."<<endlog();
00248 while ( !servers.empty() ){
00249 delete servers.begin()->second;
00250
00251 }
00252 CDataFlowInterface_i::clearServants();
00253 log() << "Cleanup done."<<endlog();
00254 }
00255 }
00256
00257 void TaskContextServer::CleanupServer(TaskContext* c) {
00258 if ( !CORBA::is_nil(orb) ) {
00259 ServerMap::iterator it = servers.find(c);
00260 if ( it != servers.end() ){
00261 log(Info) << "Cleaning up TaskContextServer for "<< c->getName()<<endlog();
00262 delete it->second;
00263
00264 }
00265 }
00266 }
00267
00268 void TaskContextServer::ShutdownOrb(bool wait_for_completion)
00269 {
00270 Logger::In in("ShutdownOrb");
00271 DoShutdownOrb(wait_for_completion);
00272 }
00273
00274 void TaskContextServer::DoShutdownOrb(bool wait_for_completion)
00275 {
00276 if (is_shutdown) {
00277 log(Info) << "Orb already down..."<<endlog();
00278 return;
00279 }
00280 if ( CORBA::is_nil(orb) ) {
00281 log(Error) << "Orb Shutdown...failed! Orb is nil." << endlog();
00282 return;
00283 }
00284
00285 try {
00286 CleanupServers();
00287 log(Info) << "Orb Shutdown...";
00288 is_shutdown = true;
00289 if (wait_for_completion)
00290 log(Info)<<"waiting..."<<endlog();
00291 orb->shutdown( wait_for_completion );
00292 log(Info) << "done." << endlog();
00293 }
00294 catch (CORBA::Exception &e) {
00295 log(Error) << "Orb Shutdown...failed! CORBA exception raised." << endlog();
00296 log() << CORBA_EXCEPTION_INFO(e) << endlog();
00297 return;
00298 }
00299 }
00300
00301
00302 void TaskContextServer::RunOrb()
00303 {
00304 if ( CORBA::is_nil(orb) ) {
00305 log(Error) << "RunOrb...failed! Orb is nil." << endlog();
00306 return;
00307 }
00308 try {
00309 log(Info) <<"Entering orb->run()."<<endlog();
00310 orb->run();
00311 log(Info) <<"Breaking out of orb->run()."<<endlog();
00312 }
00313 catch (CORBA::Exception &e) {
00314 log(Error) << "Orb Run : CORBA exception raised!" << endlog();
00315 log() << CORBA_EXCEPTION_INFO(e) << endlog();
00316 }
00317 }
00318
00322 class OrbRunner
00323 : public Activity
00324 {
00325 public:
00326 OrbRunner()
00327 : Activity(RTT::os::LowestPriority)
00328 {}
00329 void loop()
00330 {
00331 Logger::In in("OrbRunner");
00332 TaskContextServer::RunOrb();
00333 }
00334
00335 bool breakLoop()
00336 {
00337 return true;
00338 }
00339
00340 void finalize()
00341 {
00342 Logger::In in("OrbRunner");
00343 log(Info) <<"Safely stopped."<<endlog();
00344 }
00345 };
00346
00347 void TaskContextServer::ThreadOrb()
00348 {
00349 Logger::In in("ThreadOrb");
00350 if ( CORBA::is_nil(orb) ) {
00351 log(Error) << "ThreadOrb...failed! Orb is nil." << endlog();
00352 return;
00353 }
00354 if (orbrunner != 0) {
00355 log(Error) <<"Orb already running in a thread."<<endlog();
00356 } else {
00357 log(Info) <<"Starting Orb in a thread."<<endlog();
00358 orbrunner = new OrbRunner();
00359
00360 orbrunner->start();
00361 }
00362 }
00363
00364 void TaskContextServer::DestroyOrb()
00365 {
00366 Logger::In in("DestroyOrb");
00367 if ( CORBA::is_nil(orb) ) {
00368 log(Error) << "DestroyOrb...failed! Orb is nil." << endlog();
00369 return;
00370 }
00371
00372 if (orbrunner) {
00373 orbrunner->stop();
00374 delete orbrunner;
00375 orbrunner = 0;
00376 }
00377
00378 try {
00379
00380
00381 CleanupServers();
00382 orb->destroy();
00383 rootPOA = 0;
00384 orb = 0;
00385 log(Info) <<"Orb destroyed."<<endlog();
00386 }
00387 catch (CORBA::Exception &e) {
00388 log(Error) << "Orb Destroy : CORBA exception raised!" << endlog();
00389 log() << CORBA_EXCEPTION_INFO(e) << endlog();
00390 }
00391
00392 }
00393
00394 TaskContextServer* TaskContextServer::Create(TaskContext* tc, bool use_naming, bool require_name_service) {
00395 if ( CORBA::is_nil(orb) )
00396 return 0;
00397
00398 if ( servers.count(tc) ) {
00399 log(Debug) << "Returning existing TaskContextServer for "<<tc->getName()<<endlog();
00400 return servers.find(tc)->second;
00401 }
00402
00403
00404 log(Info) << "Creating new TaskContextServer for "<<tc->getName()<<endlog();
00405 try {
00406 TaskContextServer* cts = new TaskContextServer(tc, use_naming, require_name_service);
00407 return cts;
00408 }
00409 catch( IllegalServer& is ) {
00410 cerr << is.what() << endl;
00411 }
00412 return 0;
00413 }
00414
00415 CTaskContext_ptr TaskContextServer::CreateServer(TaskContext* tc, bool use_naming, bool require_name_service) {
00416 if ( CORBA::is_nil(orb) )
00417 return CTaskContext::_nil();
00418
00419 if ( servers.count(tc) ) {
00420 log(Debug) << "Returning existing TaskContextServer for "<<tc->getName()<<endlog();
00421 return CTaskContext::_duplicate( servers.find(tc)->second->server() );
00422 }
00423
00424 for (TaskContextProxy::PMap::iterator it = TaskContextProxy::proxies.begin(); it != TaskContextProxy::proxies.end(); ++it)
00425 if ( (it->first) == tc ) {
00426 log(Debug) << "Returning server of Proxy for "<<tc->getName()<<endlog();
00427 return CTaskContext::_duplicate(it->second);
00428 }
00429
00430
00431 log(Info) << "Creating new TaskContextServer for "<<tc->getName()<<endlog();
00432 try {
00433 TaskContextServer* cts = new TaskContextServer(tc, use_naming, require_name_service);
00434 return CTaskContext::_duplicate( cts->server() );
00435 }
00436 catch( IllegalServer& is ) {
00437 cerr << is.what() << endl;
00438 }
00439 return CTaskContext::_nil();
00440 }
00441
00442
00443 corba::CTaskContext_ptr TaskContextServer::server() const
00444 {
00445
00446 return mtask.in();
00447 }
00448
00449 std::string TaskContextServer::getIOR(TaskContext* tc)
00450 {
00451 IorMap::const_iterator it = iors.find(tc);
00452 if (it != iors.end())
00453 return it->second;
00454
00455 return std::string("");
00456 }
00457
00458 }}