$search
00001 /*************************************************************************** 00002 tag: Peter Soetens Wed Jan 18 14:09:49 CET 2006 TaskContextServer.cxx 00003 00004 TaskContextServer.cxx - description 00005 ------------------- 00006 begin : Wed January 18 2006 00007 copyright : (C) 2006 Peter Soetens 00008 email : peter.soetens@fmtc.be 00009 00010 *************************************************************************** 00011 * This library is free software; you can redistribute it and/or * 00012 * modify it under the terms of the GNU General Public * 00013 * License as published by the Free Software Foundation; * 00014 * version 2 of the License. * 00015 * * 00016 * As a special exception, you may use this file as part of a free * 00017 * software library without restriction. Specifically, if other files * 00018 * instantiate templates or use macros or inline functions from this * 00019 * file, or you compile this file and link it with other files to * 00020 * produce an executable, this file does not by itself cause the * 00021 * resulting executable to be covered by the GNU General Public * 00022 * License. This exception does not however invalidate any other * 00023 * reasons why the executable file might be covered by the GNU General * 00024 * Public License. * 00025 * * 00026 * This library is distributed in the hope that it will be useful, * 00027 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 00028 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * 00029 * Lesser General Public License for more details. * 00030 * * 00031 * You should have received a copy of the GNU General Public * 00032 * License along with this library; if not, write to the Free Software * 00033 * Foundation, Inc., 59 Temple Place, * 00034 * Suite 330, Boston, MA 02111-1307 USA * 00035 * * 00036 ***************************************************************************/ 00037 00038 00039 00040 #include "TaskContextServer.hpp" 00041 #include "TaskContextProxy.hpp" 00042 #include "corba.h" 00043 #ifdef CORBA_IS_TAO 00044 #include "TaskContextS.h" 00045 #include <orbsvcs/CosNamingC.h> 00046 // ACE Specific, for printing exceptions. 00047 #include <ace/SString.h> 00048 #include "tao/TimeBaseC.h" 00049 #include "tao/Messaging/Messaging.h" 00050 #include "tao/Messaging/Messaging_RT_PolicyC.h" 00051 #else 00052 #include <omniORB4/Naming.hh> 00053 #endif 00054 #include "TaskContextC.h" 00055 #include "TaskContextI.h" 00056 #include "DataFlowI.h" 00057 #include "POAUtility.h" 00058 #include <iostream> 00059 #include <fstream> 00060 00061 #include "../../os/threads.hpp" 00062 #include "../../Activity.hpp" 00063 00064 namespace RTT 00065 {namespace corba 00066 { 00067 using namespace std; 00068 00069 std::map<TaskContext*, TaskContextServer*> TaskContextServer::servers; 00070 00071 base::ActivityInterface* TaskContextServer::orbrunner = 0; 00072 00073 bool TaskContextServer::is_shutdown = false; 00074 00075 std::map<TaskContext*, std::string> TaskContextServer::iors; 00076 00077 TaskContextServer::~TaskContextServer() 00078 { 00079 Logger::In in("~TaskContextServer()"); 00080 servers.erase(mtaskcontext); 00081 00082 // Remove taskcontext ior reference 00083 iors.erase(mtaskcontext); 00084 00085 PortableServer::ObjectId_var oid = mpoa->servant_to_id(mtask_i.in()); 00086 mpoa->deactivate_object(oid); 00087 00088 if (muse_naming) { 00089 try { 00090 CORBA::Object_var rootObj = orb->resolve_initial_references("NameService"); 00091 CosNaming::NamingContext_var rootNC = CosNaming::NamingContext::_narrow(rootObj.in()); 00092 00093 if (CORBA::is_nil( rootNC.in() ) ) { 00094 log(Warning) << "CTaskContext '"<< mtaskcontext->getName() << "' could not find CORBA Naming Service."<<endlog(); 00095 } else { 00096 // Nameserver found... 00097 CosNaming::Name name; 00098 name.length(2); 00099 name[0].id = CORBA::string_dup("TaskContexts"); 00100 name[1].id = CORBA::string_dup( mtaskcontext->getName().c_str() ); 00101 try { 00102 rootNC->unbind(name); 00103 log(Info) << "Successfully removed CTaskContext '"<<mtaskcontext->getName()<<"' from CORBA Naming Service."<<endlog(); 00104 } 00105 catch( CosNaming::NamingContext::NotFound ) { 00106 log(Info) << "CTaskContext '"<< mtaskcontext->getName() << "' task was already unbound."<<endlog(); 00107 } 00108 catch( ... ) { 00109 log(Warning) << "CTaskContext '"<< mtaskcontext->getName() << "' unbinding failed."<<endlog(); 00110 } 00111 } 00112 } catch (...) { 00113 log(Warning) << "CTaskContext '"<< mtaskcontext->getName() << "' unbinding failed from CORBA Naming Service."<<endlog(); 00114 } 00115 } 00116 } 00117 00118 00119 00120 00121 TaskContextServer::TaskContextServer(TaskContext* taskc, bool use_naming, bool require_name_service) 00122 : mtaskcontext(taskc), muse_naming(use_naming) 00123 { 00124 Logger::In in("TaskContextServer()"); 00125 servers[taskc] = this; 00126 try { 00127 // Each server has its own POA. 00128 // The server's objects have their own poa as well. 00129 CORBA::Object_var poa_object = 00130 orb->resolve_initial_references ("RootPOA"); 00131 mpoa = PortableServer::POA::_narrow(poa_object); 00132 PortableServer::POAManager_var poa_manager = 00133 mpoa->the_POAManager (); 00134 00135 //poa = POAUtility::create_basic_POA( poa, poa_manager, taskc->getName().c_str(), 0, 1); 00136 // poa_manager->activate (); 00137 00138 // TODO : Use a better suited POA than create_basic_POA, use the 'session' or so type 00139 // But watch out: we need implicit activation, our you will get exceptions upon ->_this() 00140 // The POA for the Server's objects: 00141 // PortableServer::POA_var objpoa = POAUtility::create_basic_POA(poa, 00142 // poa_manager, 00143 // std::string(taskc->getName() + "OBJPOA").c_str(), 00144 // 0, 0); // Not persistent, allow implicit. 00145 00146 // The servant : TODO : cleanup servant in destructor ! 00147 RTT_corba_CTaskContext_i* serv; 00148 mtask_i = serv = new RTT_corba_CTaskContext_i( taskc, mpoa ); 00149 mtask = serv->activate_this(); 00150 00151 // Store reference to iors 00152 CORBA::String_var ior = orb->object_to_string( mtask.in() ); 00153 iors[taskc] = std::string( ior.in() ); 00154 00155 if ( use_naming ) { 00156 CORBA::Object_var rootObj; 00157 CosNaming::NamingContext_var rootNC; 00158 try { 00159 rootObj = orb->resolve_initial_references("NameService"); 00160 rootNC = CosNaming::NamingContext::_narrow(rootObj); 00161 } catch (...) {} 00162 00163 if (CORBA::is_nil( rootNC ) ) { 00164 std::string err("CTaskContext '" + taskc->getName() + "' could not find CORBA Naming Service."); 00165 if (require_name_service) { 00166 servers.erase(taskc); 00167 log(Error) << err << endlog(); 00168 servers.erase(taskc); 00169 throw IllegalServer(err); 00170 } 00171 else 00172 { 00173 log(Warning) << err << endlog(); 00174 log() <<"Writing IOR to 'std::cerr' and file '" << taskc->getName() <<".ior'"<<endlog(); 00175 00176 // this part only publishes the IOR to a file. 00177 CORBA::String_var ior = orb->object_to_string( mtask.in() ); 00178 std::cerr << ior.in() <<std::endl; 00179 { 00180 // write to a file as well. 00181 std::string iorname( taskc->getName()); 00182 iorname += ".ior"; 00183 std::ofstream file_ior( iorname.c_str() ); 00184 file_ior << ior.in() <<std::endl; 00185 } 00186 return; 00187 } 00188 } 00189 log(Info) << "CTaskContext '"<< taskc->getName() << "' found CORBA Naming Service."<<endlog(); 00190 // Nameserver found... 00191 CosNaming::Name name; 00192 name.length(1); 00193 name[0].id = CORBA::string_dup("TaskContexts"); 00194 CosNaming::NamingContext_var controlNC; 00195 try { 00196 controlNC = rootNC->bind_new_context(name); 00197 } 00198 catch( CosNaming::NamingContext::AlreadyBound&) { 00199 log(Debug) << "NamingContext 'TaskContexts' already bound to CORBA Naming Service."<<endlog(); 00200 // NOP. 00201 } 00202 00203 name.length(2); 00204 name[1].id = CORBA::string_dup( taskc->getName().c_str() ); 00205 try { 00206 rootNC->bind(name, mtask ); 00207 log(Info) << "Successfully added CTaskContext '"<<taskc->getName()<<"' to CORBA Naming Service."<<endlog(); 00208 } 00209 catch( CosNaming::NamingContext::AlreadyBound&) { 00210 log(Warning) << "CTaskContext '"<< taskc->getName() << "' already bound to CORBA Naming Service."<<endlog(); 00211 log() <<"Trying to rebind..."; 00212 try { 00213 rootNC->rebind(name, mtask); 00214 } catch( ... ) { 00215 log() << " failed!"<<endlog(); 00216 return; 00217 } 00218 log() << " done. New CTaskContext bound to Naming Service."<<endlog(); 00219 } 00220 } // use_naming 00221 else { 00222 log(Info) <<"CTaskContext '"<< taskc->getName() << "' is not using the CORBA Naming Service."<<endlog(); 00223 log() <<"Writing IOR to 'std::cerr' and file '" << taskc->getName() <<".ior'"<<endlog(); 00224 00225 // this part only publishes the IOR to a file. 00226 CORBA::String_var ior = orb->object_to_string( mtask.in() ); 00227 std::cerr << ior.in() <<std::endl; 00228 { 00229 // write to a file as well. 00230 std::string iorname( taskc->getName()); 00231 iorname += ".ior"; 00232 std::ofstream file_ior( iorname.c_str() ); 00233 file_ior << ior.in() <<std::endl; 00234 } 00235 return; 00236 } 00237 } 00238 catch (CORBA::Exception &e) { 00239 log(Error) << "CORBA exception raised!" << endlog(); 00240 log() << CORBA_EXCEPTION_INFO(e) << endlog(); 00241 } 00242 00243 } 00244 00245 void TaskContextServer::CleanupServers() { 00246 if ( !CORBA::is_nil(orb) && !is_shutdown) { 00247 log(Info) << "Cleaning up TaskContextServers..."<<endlog(); 00248 while ( !servers.empty() ){ 00249 delete servers.begin()->second; 00250 // note: destructor will self-erase from map ! 00251 } 00252 CDataFlowInterface_i::clearServants(); 00253 log() << "Cleanup done."<<endlog(); 00254 } 00255 } 00256 00257 void TaskContextServer::CleanupServer(TaskContext* c) { 00258 if ( !CORBA::is_nil(orb) ) { 00259 ServerMap::iterator it = servers.find(c); 00260 if ( it != servers.end() ){ 00261 log(Info) << "Cleaning up TaskContextServer for "<< c->getName()<<endlog(); 00262 delete it->second; // destructor will do the rest. 00263 // note: destructor will self-erase from map ! 00264 } 00265 } 00266 } 00267 00268 void TaskContextServer::ShutdownOrb(bool wait_for_completion) 00269 { 00270 Logger::In in("ShutdownOrb"); 00271 DoShutdownOrb(wait_for_completion); 00272 } 00273 00274 void TaskContextServer::DoShutdownOrb(bool wait_for_completion) 00275 { 00276 if (is_shutdown) { 00277 log(Info) << "Orb already down..."<<endlog(); 00278 return; 00279 } 00280 if ( CORBA::is_nil(orb) ) { 00281 log(Error) << "Orb Shutdown...failed! Orb is nil." << endlog(); 00282 return; 00283 } 00284 00285 try { 00286 CleanupServers(); // can't do this after an orb->shutdown(). 00287 log(Info) << "Orb Shutdown..."; 00288 is_shutdown = true; 00289 if (wait_for_completion) 00290 log(Info)<<"waiting..."<<endlog(); 00291 orb->shutdown( wait_for_completion ); 00292 log(Info) << "done." << endlog(); 00293 } 00294 catch (CORBA::Exception &e) { 00295 log(Error) << "Orb Shutdown...failed! CORBA exception raised." << endlog(); 00296 log() << CORBA_EXCEPTION_INFO(e) << endlog(); 00297 return; 00298 } 00299 } 00300 00301 00302 void TaskContextServer::RunOrb() 00303 { 00304 if ( CORBA::is_nil(orb) ) { 00305 log(Error) << "RunOrb...failed! Orb is nil." << endlog(); 00306 return; 00307 } 00308 try { 00309 log(Info) <<"Entering orb->run()."<<endlog(); 00310 orb->run(); 00311 log(Info) <<"Breaking out of orb->run()."<<endlog(); 00312 } 00313 catch (CORBA::Exception &e) { 00314 log(Error) << "Orb Run : CORBA exception raised!" << endlog(); 00315 log() << CORBA_EXCEPTION_INFO(e) << endlog(); 00316 } 00317 } 00318 00322 class OrbRunner 00323 : public Activity 00324 { 00325 public: 00326 OrbRunner() 00327 : Activity(RTT::os::LowestPriority) 00328 {} 00329 void loop() 00330 { 00331 Logger::In in("OrbRunner"); 00332 TaskContextServer::RunOrb(); 00333 } 00334 00335 bool breakLoop() 00336 { 00337 return true; 00338 } 00339 00340 void finalize() 00341 { 00342 Logger::In in("OrbRunner"); 00343 log(Info) <<"Safely stopped."<<endlog(); 00344 } 00345 }; 00346 00347 void TaskContextServer::ThreadOrb() 00348 { 00349 Logger::In in("ThreadOrb"); 00350 if ( CORBA::is_nil(orb) ) { 00351 log(Error) << "ThreadOrb...failed! Orb is nil." << endlog(); 00352 return; 00353 } 00354 if (orbrunner != 0) { 00355 log(Error) <<"Orb already running in a thread."<<endlog(); 00356 } else { 00357 log(Info) <<"Starting Orb in a thread."<<endlog(); 00358 orbrunner = new OrbRunner(); 00359 00360 orbrunner->start(); 00361 } 00362 } 00363 00364 void TaskContextServer::DestroyOrb() 00365 { 00366 Logger::In in("DestroyOrb"); 00367 if ( CORBA::is_nil(orb) ) { 00368 log(Error) << "DestroyOrb...failed! Orb is nil." << endlog(); 00369 return; 00370 } 00371 00372 if (orbrunner) { 00373 orbrunner->stop(); 00374 delete orbrunner; 00375 orbrunner = 0; 00376 } 00377 00378 try { 00379 // Destroy the POA, waiting until the destruction terminates 00380 //poa->destroy (1, 1); 00381 CleanupServers(); 00382 orb->destroy(); 00383 rootPOA = 0; 00384 orb = 0; 00385 log(Info) <<"Orb destroyed."<<endlog(); 00386 } 00387 catch (CORBA::Exception &e) { 00388 log(Error) << "Orb Destroy : CORBA exception raised!" << endlog(); 00389 log() << CORBA_EXCEPTION_INFO(e) << endlog(); 00390 } 00391 00392 } 00393 00394 TaskContextServer* TaskContextServer::Create(TaskContext* tc, bool use_naming, bool require_name_service) { 00395 if ( CORBA::is_nil(orb) ) 00396 return 0; 00397 00398 if ( servers.count(tc) ) { 00399 log(Debug) << "Returning existing TaskContextServer for "<<tc->getName()<<endlog(); 00400 return servers.find(tc)->second; 00401 } 00402 00403 // create new: 00404 log(Info) << "Creating new TaskContextServer for "<<tc->getName()<<endlog(); 00405 try { 00406 TaskContextServer* cts = new TaskContextServer(tc, use_naming, require_name_service); 00407 return cts; 00408 } 00409 catch( IllegalServer& is ) { 00410 cerr << is.what() << endl; 00411 } 00412 return 0; 00413 } 00414 00415 CTaskContext_ptr TaskContextServer::CreateServer(TaskContext* tc, bool use_naming, bool require_name_service) { 00416 if ( CORBA::is_nil(orb) ) 00417 return CTaskContext::_nil(); 00418 00419 if ( servers.count(tc) ) { 00420 log(Debug) << "Returning existing TaskContextServer for "<<tc->getName()<<endlog(); 00421 return CTaskContext::_duplicate( servers.find(tc)->second->server() ); 00422 } 00423 00424 for (TaskContextProxy::PMap::iterator it = TaskContextProxy::proxies.begin(); it != TaskContextProxy::proxies.end(); ++it) 00425 if ( (it->first) == tc ) { 00426 log(Debug) << "Returning server of Proxy for "<<tc->getName()<<endlog(); 00427 return CTaskContext::_duplicate(it->second); 00428 } 00429 00430 // create new: 00431 log(Info) << "Creating new TaskContextServer for "<<tc->getName()<<endlog(); 00432 try { 00433 TaskContextServer* cts = new TaskContextServer(tc, use_naming, require_name_service); 00434 return CTaskContext::_duplicate( cts->server() ); 00435 } 00436 catch( IllegalServer& is ) { 00437 cerr << is.what() << endl; 00438 } 00439 return CTaskContext::_nil(); 00440 } 00441 00442 00443 corba::CTaskContext_ptr TaskContextServer::server() const 00444 { 00445 // we're not a factory function, so we don't _duplicate. 00446 return mtask.in(); 00447 } 00448 00449 std::string TaskContextServer::getIOR(TaskContext* tc) 00450 { 00451 IorMap::const_iterator it = iors.find(tc); 00452 if (it != iors.end()) 00453 return it->second; 00454 00455 return std::string(""); 00456 } 00457 00458 }}