TaskContextServer.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: Peter Soetens Wed Jan 18 14:09:49 CET 2006 TaskContextServer.cxx
3 
4  TaskContextServer.cxx - description
5  -------------------
6  begin : Wed January 18 2006
7  copyright : (C) 2006 Peter Soetens
8  email : peter.soetens@fmtc.be
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 
39 
40 #include "TaskContextServer.hpp"
41 #include "TaskContextProxy.hpp"
42 #include "corba.h"
43 #ifdef CORBA_IS_TAO
44 #include "TaskContextS.h"
45 #include <orbsvcs/CosNamingC.h>
46 // ACE Specific, for printing exceptions.
47 #include <ace/SString.h>
48 #include "tao/TimeBaseC.h"
49 #include "tao/Messaging/Messaging.h"
50 #include "tao/Messaging/Messaging_RT_PolicyC.h"
51 #else
52 #include <omniORB4/Naming.hh>
53 #endif
54 #include "TaskContextC.h"
55 #include "TaskContextI.h"
56 #include "DataFlowI.h"
57 #include "POAUtility.h"
58 #include <iostream>
59 #include <fstream>
60 
61 #include "../../os/threads.hpp"
62 #include "../../Activity.hpp"
63 #include "../../types/GlobalsRepository.hpp"
64 
65 namespace RTT
66 {namespace corba
67 {
68  using namespace std;
69 
70  std::map<TaskContext*, TaskContextServer*> TaskContextServer::servers;
71 
72  base::ActivityInterface* TaskContextServer::orbrunner = 0;
73 
74  bool TaskContextServer::is_shutdown = false;
75 
76  std::map<TaskContext*, std::string> TaskContextServer::iors;
77 
79  {
80  Logger::In in("~TaskContextServer()");
81  servers.erase(mtaskcontext);
82 
83  // Remove taskcontext ior reference
84  iors.erase(mtaskcontext);
85 
86  PortableServer::ObjectId_var oid = mpoa->servant_to_id(mtask_i.in());
87  mpoa->deactivate_object(oid);
88 
89  if (muse_naming) {
90  try {
91  CORBA::Object_var rootObj = orb->resolve_initial_references("NameService");
92  CosNaming::NamingContext_var rootNC = CosNaming::NamingContext::_narrow(rootObj.in());
93 
94  if (CORBA::is_nil( rootNC.in() ) ) {
95  log(Warning) << "CTaskContext '"<< mregistered_name << "' could not find CORBA Naming Service."<<endlog();
96  } else {
97  // Nameserver found...
98  CosNaming::Name name;
99  name.length(2);
100  name[0].id = CORBA::string_dup("TaskContexts");
101  name[1].id = CORBA::string_dup( mregistered_name.c_str() );
102  try {
103  rootNC->unbind(name);
104  log(Info) << "Successfully removed CTaskContext '"<< mregistered_name <<"' from CORBA Naming Service."<<endlog();
105  }
106  catch( CosNaming::NamingContext::NotFound ) {
107  log(Info) << "CTaskContext '"<< mregistered_name << "' task was already unbound."<<endlog();
108  }
109  catch( ... ) {
110  log(Warning) << "CTaskContext '"<< mregistered_name << "' unbinding failed."<<endlog();
111  }
112  }
113  } catch (...) {
114  log(Warning) << "CTaskContext '"<< mregistered_name << "' unbinding failed from CORBA Naming Service."<<endlog();
115  }
116  }
117  }
118 
119 
120  void TaskContextServer::initTaskContextServer(bool require_name_service)
121  {
122  Logger::In in("TaskContextServer()");
123  servers[mtaskcontext] = this;
124  try {
125  // Each server has its own POA.
126  // The server's objects have their own poa as well.
127  CORBA::Object_var poa_object =
128  orb->resolve_initial_references ("RootPOA");
129  mpoa = PortableServer::POA::_narrow(poa_object);
130  PortableServer::POAManager_var poa_manager =
131  mpoa->the_POAManager ();
132 
133  //poa = POAUtility::create_basic_POA( poa, poa_manager, taskc->getName().c_str(), 0, 1);
134  // poa_manager->activate ();
135 
136  // TODO : Use a better suited POA than create_basic_POA, use the 'session' or so type
137  // But watch out: we need implicit activation, our you will get exceptions upon ->_this()
138  // The POA for the Server's objects:
139  // PortableServer::POA_var objpoa = POAUtility::create_basic_POA(poa,
140  // poa_manager,
141  // std::string(taskc->getName() + "OBJPOA").c_str(),
142  // 0, 0); // Not persistent, allow implicit.
143 
144  // The servant : TODO : cleanup servant in destructor !
146  mtask_i = serv = new RTT_corba_CTaskContext_i( mtaskcontext, mpoa );
147  mtask = serv->activate_this();
148 
149  // Store reference to iors
150  CORBA::String_var ior = orb->object_to_string( mtask.in() );
151  iors[mtaskcontext] = std::string( ior.in() );
152 
153  if ( muse_naming ) {
154  CORBA::Object_var rootObj;
155  CosNaming::NamingContext_var rootNC;
156  try {
157  rootObj = orb->resolve_initial_references("NameService");
158  rootNC = CosNaming::NamingContext::_narrow(rootObj);
159  } catch (...) {}
160 
161  if (CORBA::is_nil( rootNC ) ) {
162  std::string err("CTaskContext '" + mregistered_name + "' could not find CORBA Naming Service.");
163  if (require_name_service) {
164  log(Error) << err << endlog();
165  servers.erase(mtaskcontext);
166  throw IllegalServer(err);
167  }
168  else
169  {
170  log(Warning) << err << endlog();
171 #ifndef ORO_NO_EMIT_CORBA_IOR
172  log() <<"Writing IOR to 'std::cerr' and file '" << mregistered_name <<".ior'"<<endlog();
173 
174  // this part only publishes the IOR to a file.
175  CORBA::String_var ior = orb->object_to_string( mtask.in() );
176  std::cerr << ior.in() <<std::endl;
177  {
178  // write to a file as well.
179  std::string iorname( mregistered_name );
180  iorname += ".ior";
181  std::ofstream file_ior( iorname.c_str() );
182  file_ior << ior.in() <<std::endl;
183  }
184 #endif
185  return;
186  }
187  }
188  log(Info) << "CTaskContext '"<< mregistered_name << "' found CORBA Naming Service."<<endlog();
189  // Nameserver found...
190  CosNaming::Name name;
191  name.length(1);
192  name[0].id = CORBA::string_dup("TaskContexts");
193  CosNaming::NamingContext_var controlNC;
194  try {
195  controlNC = rootNC->bind_new_context(name);
196  }
197  catch( CosNaming::NamingContext::AlreadyBound&) {
198  log(Debug) << "NamingContext 'TaskContexts' already bound to CORBA Naming Service."<<endlog();
199  // NOP.
200  }
201 
202  name.length(2);
203  name[1].id = CORBA::string_dup( mregistered_name.c_str() );
204  try {
205  rootNC->bind(name, mtask );
206  log(Info) << "Successfully added CTaskContext '"<< mregistered_name <<"' to CORBA Naming Service."<<endlog();
207  }
208  catch( CosNaming::NamingContext::AlreadyBound&) {
209  log(Warning) << "CTaskContext '"<< mregistered_name << "' already bound to CORBA Naming Service."<<endlog();
210  log() <<"Trying to rebind...";
211  try {
212  rootNC->rebind(name, mtask);
213  } catch( ... ) {
214  log() << " failed!"<<endlog();
215  return;
216  }
217  log() << " done. New CTaskContext bound to Naming Service."<<endlog();
218  }
219  } // use_naming
220  else {
221  log(Info) <<"CTaskContext '"<< mregistered_name << "' is not using the CORBA Naming Service."<<endlog();
222 #ifndef ORO_NO_EMIT_CORBA_IOR
223  log() <<"Writing IOR to 'std::cerr' and file '" << mregistered_name <<".ior'"<<endlog();
224 
225  // this part only publishes the IOR to a file.
226  CORBA::String_var ior = orb->object_to_string( mtask.in() );
227  std::cerr << ior.in() <<std::endl;
228  {
229  // write to a file as well.
230  std::string iorname( mregistered_name );
231  iorname += ".ior";
232  std::ofstream file_ior( iorname.c_str() );
233  file_ior << ior.in() <<std::endl;
234  }
235 #endif
236  return;
237  }
238  }
239  catch (CORBA::Exception &e) {
240  log(Error) << "CORBA exception raised!" << endlog();
241  log() << CORBA_EXCEPTION_INFO(e) << endlog();
242  }
243 
244  }
245 
246  TaskContextServer::TaskContextServer(TaskContext* taskc, const string& alias, bool use_naming, bool require_name_service)
247  : mtaskcontext(taskc), muse_naming(use_naming), mregistered_name(alias)
248  {
249  this->initTaskContextServer(require_name_service);
250  }
251 
252 
253  TaskContextServer::TaskContextServer(TaskContext* taskc, bool use_naming, bool require_name_service)
254  : mtaskcontext(taskc), muse_naming(use_naming), mregistered_name(taskc->getName())
255  {
256  this->initTaskContextServer(require_name_service);
257  }
258 
260  if ( !CORBA::is_nil(orb) && !is_shutdown) {
261  log(Info) << "Cleaning up TaskContextServers..."<<endlog();
262  while ( !servers.empty() ){
263  delete servers.begin()->second;
264  // note: destructor will self-erase from map !
265  }
267  log() << "Cleanup done."<<endlog();
268  }
269  }
270 
272  if ( !CORBA::is_nil(orb) ) {
273  ServerMap::iterator it = servers.find(c);
274  if ( it != servers.end() ){
275  log(Info) << "Cleaning up TaskContextServer for "<< c->getName()<<endlog();
277  delete it->second; // destructor will do the rest.
278  // note: destructor will self-erase from map !
279  }
280  }
281  }
282 
283  void TaskContextServer::ShutdownOrb(bool wait_for_completion)
284  {
285  Logger::In in("ShutdownOrb");
286  DoShutdownOrb(wait_for_completion);
287  }
288 
289  void TaskContextServer::DoShutdownOrb(bool wait_for_completion)
290  {
291  if (is_shutdown) {
292  log(Info) << "Orb already down..."<<endlog();
293  return;
294  }
295  if ( CORBA::is_nil(orb) ) {
296  log(Error) << "Orb Shutdown...failed! Orb is nil." << endlog();
297  return;
298  }
299 
300  try {
301  CleanupServers(); // can't do this after an orb->shutdown().
302  log(Info) << "Orb Shutdown...";
303  is_shutdown = true;
304  if (wait_for_completion)
305  log(Info)<<"waiting..."<<endlog();
306  orb->shutdown( wait_for_completion );
307  log(Info) << "done." << endlog();
308  }
309  catch (CORBA::Exception &e) {
310  log(Error) << "Orb Shutdown...failed! CORBA exception raised." << endlog();
311  log() << CORBA_EXCEPTION_INFO(e) << endlog();
312  return;
313  }
314  }
315 
316 
318  {
319  if ( CORBA::is_nil(orb) ) {
320  log(Error) << "RunOrb...failed! Orb is nil." << endlog();
321  return;
322  }
323  try {
324  log(Info) <<"Entering orb->run()."<<endlog();
325  orb->run();
326  log(Info) <<"Breaking out of orb->run()."<<endlog();
327  }
328  catch (CORBA::Exception &e) {
329  log(Error) << "Orb Run : CORBA exception raised!" << endlog();
330  log() << CORBA_EXCEPTION_INFO(e) << endlog();
331  }
332  }
333 
337  class OrbRunner
338  : public Activity
339  {
340  public:
341  OrbRunner(int scheduler, int priority, unsigned cpu_affinity)
342  : Activity(scheduler, priority, 0.0, cpu_affinity, 0, "OrbRunner")
343  {}
344  void loop()
345  {
346  Logger::In in("OrbRunner");
348  }
349 
350  bool breakLoop()
351  {
352  return true;
353  }
354 
355  void finalize()
356  {
357  Logger::In in("OrbRunner");
358  log(Info) <<"Safely stopped."<<endlog();
359  }
360  };
361 
364 
367  RTT::Property<int> cpu_affinity =RTT::Property<int>("","",0);
368 
369  // The temporary Property value is updated if the Property is defined for
370  // the GlobalService. The hard coded default is used otherwise.
371  scheduler.refresh(global_repository->getProperty("OrbRunnerScheduler"));
372 
373  priority.refresh(global_repository->getProperty("OrbRunnerPriority"));
374 
375  cpu_affinity.refresh(global_repository->getProperty("OrbRunnerCpuAffinity"));
376 
377  return ThreadOrb(scheduler, priority, cpu_affinity);
378  }
379  void TaskContextServer::ThreadOrb(int scheduler, int priority, unsigned cpu_affinity)
380  {
381  Logger::In in("ThreadOrb");
382  if ( CORBA::is_nil(orb) ) {
383  log(Error) << "ThreadOrb...failed! Orb is nil." << endlog();
384  return;
385  }
386  if (orbrunner != 0) {
387  log(Error) <<"Orb already running in a thread."<<endlog();
388  } else {
389  log(Info) <<"Starting Orb in a thread."<<endlog();
390  orbrunner = new OrbRunner(scheduler, priority, cpu_affinity);
391  orbrunner->start();
392  }
393  }
394 
396  {
397  Logger::In in("DestroyOrb");
398  if ( CORBA::is_nil(orb) ) {
399  log(Error) << "DestroyOrb...failed! Orb is nil." << endlog();
400  return;
401  }
402 
403  if (orbrunner) {
404  orbrunner->stop();
405  delete orbrunner;
406  orbrunner = 0;
407  }
408 
409  try {
410  // Destroy the POA, waiting until the destruction terminates
411  //poa->destroy (1, 1);
412  CleanupServers();
413  orb->destroy();
414  rootPOA = 0;
415  orb = 0;
416  log(Info) <<"Orb destroyed."<<endlog();
417  }
418  catch (CORBA::Exception &e) {
419  log(Error) << "Orb Destroy : CORBA exception raised!" << endlog();
420  log() << CORBA_EXCEPTION_INFO(e) << endlog();
421  }
422 
423  }
424 
425  TaskContextServer* TaskContextServer::Create(TaskContext* tc, bool use_naming, bool require_name_service){
426  return TaskContextServer::Create(tc, tc->getName(), use_naming, require_name_service);
427  }
428 
429  TaskContextServer* TaskContextServer::Create(TaskContext* tc, const std::string& alias, bool use_naming, bool require_name_service) {
430  if ( CORBA::is_nil(orb) )
431  return 0;
432 
433  if ( servers.count(tc) ) {
434  log(Debug) << "Returning existing TaskContextServer for "<< alias <<endlog();
435  return servers.find(tc)->second;
436  }
437 
438  // create new:
439  log(Info) << "Creating new TaskContextServer for "<< alias <<endlog();
440  try {
441  TaskContextServer* cts = new TaskContextServer(tc, alias, use_naming, require_name_service);
442  return cts;
443  }
444  catch( IllegalServer& is ) {
445  cerr << is.what() << endl;
446  }
447  return 0;
448  }
449 
450  CTaskContext_ptr TaskContextServer::CreateServer(TaskContext* tc, bool use_naming, bool require_name_service) {
451  return TaskContextServer::CreateServer(tc, tc->getName(), use_naming, require_name_service);
452  }
453 
454  CTaskContext_ptr TaskContextServer::CreateServer(TaskContext* tc, const std::string& alias, bool use_naming, bool require_name_service) {
455  if ( CORBA::is_nil(orb) )
456  return CTaskContext::_nil();
457 
458  if ( servers.count(tc) ) {
459  log(Debug) << "Returning existing TaskContextServer for "<< alias <<endlog();
460  return CTaskContext::_duplicate( servers.find(tc)->second->server() );
461  }
462 
463  for (TaskContextProxy::PMap::iterator it = TaskContextProxy::proxies.begin(); it != TaskContextProxy::proxies.end(); ++it)
464  if ( (it->first) == tc ) {
465  log(Debug) << "Returning server of Proxy for "<< alias <<endlog();
466  return CTaskContext::_duplicate(it->second);
467  }
468 
469  // create new:
470  log(Info) << "Creating new TaskContextServer for "<< alias <<endlog();
471  try {
472  TaskContextServer* cts = new TaskContextServer(tc, alias, use_naming, require_name_service);
473  return CTaskContext::_duplicate( cts->server() );
474  }
475  catch( IllegalServer& is ) {
476  cerr << is.what() << endl;
477  }
478  return CTaskContext::_nil();
479  }
480 
481 
482  CTaskContext_ptr TaskContextServer::server() const
483  {
484  // we're not a factory function, so we don't _duplicate.
485  return mtask.in();
486  }
487 
489  {
490  IorMap::const_iterator it = iors.find(tc);
491  if (it != iors.end())
492  return it->second;
493 
494  return std::string("");
495  }
496 
497 }}
virtual bool refresh(const base::PropertyBase *other)
Definition: Property.hpp:314
Service::shared_ptr provides()
Definition: mystd.hpp:163
virtual RTT::corba::CTaskContext_ptr activate_this()
Definition: TaskContextI.h:108
#define CORBA_EXCEPTION_INFO(x)
Definition: corba.h:70
boost::shared_ptr< GlobalsRepository > shared_ptr
static CTaskContext_ptr CreateServer(TaskContext *tc, bool use_naming=true, bool require_name_service=false)
static std::string getIOR(TaskContext *tc)
TaskContextServer(TaskContext *taskcontext, bool use_naming, bool require_name_service)
void initTaskContextServer(bool require_name_service)
OrbRunner(int scheduler, int priority, unsigned cpu_affinity)
static void CleanupServer(TaskContext *tc)
const char * what() const
static base::ActivityInterface * orbrunner
static PortableServer::POA_var rootPOA
static TaskContextServer * Create(TaskContext *tc, bool use_naming=true, bool require_name_service=false)
basic_ostreams & endl(basic_ostreams &s)
Definition: rtstreams.cpp:110
An Activity executes a RunnableInterface object in a (periodic) thread.
Definition: Activity.hpp:70
corba::CTaskContext_var mtask
static void deregisterServant(DataFlowInterface *obj)
Definition: DataFlowI.cpp:87
const int LowestPriority
Definition: ecosthreads.cpp:44
#define ORO_SCHED_RT
Definition: ecos/fosi.h:61
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:53
static void ShutdownOrb(bool wait_for_completion=true)
static Logger & log()
Definition: Logger.hpp:350
static void DoShutdownOrb(bool wait_for_completion=true)
static Logger::LogFunction endlog()
Definition: Logger.hpp:362
CTaskContext_ptr server() const
virtual const std::string & getName() const


rtt
Author(s): RTT Developers
autogenerated on Fri Oct 25 2019 03:59:44