CorbaDispatcher.hpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: Peter Soetens Thu Oct 22 11:59:07 CEST 2009 CorbaDispatcher.hpp
3 
4  CorbaDispatcher.hpp - description
5  -------------------
6  begin : Thu October 22 2009
7  copyright : (C) 2009 Peter Soetens
8  email : peter@thesourcworks.com
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 
39 #ifndef ORO_CORBA_DISPATCHER_HPP
40 #define ORO_CORBA_DISPATCHER_HPP
41 
42 #include "../../os/MutexLock.hpp"
43 #include "../../Activity.hpp"
44 #include "../../base/ChannelElementBase.hpp"
45 #include "../../Logger.hpp"
46 #include "../../internal/List.hpp"
47 #include "../../types/GlobalsRepository.hpp"
48 #include "DataFlowI.h"
49 #include "../../DataFlowInterface.hpp"
50 #include "../../TaskContext.hpp"
51 
52 namespace RTT {
53  namespace corba {
58  class CorbaDispatcher : public Activity
59  {
60  typedef std::map<DataFlowInterface*,CorbaDispatcher*> DispatchMap;
61  RTT_CORBA_API static DispatchMap DispatchI;
62 
64  RCList RClist;
65 
66  bool do_exit;
67 
68  RTT_CORBA_API static os::Mutex* mlock;
69 
70  RTT_CORBA_API static int defaultScheduler;
71  RTT_CORBA_API static int defaultPriority;
72  RTT_CORBA_API static int defaultCpuAffinity;
73 
74  CorbaDispatcher( const std::string& name)
75  : Activity(defaultScheduler, defaultPriority, 0.0, 0, name),
76  RClist(20,2),
77  do_exit(false)
78  {}
79 
80  CorbaDispatcher( const std::string& name, int scheduler, int priority, unsigned cpu_affinity)
81  : Activity(scheduler, priority, 0.0, cpu_affinity, 0, name),
82  RClist(20,2),
83  do_exit(false)
84  {}
85 
87  this->stop();
88  }
89 
90  public:
100  if (!mlock)
101  mlock = new os::Mutex();
102  DispatchMap::iterator result = DispatchI.find(iface);
103  if ( result == DispatchI.end() ) {
104  os::MutexLock lock(*mlock);
105  // re-try to find (avoid race):
106  result = DispatchI.find(iface);
107  if ( result != DispatchI.end() )
108  return result->second;
109  // *really* not found, let's create it.
110  std::string name;
111  TaskContext* owner = (iface != 0 ? iface->getOwner() : 0);
112  if ( !owner )
113  name = "Global";
114  else
115  name = owner->getName();
116  name += "Corba";
117 
118  // The properties to create the CorbaDispatcher are retrieved.
119  // When the CorbaDispatcher is created these properties can't be changed anymore,
120  // so they are converted to Constants.
122  // The hard coded default is used if the property isn't set for the Component
123  // that owns the Dispatcher and for the GlobalService.
127 
128  // If the Property is defined for the Component or for the GlobalService,
129  // the temporary Property values is updated.
130  if ( owner ) {
131  scheduler.refresh(owner->getProperty("CorbaDispatcherScheduler")) ||
132  scheduler.refresh(global_repository->getProperty("CorbaDispatcherScheduler"));
133 
134  priority.refresh(owner->getProperty("CorbaDispatcherPriority")) ||
135  priority.refresh(global_repository->getProperty("CorbaDispatcherPriority"));
136 
137  cpu_affinity.refresh(owner->getProperty("CorbaDispatcherCpuAffinity")) ||
138  cpu_affinity.refresh(global_repository->getProperty("CorbaDispatcherCpuAffinity"));
139  } else {
140  scheduler.refresh(global_repository->getProperty("CorbaDispatcherScheduler"));
141  priority.refresh(global_repository->getProperty("CorbaDispatcherPriority"));
142  cpu_affinity.refresh(global_repository->getProperty("CorbaDispatcherCpuAffinity"));
143  }
144 
145  DispatchI[iface] = new CorbaDispatcher( name, scheduler, priority, cpu_affinity );
146  DispatchI[iface]->start();
147  return DispatchI[iface];
148  }
149  return result->second;
150  }
151 
156  static void Release(DataFlowInterface* iface) {
157  DispatchMap::iterator result = DispatchI.find(iface);
158  if ( result != DispatchI.end() ) {
159  os::MutexLock lock(*mlock);
160  delete result->second;
161  DispatchI.erase(result);
162  }
163  if ( DispatchI.empty() )
164  delete mlock;
165  mlock = 0;
166  }
167 
171  static void ReleaseAll() {
172  DispatchMap::iterator result = DispatchI.begin();
173  while ( result != DispatchI.end() ) {
174  delete result->second;
175  DispatchI.erase(result);
176  result = DispatchI.begin();
177  }
178  delete mlock;
179  mlock = 0;
180  }
181 
183  {
184  result = result || (c0 == c1);
185  }
186 
188  bool has_element = false;
189  RClist.apply(boost::bind(&CorbaDispatcher::hasElement, _1, chan, boost::ref(has_element)));
190  if (!has_element)
191  RClist.append( chan );
192  this->trigger();
193  }
194 
196  RClist.erase( chan );
197  }
198 
199  bool initialize() {
200  log(Info) <<"Started " << this->getName() << "." <<endlog();
201  do_exit = false;
202  return true;
203  }
204 
205  void loop() {
206  while ( !RClist.empty() && !do_exit) {
208  CRemoteChannelElement_i* rbase = dynamic_cast<CRemoteChannelElement_i*>(chan.get());
209  if (rbase)
210  rbase->transferSamples();
211  RClist.erase( chan );
212  }
213  }
214 
215  bool breakLoop() {
216  do_exit = true;
217  return true;
218  }
219  };
220  }
221 }
222 #endif
static RTT_CORBA_API os::Mutex * mlock
CorbaDispatcher(const std::string &name)
virtual bool refresh(const base::PropertyBase *other)
Definition: Property.hpp:314
virtual bool stop()
Definition: Activity.cpp:280
CorbaDispatcher(const std::string &name, int scheduler, int priority, unsigned cpu_affinity)
boost::shared_ptr< GlobalsRepository > shared_ptr
void cancelChannel(base::ChannelElementBase::shared_ptr chan)
static CorbaDispatcher * Instance(DataFlowInterface *iface)
static void hasElement(base::ChannelElementBase::shared_ptr c0, base::ChannelElementBase::shared_ptr c1, bool &result)
std::map< DataFlowInterface *, CorbaDispatcher * > DispatchMap
void apply(Function func)
void dispatchChannel(base::ChannelElementBase::shared_ptr chan)
static RTT_CORBA_API int defaultPriority
boost::intrusive_ptr< ChannelElementBase > shared_ptr
An Activity executes a RunnableInterface object in a (periodic) thread.
Definition: Activity.hpp:70
virtual bool trigger()
Definition: Activity.cpp:147
static RTT_CORBA_API int defaultCpuAffinity
An object oriented wrapper around a non recursive mutex.
Definition: Mutex.hpp:92
internal::List< base::ChannelElementBase::shared_ptr > RCList
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:53
static void Release(DataFlowInterface *iface)
static Logger & log()
Definition: Logger.hpp:350
static Logger::LogFunction endlog()
Definition: Logger.hpp:362
bool append(value_t item)
bool erase(value_t item)
static RTT_CORBA_API DispatchMap DispatchI
virtual const char * getName() const
Definition: Thread.cpp:645
virtual const std::string & getName() const
static RTT_CORBA_API int defaultScheduler
TaskContext * getOwner() const
MutexLock is a scope based Monitor, protecting critical sections with a Mutex object through locking ...
Definition: MutexLock.hpp:51


rtt
Author(s): RTT Developers
autogenerated on Tue Jun 25 2019 19:33:23