$search
00001 /*************************************************************************** 00002 tag: Peter Soetens Thu Oct 22 11:59:07 CEST 2009 CorbaDispatcher.hpp 00003 00004 CorbaDispatcher.hpp - description 00005 ------------------- 00006 begin : Thu October 22 2009 00007 copyright : (C) 2009 Peter Soetens 00008 email : peter@thesourcworks.com 00009 00010 *************************************************************************** 00011 * This library is free software; you can redistribute it and/or * 00012 * modify it under the terms of the GNU General Public * 00013 * License as published by the Free Software Foundation; * 00014 * version 2 of the License. * 00015 * * 00016 * As a special exception, you may use this file as part of a free * 00017 * software library without restriction. Specifically, if other files * 00018 * instantiate templates or use macros or inline functions from this * 00019 * file, or you compile this file and link it with other files to * 00020 * produce an executable, this file does not by itself cause the * 00021 * resulting executable to be covered by the GNU General Public * 00022 * License. This exception does not however invalidate any other * 00023 * reasons why the executable file might be covered by the GNU General * 00024 * Public License. * 00025 * * 00026 * This library is distributed in the hope that it will be useful, * 00027 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 00028 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * 00029 * Lesser General Public License for more details. * 00030 * * 00031 * You should have received a copy of the GNU General Public * 00032 * License along with this library; if not, write to the Free Software * 00033 * Foundation, Inc., 59 Temple Place, * 00034 * Suite 330, Boston, MA 02111-1307 USA * 00035 * * 00036 ***************************************************************************/ 00037 00038 00039 #ifndef ORO_CORBA_DISPATCHER_HPP 00040 #define ORO_CORBA_DISPATCHER_HPP 00041 00042 #include "../../os/MutexLock.hpp" 00043 #include "../../Activity.hpp" 00044 #include "../../base/ChannelElementBase.hpp" 00045 #include "../../Logger.hpp" 00046 #include "../../internal/List.hpp" 00047 #include "DataFlowI.h" 00048 #include "../../DataFlowInterface.hpp" 00049 #include "../../TaskContext.hpp" 00050 00051 namespace RTT { 00052 namespace corba { 00057 class CorbaDispatcher : public Activity 00058 { 00059 typedef std::map<DataFlowInterface*,CorbaDispatcher*> DispatchMap; 00060 RTT_CORBA_API static DispatchMap DispatchI; 00061 00062 typedef internal::List<base::ChannelElementBase::shared_ptr> RCList; 00063 RCList RClist; 00064 00065 bool do_exit; 00066 00067 RTT_CORBA_API static os::Mutex* mlock; 00068 00069 RTT_CORBA_API static int defaultScheduler; 00070 RTT_CORBA_API static int defaultPriority; 00071 00072 CorbaDispatcher( const std::string& name) 00073 : Activity(defaultScheduler, defaultPriority, 0.0, 0, name), 00074 RClist(20,2), 00075 do_exit(false) 00076 {} 00077 00078 CorbaDispatcher( const std::string& name, int scheduler, int priority) 00079 : Activity(scheduler, priority, 0.0, 0, name), 00080 RClist(20,2), 00081 do_exit(false) 00082 {} 00083 00084 ~CorbaDispatcher() { 00085 this->stop(); 00086 } 00087 00088 public: 00097 static CorbaDispatcher* Instance(DataFlowInterface* iface, int scheduler = defaultScheduler, int priority = defaultPriority) { 00098 if (!mlock) 00099 mlock = new os::Mutex(); 00100 DispatchMap::iterator result = DispatchI.find(iface); 00101 if ( result == DispatchI.end() ) { 00102 os::MutexLock lock(*mlock); 00103 // re-try to find (avoid race): 00104 result = DispatchI.find(iface); 00105 if ( result != DispatchI.end() ) 00106 return result->second; 00107 // *really* not found, let's create it. 00108 std::string name; 00109 if ( iface == 0 || iface->getOwner() == 0) 00110 name = "Global"; 00111 else 00112 name = iface->getOwner()->getName(); 00113 name += ".CorbaDispatch"; 00114 DispatchI[iface] = new CorbaDispatcher( name, scheduler, priority ); 00115 DispatchI[iface]->start(); 00116 return DispatchI[iface]; 00117 } 00118 return result->second; 00119 } 00120 00125 static void Release(DataFlowInterface* iface) { 00126 DispatchMap::iterator result = DispatchI.find(iface); 00127 if ( result != DispatchI.end() ) { 00128 os::MutexLock lock(*mlock); 00129 delete result->second; 00130 DispatchI.erase(result); 00131 } 00132 if ( DispatchI.empty() ) 00133 delete mlock; 00134 mlock = 0; 00135 } 00136 00140 static void ReleaseAll() { 00141 DispatchMap::iterator result = DispatchI.begin(); 00142 while ( result != DispatchI.end() ) { 00143 delete result->second; 00144 DispatchI.erase(result); 00145 result = DispatchI.begin(); 00146 } 00147 delete mlock; 00148 mlock = 0; 00149 } 00150 00151 static void hasElement(base::ChannelElementBase::shared_ptr c0, base::ChannelElementBase::shared_ptr c1, bool& result) 00152 { 00153 result = result || (c0 == c1); 00154 } 00155 00156 void dispatchChannel( base::ChannelElementBase::shared_ptr chan ) { 00157 bool has_element = false; 00158 RClist.apply(boost::bind(&CorbaDispatcher::hasElement, _1, chan, boost::ref(has_element))); 00159 if (!has_element) 00160 RClist.append( chan ); 00161 this->trigger(); 00162 } 00163 00164 void cancelChannel( base::ChannelElementBase::shared_ptr chan ) { 00165 RClist.erase( chan ); 00166 } 00167 00168 bool initialize() { 00169 log(Info) <<"Started " << this->getName() << "." <<endlog(); 00170 do_exit = false; 00171 return true; 00172 } 00173 00174 void loop() { 00175 while ( !RClist.empty() && !do_exit) { 00176 base::ChannelElementBase::shared_ptr chan = RClist.front(); 00177 CRemoteChannelElement_i* rbase = dynamic_cast<CRemoteChannelElement_i*>(chan.get()); 00178 if (rbase) 00179 rbase->transferSamples(); 00180 RClist.erase( chan ); 00181 } 00182 } 00183 00184 bool breakLoop() { 00185 do_exit = true; 00186 return true; 00187 } 00188 }; 00189 } 00190 } 00191 #endif