Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040 #include "TaskContextServer.hpp"
00041 #include "TaskContextProxy.hpp"
00042 #include "corba.h"
00043 #ifdef CORBA_IS_TAO
00044 #include "TaskContextS.h"
00045 #include <orbsvcs/CosNamingC.h>
00046
00047 #include <ace/SString.h>
00048 #include "tao/TimeBaseC.h"
00049 #include "tao/Messaging/Messaging.h"
00050 #include "tao/Messaging/Messaging_RT_PolicyC.h"
00051 #else
00052 #include <omniORB4/Naming.hh>
00053 #endif
00054 #include "TaskContextC.h"
00055 #include "TaskContextI.h"
00056 #include "DataFlowI.h"
00057 #include "POAUtility.h"
00058 #include <iostream>
00059 #include <fstream>
00060
00061 #include "../../os/threads.hpp"
00062 #include "../../Activity.hpp"
00063
00064 namespace RTT
00065 {namespace corba
00066 {
00067 using namespace std;
00068
// Registry of all live servers, keyed by the TaskContext they serve.
// The constructor inserts 'this' and the destructor erases its own entry.
00069 std::map<TaskContext*, TaskContextServer*> TaskContextServer::servers;
00070
// Activity that runs the ORB in its own thread; created by ThreadOrb()
// and stopped/deleted by DestroyOrb(). Null while no such thread exists.
00071 base::ActivityInterface* TaskContextServer::orbrunner = 0;
00072
// Set to true by DoShutdownOrb(); CleanupServers() and DoShutdownOrb()
// become no-ops once it is set.
00073 bool TaskContextServer::is_shutdown = false;
00074
// Stringified IOR of each served TaskContext; filled by the constructor,
// erased by the destructor and read back through getIOR().
00075 std::map<TaskContext*, std::string> TaskContextServer::iors;
00076
00077 TaskContextServer::~TaskContextServer()
00078 {
00079 Logger::In in("~TaskContextServer()");
00080 servers.erase(mtaskcontext);
00081
00082
00083 iors.erase(mtaskcontext);
00084
00085 PortableServer::ObjectId_var oid = mpoa->servant_to_id(mtask_i.in());
00086 mpoa->deactivate_object(oid);
00087
00088 if (muse_naming) {
00089 try {
00090 CORBA::Object_var rootObj = orb->resolve_initial_references("NameService");
00091 CosNaming::NamingContext_var rootNC = CosNaming::NamingContext::_narrow(rootObj.in());
00092
00093 if (CORBA::is_nil( rootNC.in() ) ) {
00094 log(Warning) << "CTaskContext '"<< mtaskcontext->getName() << "' could not find CORBA Naming Service."<<endlog();
00095 } else {
00096
00097 CosNaming::Name name;
00098 name.length(2);
00099 name[0].id = CORBA::string_dup("TaskContexts");
00100 name[1].id = CORBA::string_dup( mtaskcontext->getName().c_str() );
00101 try {
00102 rootNC->unbind(name);
00103 log(Info) << "Successfully removed CTaskContext '"<<mtaskcontext->getName()<<"' from CORBA Naming Service."<<endlog();
00104 }
00105 catch( CosNaming::NamingContext::NotFound ) {
00106 log(Info) << "CTaskContext '"<< mtaskcontext->getName() << "' task was already unbound."<<endlog();
00107 }
00108 catch( ... ) {
00109 log(Warning) << "CTaskContext '"<< mtaskcontext->getName() << "' unbinding failed."<<endlog();
00110 }
00111 }
00112 } catch (...) {
00113 log(Warning) << "CTaskContext '"<< mtaskcontext->getName() << "' unbinding failed from CORBA Naming Service."<<endlog();
00114 }
00115 }
00116 }
00117
00118
00119
00120
00121 TaskContextServer::TaskContextServer(TaskContext* taskc, bool use_naming, bool require_name_service)
00122 : mtaskcontext(taskc), muse_naming(use_naming)
00123 {
00124 Logger::In in("TaskContextServer()");
00125 servers[taskc] = this;
00126 try {
00127
00128
00129 CORBA::Object_var poa_object =
00130 orb->resolve_initial_references ("RootPOA");
00131 mpoa = PortableServer::POA::_narrow(poa_object);
00132 PortableServer::POAManager_var poa_manager =
00133 mpoa->the_POAManager ();
00134
00135
00136
00137
00138
00139
00140
00141
00142
00143
00144
00145
00146
00147 RTT_corba_CTaskContext_i* serv;
00148 mtask_i = serv = new RTT_corba_CTaskContext_i( taskc, mpoa );
00149 mtask = serv->activate_this();
00150
00151
00152 CORBA::String_var ior = orb->object_to_string( mtask.in() );
00153 iors[taskc] = std::string( ior.in() );
00154
00155 if ( use_naming ) {
00156 CORBA::Object_var rootObj;
00157 CosNaming::NamingContext_var rootNC;
00158 try {
00159 rootObj = orb->resolve_initial_references("NameService");
00160 rootNC = CosNaming::NamingContext::_narrow(rootObj);
00161 } catch (...) {}
00162
00163 if (CORBA::is_nil( rootNC ) ) {
00164 std::string err("CTaskContext '" + taskc->getName() + "' could not find CORBA Naming Service.");
00165 if (require_name_service) {
00166 servers.erase(taskc);
00167 log(Error) << err << endlog();
00168 servers.erase(taskc);
00169 throw IllegalServer(err);
00170 }
00171 else
00172 {
00173 log(Warning) << err << endlog();
00174 log() <<"Writing IOR to 'std::cerr' and file '" << taskc->getName() <<".ior'"<<endlog();
00175
00176
00177 CORBA::String_var ior = orb->object_to_string( mtask.in() );
00178 std::cerr << ior.in() <<std::endl;
00179 {
00180
00181 std::string iorname( taskc->getName());
00182 iorname += ".ior";
00183 std::ofstream file_ior( iorname.c_str() );
00184 file_ior << ior.in() <<std::endl;
00185 }
00186 return;
00187 }
00188 }
00189 log(Info) << "CTaskContext '"<< taskc->getName() << "' found CORBA Naming Service."<<endlog();
00190
00191 CosNaming::Name name;
00192 name.length(1);
00193 name[0].id = CORBA::string_dup("TaskContexts");
00194 CosNaming::NamingContext_var controlNC;
00195 try {
00196 controlNC = rootNC->bind_new_context(name);
00197 }
00198 catch( CosNaming::NamingContext::AlreadyBound&) {
00199 log(Debug) << "NamingContext 'TaskContexts' already bound to CORBA Naming Service."<<endlog();
00200
00201 }
00202
00203 name.length(2);
00204 name[1].id = CORBA::string_dup( taskc->getName().c_str() );
00205 try {
00206 rootNC->bind(name, mtask );
00207 log(Info) << "Successfully added CTaskContext '"<<taskc->getName()<<"' to CORBA Naming Service."<<endlog();
00208 }
00209 catch( CosNaming::NamingContext::AlreadyBound&) {
00210 log(Warning) << "CTaskContext '"<< taskc->getName() << "' already bound to CORBA Naming Service."<<endlog();
00211 log() <<"Trying to rebind...";
00212 try {
00213 rootNC->rebind(name, mtask);
00214 } catch( ... ) {
00215 log() << " failed!"<<endlog();
00216 return;
00217 }
00218 log() << " done. New CTaskContext bound to Naming Service."<<endlog();
00219 }
00220 }
00221 else {
00222 log(Info) <<"CTaskContext '"<< taskc->getName() << "' is not using the CORBA Naming Service."<<endlog();
00223 log() <<"Writing IOR to 'std::cerr' and file '" << taskc->getName() <<".ior'"<<endlog();
00224
00225
00226 CORBA::String_var ior = orb->object_to_string( mtask.in() );
00227 std::cerr << ior.in() <<std::endl;
00228 {
00229
00230 std::string iorname( taskc->getName());
00231 iorname += ".ior";
00232 std::ofstream file_ior( iorname.c_str() );
00233 file_ior << ior.in() <<std::endl;
00234 }
00235 return;
00236 }
00237 }
00238 catch (CORBA::Exception &e) {
00239 log(Error) << "CORBA exception raised!" << endlog();
00240 log() << CORBA_EXCEPTION_INFO(e) << endlog();
00241 }
00242
00243 }
00244
00245 void TaskContextServer::CleanupServers() {
00246 if ( !CORBA::is_nil(orb) && !is_shutdown) {
00247 log(Info) << "Cleaning up TaskContextServers..."<<endlog();
00248 while ( !servers.empty() ){
00249 delete servers.begin()->second;
00250
00251 }
00252 CDataFlowInterface_i::clearServants();
00253 log() << "Cleanup done."<<endlog();
00254 }
00255 }
00256
00257 void TaskContextServer::CleanupServer(TaskContext* c) {
00258 if ( !CORBA::is_nil(orb) ) {
00259 ServerMap::iterator it = servers.find(c);
00260 if ( it != servers.end() ){
00261 log(Info) << "Cleaning up TaskContextServer for "<< c->getName()<<endlog();
00262 CDataFlowInterface_i::deregisterServant(c->provides().get());
00263 delete it->second;
00264
00265 }
00266 }
00267 }
00268
00269 void TaskContextServer::ShutdownOrb(bool wait_for_completion)
00270 {
00271 Logger::In in("ShutdownOrb");
00272 DoShutdownOrb(wait_for_completion);
00273 }
00274
00275 void TaskContextServer::DoShutdownOrb(bool wait_for_completion)
00276 {
00277 if (is_shutdown) {
00278 log(Info) << "Orb already down..."<<endlog();
00279 return;
00280 }
00281 if ( CORBA::is_nil(orb) ) {
00282 log(Error) << "Orb Shutdown...failed! Orb is nil." << endlog();
00283 return;
00284 }
00285
00286 try {
00287 CleanupServers();
00288 log(Info) << "Orb Shutdown...";
00289 is_shutdown = true;
00290 if (wait_for_completion)
00291 log(Info)<<"waiting..."<<endlog();
00292 orb->shutdown( wait_for_completion );
00293 log(Info) << "done." << endlog();
00294 }
00295 catch (CORBA::Exception &e) {
00296 log(Error) << "Orb Shutdown...failed! CORBA exception raised." << endlog();
00297 log() << CORBA_EXCEPTION_INFO(e) << endlog();
00298 return;
00299 }
00300 }
00301
00302
00303 void TaskContextServer::RunOrb()
00304 {
00305 if ( CORBA::is_nil(orb) ) {
00306 log(Error) << "RunOrb...failed! Orb is nil." << endlog();
00307 return;
00308 }
00309 try {
00310 log(Info) <<"Entering orb->run()."<<endlog();
00311 orb->run();
00312 log(Info) <<"Breaking out of orb->run()."<<endlog();
00313 }
00314 catch (CORBA::Exception &e) {
00315 log(Error) << "Orb Run : CORBA exception raised!" << endlog();
00316 log() << CORBA_EXCEPTION_INFO(e) << endlog();
00317 }
00318 }
00319
00323 class OrbRunner
00324 : public Activity
00325 {
00326 public:
00327 OrbRunner(int scheduler, int priority, unsigned cpu_affinity)
00328 : Activity(scheduler, priority, cpu_affinity)
00329 {}
00330 void loop()
00331 {
00332 Logger::In in("OrbRunner");
00333 TaskContextServer::RunOrb();
00334 }
00335
00336 bool breakLoop()
00337 {
00338 return true;
00339 }
00340
00341 void finalize()
00342 {
00343 Logger::In in("OrbRunner");
00344 log(Info) <<"Safely stopped."<<endlog();
00345 }
00346 };
00347
00348 void TaskContextServer::ThreadOrb() { return ThreadOrb(ORO_SCHED_RT); }
00349 void TaskContextServer::ThreadOrb(int scheduler, int priority, unsigned cpu_affinity)
00350 {
00351 Logger::In in("ThreadOrb");
00352 if ( CORBA::is_nil(orb) ) {
00353 log(Error) << "ThreadOrb...failed! Orb is nil." << endlog();
00354 return;
00355 }
00356 if (orbrunner != 0) {
00357 log(Error) <<"Orb already running in a thread."<<endlog();
00358 } else {
00359 log(Info) <<"Starting Orb in a thread."<<endlog();
00360 orbrunner = new OrbRunner(scheduler, priority, cpu_affinity);
00361 orbrunner->start();
00362 }
00363 }
00364
00365 void TaskContextServer::DestroyOrb()
00366 {
00367 Logger::In in("DestroyOrb");
00368 if ( CORBA::is_nil(orb) ) {
00369 log(Error) << "DestroyOrb...failed! Orb is nil." << endlog();
00370 return;
00371 }
00372
00373 if (orbrunner) {
00374 orbrunner->stop();
00375 delete orbrunner;
00376 orbrunner = 0;
00377 }
00378
00379 try {
00380
00381
00382 CleanupServers();
00383 orb->destroy();
00384 rootPOA = 0;
00385 orb = 0;
00386 log(Info) <<"Orb destroyed."<<endlog();
00387 }
00388 catch (CORBA::Exception &e) {
00389 log(Error) << "Orb Destroy : CORBA exception raised!" << endlog();
00390 log() << CORBA_EXCEPTION_INFO(e) << endlog();
00391 }
00392
00393 }
00394
00395 TaskContextServer* TaskContextServer::Create(TaskContext* tc, bool use_naming, bool require_name_service) {
00396 if ( CORBA::is_nil(orb) )
00397 return 0;
00398
00399 if ( servers.count(tc) ) {
00400 log(Debug) << "Returning existing TaskContextServer for "<<tc->getName()<<endlog();
00401 return servers.find(tc)->second;
00402 }
00403
00404
00405 log(Info) << "Creating new TaskContextServer for "<<tc->getName()<<endlog();
00406 try {
00407 TaskContextServer* cts = new TaskContextServer(tc, use_naming, require_name_service);
00408 return cts;
00409 }
00410 catch( IllegalServer& is ) {
00411 cerr << is.what() << endl;
00412 }
00413 return 0;
00414 }
00415
00416 CTaskContext_ptr TaskContextServer::CreateServer(TaskContext* tc, bool use_naming, bool require_name_service) {
00417 if ( CORBA::is_nil(orb) )
00418 return CTaskContext::_nil();
00419
00420 if ( servers.count(tc) ) {
00421 log(Debug) << "Returning existing TaskContextServer for "<<tc->getName()<<endlog();
00422 return CTaskContext::_duplicate( servers.find(tc)->second->server() );
00423 }
00424
00425 for (TaskContextProxy::PMap::iterator it = TaskContextProxy::proxies.begin(); it != TaskContextProxy::proxies.end(); ++it)
00426 if ( (it->first) == tc ) {
00427 log(Debug) << "Returning server of Proxy for "<<tc->getName()<<endlog();
00428 return CTaskContext::_duplicate(it->second);
00429 }
00430
00431
00432 log(Info) << "Creating new TaskContextServer for "<<tc->getName()<<endlog();
00433 try {
00434 TaskContextServer* cts = new TaskContextServer(tc, use_naming, require_name_service);
00435 return CTaskContext::_duplicate( cts->server() );
00436 }
00437 catch( IllegalServer& is ) {
00438 cerr << is.what() << endl;
00439 }
00440 return CTaskContext::_nil();
00441 }
00442
00443
// Returns the CORBA object reference held in 'mtask' via in(); no
// _duplicate() is done here. CreateServer() duplicates the result
// before handing it to its caller.
00444 corba::CTaskContext_ptr TaskContextServer::server() const
00445 {
00446
00447 return mtask.in();
00448 }
00449
00450 std::string TaskContextServer::getIOR(TaskContext* tc)
00451 {
00452 IorMap::const_iterator it = iors.find(tc);
00453 if (it != iors.end())
00454 return it->second;
00455
00456 return std::string("");
00457 }
00458
00459 }}