TaskContextServer.cpp
Go to the documentation of this file.
00001 /***************************************************************************
00002   tag: Peter Soetens  Wed Jan 18 14:09:49 CET 2006  TaskContextServer.cxx
00003 
00004                         TaskContextServer.cxx -  description
00005                            -------------------
00006     begin                : Wed January 18 2006
00007     copyright            : (C) 2006 Peter Soetens
00008     email                : peter.soetens@fmtc.be
00009 
00010  ***************************************************************************
00011  *   This library is free software; you can redistribute it and/or         *
00012  *   modify it under the terms of the GNU General Public                   *
00013  *   License as published by the Free Software Foundation;                 *
00014  *   version 2 of the License.                                             *
00015  *                                                                         *
00016  *   As a special exception, you may use this file as part of a free       *
00017  *   software library without restriction.  Specifically, if other files   *
00018  *   instantiate templates or use macros or inline functions from this     *
00019  *   file, or you compile this file and link it with other files to        *
00020  *   produce an executable, this file does not by itself cause the         *
00021  *   resulting executable to be covered by the GNU General Public          *
00022  *   License.  This exception does not however invalidate any other        *
00023  *   reasons why the executable file might be covered by the GNU General   *
00024  *   Public License.                                                       *
00025  *                                                                         *
00026  *   This library is distributed in the hope that it will be useful,       *
00027  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00028  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU     *
00029  *   Lesser General Public License for more details.                       *
00030  *                                                                         *
00031  *   You should have received a copy of the GNU General Public             *
00032  *   License along with this library; if not, write to the Free Software   *
00033  *   Foundation, Inc., 59 Temple Place,                                    *
00034  *   Suite 330, Boston, MA  02111-1307  USA                                *
00035  *                                                                         *
00036  ***************************************************************************/
00037 
00038 
00039 
00040 #include "TaskContextServer.hpp"
00041 #include "TaskContextProxy.hpp"
00042 #include "corba.h"
00043 #ifdef CORBA_IS_TAO
00044 #include "TaskContextS.h"
00045 #include <orbsvcs/CosNamingC.h>
00046 // ACE Specific, for printing exceptions.
00047 #include <ace/SString.h>
00048 #include "tao/TimeBaseC.h"
00049 #include "tao/Messaging/Messaging.h"
00050 #include "tao/Messaging/Messaging_RT_PolicyC.h"
00051 #else
00052 #include <omniORB4/Naming.hh>
00053 #endif
00054 #include "TaskContextC.h"
00055 #include "TaskContextI.h"
00056 #include "DataFlowI.h"
00057 #include "POAUtility.h"
00058 #include <iostream>
00059 #include <fstream>
00060 
00061 #include "../../os/threads.hpp"
00062 #include "../../Activity.hpp"
00063 
00064 namespace RTT
00065 {namespace corba
00066 {
00067     using namespace std;
00068 
00069     std::map<TaskContext*, TaskContextServer*> TaskContextServer::servers;
00070 
00071     base::ActivityInterface* TaskContextServer::orbrunner = 0;
00072 
00073     bool TaskContextServer::is_shutdown = false;
00074 
00075     std::map<TaskContext*, std::string> TaskContextServer::iors;
00076 
00077   TaskContextServer::~TaskContextServer()
00078   {
00079     Logger::In in("~TaskContextServer()");
00080     servers.erase(mtaskcontext);
00081 
00082     // Remove taskcontext ior reference
00083     iors.erase(mtaskcontext);
00084 
00085     PortableServer::ObjectId_var oid = mpoa->servant_to_id(mtask_i.in());
00086     mpoa->deactivate_object(oid);
00087 
00088     if (muse_naming) {
00089         try {
00090             CORBA::Object_var rootObj = orb->resolve_initial_references("NameService");
00091             CosNaming::NamingContext_var rootNC = CosNaming::NamingContext::_narrow(rootObj.in());
00092 
00093             if (CORBA::is_nil( rootNC.in() ) ) {
00094                 log(Warning) << "CTaskContext '"<< mtaskcontext->getName() << "' could not find CORBA Naming Service."<<endlog();
00095             } else {
00096                 // Nameserver found...
00097                 CosNaming::Name name;
00098                 name.length(2);
00099                 name[0].id = CORBA::string_dup("TaskContexts");
00100                 name[1].id = CORBA::string_dup( mtaskcontext->getName().c_str() );
00101                 try {
00102                     rootNC->unbind(name);
00103                     log(Info) << "Successfully removed CTaskContext '"<<mtaskcontext->getName()<<"' from CORBA Naming Service."<<endlog();
00104                 }
00105                 catch( CosNaming::NamingContext::NotFound ) {
00106                     log(Info) << "CTaskContext '"<< mtaskcontext->getName() << "' task was already unbound."<<endlog();
00107                 }
00108                 catch( ... ) {
00109                     log(Warning) << "CTaskContext '"<< mtaskcontext->getName() << "' unbinding failed."<<endlog();
00110                 }
00111             }
00112         } catch (...) {
00113             log(Warning) << "CTaskContext '"<< mtaskcontext->getName() << "' unbinding failed from CORBA Naming Service."<<endlog();
00114         }
00115     }
00116   }
00117 
00118 
00119 
00120 
00121     TaskContextServer::TaskContextServer(TaskContext* taskc, bool use_naming, bool require_name_service)
00122       : mtaskcontext(taskc), muse_naming(use_naming)
00123     {
00124         Logger::In in("TaskContextServer()");
00125         servers[taskc] = this;
00126         try {
00127             // Each server has its own POA.
00128             // The server's objects have their own poa as well.
00129             CORBA::Object_var poa_object =
00130                 orb->resolve_initial_references ("RootPOA");
00131             mpoa = PortableServer::POA::_narrow(poa_object);
00132             PortableServer::POAManager_var poa_manager =
00133                 mpoa->the_POAManager ();
00134 
00135             //poa = POAUtility::create_basic_POA( poa, poa_manager, taskc->getName().c_str(), 0, 1);
00136             //            poa_manager->activate ();
00137 
00138             // TODO : Use a better suited POA than create_basic_POA, use the 'session' or so type
00139             // But watch out: we need implicit activation, our you will get exceptions upon ->_this()
00140             // The POA for the Server's objects:
00141 //             PortableServer::POA_var objpoa = POAUtility::create_basic_POA(poa,
00142 //                                                               poa_manager,
00143 //                                                               std::string(taskc->getName() + "OBJPOA").c_str(),
00144 //                                                               0, 0); // Not persistent, allow implicit.
00145 
00146             // The servant : TODO : cleanup servant in destructor !
00147             RTT_corba_CTaskContext_i* serv;
00148             mtask_i = serv = new RTT_corba_CTaskContext_i( taskc, mpoa );
00149             mtask   = serv->activate_this();
00150 
00151             // Store reference to iors
00152             CORBA::String_var ior = orb->object_to_string( mtask.in() );
00153             iors[taskc] = std::string( ior.in() );
00154 
00155             if ( use_naming ) {
00156                 CORBA::Object_var rootObj;
00157                 CosNaming::NamingContext_var rootNC;
00158                 try {
00159                     rootObj = orb->resolve_initial_references("NameService");
00160                     rootNC = CosNaming::NamingContext::_narrow(rootObj);
00161                 } catch (...) {}
00162 
00163                 if (CORBA::is_nil( rootNC ) ) {
00164                     std::string  err("CTaskContext '" + taskc->getName() + "' could not find CORBA Naming Service.");
00165                     if (require_name_service) {
00166                         servers.erase(taskc);
00167                         log(Error) << err << endlog();
00168                         servers.erase(taskc);
00169                         throw IllegalServer(err);
00170                     }
00171                     else
00172                     {
00173                         log(Warning) << err << endlog();
00174 #ifndef ORO_NO_EMIT_CORBA_IOR
00175                         log() <<"Writing IOR to 'std::cerr' and file '" << taskc->getName() <<".ior'"<<endlog();
00176 
00177                         // this part only publishes the IOR to a file.
00178                         CORBA::String_var ior = orb->object_to_string( mtask.in() );
00179                         std::cerr << ior.in() <<std::endl;
00180                         {
00181                             // write to a file as well.
00182                             std::string iorname( taskc->getName());
00183                             iorname += ".ior";
00184                             std::ofstream file_ior( iorname.c_str() );
00185                             file_ior << ior.in() <<std::endl;
00186                         }
00187 #endif
00188                         return;
00189                     }
00190                 }
00191                 log(Info) << "CTaskContext '"<< taskc->getName() << "' found CORBA Naming Service."<<endlog();
00192                 // Nameserver found...
00193                 CosNaming::Name name;
00194                 name.length(1);
00195                 name[0].id = CORBA::string_dup("TaskContexts");
00196                 CosNaming::NamingContext_var controlNC;
00197                 try {
00198                     controlNC = rootNC->bind_new_context(name);
00199                 }
00200                 catch( CosNaming::NamingContext::AlreadyBound&) {
00201                     log(Debug) << "NamingContext 'TaskContexts' already bound to CORBA Naming Service."<<endlog();
00202                     // NOP.
00203                 }
00204 
00205                 name.length(2);
00206                 name[1].id = CORBA::string_dup( taskc->getName().c_str() );
00207                 try {
00208                     rootNC->bind(name, mtask );
00209                     log(Info) << "Successfully added CTaskContext '"<<taskc->getName()<<"' to CORBA Naming Service."<<endlog();
00210                 }
00211                 catch( CosNaming::NamingContext::AlreadyBound&) {
00212                     log(Warning) << "CTaskContext '"<< taskc->getName() << "' already bound to CORBA Naming Service."<<endlog();
00213                     log() <<"Trying to rebind...";
00214                     try {
00215                         rootNC->rebind(name, mtask);
00216                     } catch( ... ) {
00217                         log() << " failed!"<<endlog();
00218                         return;
00219                     }
00220                     log() << " done. New CTaskContext bound to Naming Service."<<endlog();
00221                 }
00222             } // use_naming
00223             else {
00224                 log(Info) <<"CTaskContext '"<< taskc->getName() << "' is not using the CORBA Naming Service."<<endlog();
00225 #ifndef ORO_NO_EMIT_CORBA_IOR
00226                 log() <<"Writing IOR to 'std::cerr' and file '" << taskc->getName() <<".ior'"<<endlog();
00227 
00228                 // this part only publishes the IOR to a file.
00229                 CORBA::String_var ior = orb->object_to_string( mtask.in() );
00230                 std::cerr << ior.in() <<std::endl;
00231                 {
00232                     // write to a file as well.
00233                     std::string iorname( taskc->getName());
00234                     iorname += ".ior";
00235                     std::ofstream file_ior( iorname.c_str() );
00236                     file_ior << ior.in() <<std::endl;
00237                 }
00238 #endif
00239                 return;
00240             }
00241         }
00242         catch (CORBA::Exception &e) {
00243             log(Error) << "CORBA exception raised!" << endlog();
00244             log() << CORBA_EXCEPTION_INFO(e) << endlog();
00245         }
00246 
00247     }
00248 
00249     void TaskContextServer::CleanupServers() {
00250         if ( !CORBA::is_nil(orb) && !is_shutdown) {
00251             log(Info) << "Cleaning up TaskContextServers..."<<endlog();
00252             while ( !servers.empty() ){
00253                 delete servers.begin()->second;
00254                 // note: destructor will self-erase from map !
00255             }
00256             CDataFlowInterface_i::clearServants();
00257             log() << "Cleanup done."<<endlog();
00258         }
00259     }
00260 
00261     void TaskContextServer::CleanupServer(TaskContext* c) {
00262         if ( !CORBA::is_nil(orb) ) {
00263             ServerMap::iterator it = servers.find(c);
00264             if ( it != servers.end() ){
00265                 log(Info) << "Cleaning up TaskContextServer for "<< c->getName()<<endlog();
00266                 CDataFlowInterface_i::deregisterServant(c->provides().get());
00267                 delete it->second; // destructor will do the rest.
00268                 // note: destructor will self-erase from map !
00269             }
00270         }
00271     }
00272 
00273     void TaskContextServer::ShutdownOrb(bool wait_for_completion)
00274     {
00275         Logger::In in("ShutdownOrb");
00276         DoShutdownOrb(wait_for_completion);
00277     }
00278 
00279     void TaskContextServer::DoShutdownOrb(bool wait_for_completion)
00280     {
00281         if (is_shutdown) {
00282             log(Info) << "Orb already down..."<<endlog();
00283             return;
00284         }
00285         if ( CORBA::is_nil(orb) ) {
00286             log(Error) << "Orb Shutdown...failed! Orb is nil." << endlog();
00287             return;
00288         }
00289 
00290         try {
00291             CleanupServers(); // can't do this after an orb->shutdown().
00292             log(Info) << "Orb Shutdown...";
00293             is_shutdown = true;
00294             if (wait_for_completion)
00295                 log(Info)<<"waiting..."<<endlog();
00296             orb->shutdown( wait_for_completion );
00297             log(Info) << "done." << endlog();
00298         }
00299         catch (CORBA::Exception &e) {
00300             log(Error) << "Orb Shutdown...failed! CORBA exception raised." << endlog();
00301             log() << CORBA_EXCEPTION_INFO(e) << endlog();
00302             return;
00303         }
00304     }
00305 
00306 
00307     void TaskContextServer::RunOrb()
00308     {
00309         if ( CORBA::is_nil(orb) ) {
00310             log(Error) << "RunOrb...failed! Orb is nil." << endlog();
00311             return;
00312         }
00313         try {
00314             log(Info) <<"Entering orb->run()."<<endlog();
00315             orb->run();
00316             log(Info) <<"Breaking out of orb->run()."<<endlog();
00317         }
00318         catch (CORBA::Exception &e) {
00319             log(Error) << "Orb Run : CORBA exception raised!" << endlog();
00320             log() << CORBA_EXCEPTION_INFO(e) << endlog();
00321         }
00322     }
00323 
00327     class OrbRunner
00328         : public Activity
00329     {
00330     public:
00331         OrbRunner(int scheduler, int priority, unsigned cpu_affinity)
00332             : Activity(scheduler, priority, cpu_affinity)
00333         {}
00334         void loop()
00335         {
00336             Logger::In in("OrbRunner");
00337             TaskContextServer::RunOrb();
00338         }
00339 
00340         bool breakLoop()
00341         {
00342             return true;
00343         }
00344 
00345         void finalize()
00346         {
00347             Logger::In in("OrbRunner");
00348             log(Info) <<"Safely stopped."<<endlog();
00349         }
00350     };
00351 
00352     void TaskContextServer::ThreadOrb() { return ThreadOrb(ORO_SCHED_RT); }
00353     void TaskContextServer::ThreadOrb(int scheduler, int priority, unsigned cpu_affinity)
00354     {
00355         Logger::In in("ThreadOrb");
00356         if ( CORBA::is_nil(orb) ) {
00357             log(Error) << "ThreadOrb...failed! Orb is nil." << endlog();
00358             return;
00359         }
00360         if (orbrunner != 0) {
00361             log(Error) <<"Orb already running in a thread."<<endlog();
00362         } else {
00363             log(Info) <<"Starting Orb in a thread."<<endlog();
00364             orbrunner = new OrbRunner(scheduler, priority, cpu_affinity);
00365             orbrunner->start();
00366         }
00367     }
00368 
00369     void TaskContextServer::DestroyOrb()
00370     {
00371         Logger::In in("DestroyOrb");
00372         if ( CORBA::is_nil(orb) ) {
00373             log(Error) << "DestroyOrb...failed! Orb is nil." << endlog();
00374             return;
00375         }
00376 
00377         if (orbrunner) {
00378             orbrunner->stop();
00379             delete orbrunner;
00380             orbrunner = 0;
00381         }
00382 
00383         try {
00384             // Destroy the POA, waiting until the destruction terminates
00385             //poa->destroy (1, 1);
00386             CleanupServers();
00387             orb->destroy();
00388             rootPOA = 0;
00389             orb = 0;
00390             log(Info) <<"Orb destroyed."<<endlog();
00391         }
00392         catch (CORBA::Exception &e) {
00393             log(Error) << "Orb Destroy : CORBA exception raised!" << endlog();
00394             log() << CORBA_EXCEPTION_INFO(e) << endlog();
00395         }
00396 
00397     }
00398 
00399     TaskContextServer* TaskContextServer::Create(TaskContext* tc, bool use_naming, bool require_name_service) {
00400         if ( CORBA::is_nil(orb) )
00401             return 0;
00402 
00403         if ( servers.count(tc) ) {
00404             log(Debug) << "Returning existing TaskContextServer for "<<tc->getName()<<endlog();
00405             return servers.find(tc)->second;
00406         }
00407 
00408         // create new:
00409         log(Info) << "Creating new TaskContextServer for "<<tc->getName()<<endlog();
00410         try {
00411             TaskContextServer* cts = new TaskContextServer(tc, use_naming, require_name_service);
00412             return cts;
00413         }
00414         catch( IllegalServer& is ) {
00415             cerr << is.what() << endl;
00416         }
00417         return 0;
00418     }
00419 
00420     CTaskContext_ptr TaskContextServer::CreateServer(TaskContext* tc, bool use_naming, bool require_name_service) {
00421         if ( CORBA::is_nil(orb) )
00422             return CTaskContext::_nil();
00423 
00424         if ( servers.count(tc) ) {
00425             log(Debug) << "Returning existing TaskContextServer for "<<tc->getName()<<endlog();
00426             return CTaskContext::_duplicate( servers.find(tc)->second->server() );
00427         }
00428 
00429         for (TaskContextProxy::PMap::iterator it = TaskContextProxy::proxies.begin(); it != TaskContextProxy::proxies.end(); ++it)
00430             if ( (it->first) == tc ) {
00431                 log(Debug) << "Returning server of Proxy for "<<tc->getName()<<endlog();
00432                 return CTaskContext::_duplicate(it->second);
00433             }
00434 
00435         // create new:
00436         log(Info) << "Creating new TaskContextServer for "<<tc->getName()<<endlog();
00437         try {
00438             TaskContextServer* cts = new TaskContextServer(tc, use_naming, require_name_service);
00439             return CTaskContext::_duplicate( cts->server() );
00440         }
00441         catch( IllegalServer& is ) {
00442             cerr << is.what() << endl;
00443         }
00444         return CTaskContext::_nil();
00445     }
00446 
00447 
00448     corba::CTaskContext_ptr TaskContextServer::server() const
00449     {
00450         // we're not a factory function, so we don't _duplicate.
00451         return mtask.in();
00452     }
00453 
00454     std::string TaskContextServer::getIOR(TaskContext* tc)
00455     {
00456         IorMap::const_iterator it = iors.find(tc);
00457         if (it != iors.end())
00458             return it->second;
00459 
00460         return std::string("");
00461     }
00462 
00463 }}


rtt
Author(s): RTT Developers
autogenerated on Sat Jun 8 2019 18:46:32