$search
00001 /*************************************************************************** 00002 tag: Peter Soetens Thu Oct 22 11:59:07 CEST 2009 Dispatcher.hpp 00003 00004 Dispatcher.hpp - description 00005 ------------------- 00006 begin : Thu October 22 2009 00007 copyright : (C) 2009 Peter Soetens 00008 email : peter@thesourcworks.com 00009 00010 *************************************************************************** 00011 * This library is free software; you can redistribute it and/or * 00012 * modify it under the terms of the GNU General Public * 00013 * License as published by the Free Software Foundation; * 00014 * version 2 of the License. * 00015 * * 00016 * As a special exception, you may use this file as part of a free * 00017 * software library without restriction. Specifically, if other files * 00018 * instantiate templates or use macros or inline functions from this * 00019 * file, or you compile this file and link it with other files to * 00020 * produce an executable, this file does not by itself cause the * 00021 * resulting executable to be covered by the GNU General Public * 00022 * License. This exception does not however invalidate any other * 00023 * reasons why the executable file might be covered by the GNU General * 00024 * Public License. * 00025 * * 00026 * This library is distributed in the hope that it will be useful, * 00027 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 00028 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * 00029 * Lesser General Public License for more details. * 00030 * * 00031 * You should have received a copy of the GNU General Public * 00032 * License along with this library; if not, write to the Free Software * 00033 * Foundation, Inc., 59 Temple Place, * 00034 * Suite 330, Boston, MA 02111-1307 USA * 00035 * * 00036 ***************************************************************************/ 00037 00038 00039 00040 #include "../../os/MutexLock.hpp" 00041 #include "../../Activity.hpp" 00042 #include "../../base/ChannelElementBase.hpp" 00043 #include "../../Logger.hpp" 00044 #include <map> 00045 #include <sys/select.h> 00046 #include <mqueue.h> 00047 00048 namespace RTT { namespace mqueue { class Dispatcher; } } 00049 00050 namespace RTT { 00051 namespace mqueue { 00052 RTT_API void intrusive_ptr_add_ref(const RTT::mqueue::Dispatcher* p ); 00053 RTT_API void intrusive_ptr_release(const RTT::mqueue::Dispatcher* p ); 00054 00062 class Dispatcher : public Activity 00063 { 00064 friend void intrusive_ptr_add_ref(const RTT::mqueue::Dispatcher* p ); 00065 friend void intrusive_ptr_release(const RTT::mqueue::Dispatcher* p ); 00066 mutable os::AtomicInt refcount; 00067 static Dispatcher* DispatchI; 00068 00069 typedef std::map<mqd_t,base::ChannelElementBase*> MQMap; 00070 MQMap mqmap; 00071 00072 fd_set socks; /* Socket file descriptors we want to wake up for, using select() */ 00073 00074 int highsock; /* Highest #'d file descriptor, needed for select() */ 00075 00076 bool do_exit; 00077 00078 os::Mutex maplock; 00079 00080 Dispatcher( const std::string& name) 00081 : Activity(ORO_SCHED_RT, os::HighestPriority, 0.0, 0, name), 00082 highsock(0), do_exit(false) 00083 {} 00084 00085 ~Dispatcher() { 00086 Logger::In in("Dispatcher"); 00087 log(Info) << "Dispacher cleans up: no more work."<<endlog(); 00088 stop(); 00089 DispatchI = 0; 00090 } 00091 00092 void build_select_list() { 00093 00094 /* First put together fd_set for select(), which will 00095 consist of the sock veriable in case a new connection 00096 is coming in, plus all the sockets we have already 00097 accepted. */ 00098 00099 00100 /* FD_ZERO() clears out the fd_set called socks, so that 00101 it doesn't contain any file descriptors. */ 00102 00103 FD_ZERO(&socks); 00104 highsock = 0; 00105 00106 /* Loops through all the possible connections and adds 00107 those sockets to the fd_set */ 00108 os::MutexLock lock(maplock); 00109 for (MQMap::const_iterator it = mqmap.begin(); it != mqmap.end(); ++it) { 00110 FD_SET( it->first, &socks); 00111 if ( int(it->first) > highsock) 00112 highsock = int(it->first); 00113 } 00114 } 00115 00116 void read_socks() { 00117 /* OK, now socks will be set with whatever socket(s) 00118 are ready for reading.*/ 00119 00120 /* Run through our sockets and check to see if anything 00121 happened with them, if so 'service' them. */ 00122 os::MutexLock lock(maplock); 00123 for (MQMap::iterator it = mqmap.begin(); it != mqmap.end(); ++it) { 00124 if ( FD_ISSET( it->first, &socks) ) { 00125 //log(Debug) << "New data on " << it->first <<endlog(); 00126 it->second->signal(); 00127 } 00128 } 00129 } 00130 00131 public: 00132 typedef boost::intrusive_ptr<Dispatcher> shared_ptr; 00133 00134 static Dispatcher::shared_ptr Instance() { 00135 if ( DispatchI == 0) { 00136 DispatchI = new Dispatcher("MQueueDispatch"); 00137 DispatchI->start(); 00138 } 00139 return DispatchI; 00140 } 00141 00142 void addQueue( mqd_t mqdes, base::ChannelElementBase* chan ) { 00143 Logger::In in("Dispatcher"); 00144 if (mqdes < 0) { 00145 log(Error) <<"Invalid mqd_t given to MQueue Dispatcher." <<endlog(); 00146 return; 00147 } 00148 log(Debug) <<"Dispatcher is monitoring mqdes "<< mqdes <<endlog(); 00149 os::MutexLock lock(maplock); 00150 // we add a refcount per channel we monitor. 00151 if (mqmap.count(mqdes) == 0) 00152 refcount.inc(); 00153 mqmap[mqdes] = chan; 00154 } 00155 00156 void removeQueue(mqd_t mqdes) { 00157 Logger::In in("Dispatcher"); 00158 log(Debug) <<"Dispatcher drops mqdes "<< mqdes <<endlog(); 00159 os::MutexLock lock(maplock); 00160 if (mqmap.count(mqdes)) { 00161 mqmap.erase( mqmap.find(mqdes) ); 00162 refcount.dec(); 00163 } 00164 } 00165 00166 bool initialize() { 00167 do_exit = false; 00168 return true; 00169 } 00170 00171 void loop() { 00172 struct timeval timeout; /* Timeout for select */ 00173 int readsocks; /* Number of sockets ready for reading */ 00174 while (1) { /* select loop */ 00175 build_select_list(); 00176 timeout.tv_sec = 0; 00177 timeout.tv_usec = 50000; 00178 00179 /* The first argument to select is the highest file 00180 descriptor value plus 1.*/ 00181 00182 readsocks = select(highsock+1, &socks, (fd_set *) 0, 00183 (fd_set *) 0, &timeout); 00184 00185 /* select() returns the number of sockets that had 00186 things going on with them -- i.e. they're readable. */ 00187 00188 /* Once select() returns, the original fd_set has been 00189 modified so it now reflects the state of why select() 00190 woke up. i.e. If file descriptor 4 was originally in 00191 the fd_set, and then it became readable, the fd_set 00192 contains file descriptor 4 in it. */ 00193 00194 if (readsocks < 0) { 00195 log(Error) <<"Dispatcher failed to select on message queues. Stopped thread."<<endlog(); 00196 return; 00197 } 00198 if (readsocks == 0) { 00199 // nop 00200 } else // readsocks > 0 00201 read_socks(); 00202 00203 if ( do_exit ) 00204 return; 00205 } /* while(1) */ 00206 } 00207 00208 bool breakLoop() { 00209 do_exit = true; 00210 return true; 00211 } 00212 }; 00213 } 00214 } 00215