00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040 #include "../../os/MutexLock.hpp"
00041 #include "../../Activity.hpp"
00042 #include "../../base/ChannelElementBase.hpp"
00043 #include "../../Logger.hpp"
00044 #include <map>
00045 #include <sys/select.h>
00046 #include <mqueue.h>
00047
00048 namespace RTT { namespace mqueue { class Dispatcher; } }
00049
00050 namespace RTT {
00051 namespace mqueue {
00052 RTT_API void intrusive_ptr_add_ref(const RTT::mqueue::Dispatcher* p );
00053 RTT_API void intrusive_ptr_release(const RTT::mqueue::Dispatcher* p );
00054
00062 class Dispatcher : public Activity
00063 {
00064 friend void intrusive_ptr_add_ref(const RTT::mqueue::Dispatcher* p );
00065 friend void intrusive_ptr_release(const RTT::mqueue::Dispatcher* p );
00066 mutable os::AtomicInt refcount;
00067 static Dispatcher* DispatchI;
00068
00069 typedef std::map<mqd_t,base::ChannelElementBase*> MQMap;
00070 MQMap mqmap;
00071
00072 fd_set socks;
00073
00074 int highsock;
00075
00076 bool do_exit;
00077
00078 os::Mutex maplock;
00079
00080 Dispatcher( const std::string& name)
00081 : Activity(ORO_SCHED_RT, os::HighestPriority, 0.0, 0, name),
00082 highsock(0), do_exit(false)
00083 {}
00084
00085 ~Dispatcher() {
00086 Logger::In in("Dispatcher");
00087 log(Info) << "Dispacher cleans up: no more work."<<endlog();
00088 stop();
00089 DispatchI = 0;
00090 }
00091
00092 void build_select_list() {
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103 FD_ZERO(&socks);
00104 highsock = 0;
00105
00106
00107
00108 os::MutexLock lock(maplock);
00109 for (MQMap::const_iterator it = mqmap.begin(); it != mqmap.end(); ++it) {
00110 FD_SET( it->first, &socks);
00111 if ( int(it->first) > highsock)
00112 highsock = int(it->first);
00113 }
00114 }
00115
00116 void read_socks() {
00117
00118
00119
00120
00121
00122 os::MutexLock lock(maplock);
00123 for (MQMap::iterator it = mqmap.begin(); it != mqmap.end(); ++it) {
00124 if ( FD_ISSET( it->first, &socks) ) {
00125
00126 it->second->signal();
00127 }
00128 }
00129 }
00130
00131 public:
00132 typedef boost::intrusive_ptr<Dispatcher> shared_ptr;
00133
00134 static Dispatcher::shared_ptr Instance() {
00135 if ( DispatchI == 0) {
00136 DispatchI = new Dispatcher("MQueueDispatch");
00137 DispatchI->start();
00138 }
00139 return DispatchI;
00140 }
00141
00142 void addQueue( mqd_t mqdes, base::ChannelElementBase* chan ) {
00143 Logger::In in("Dispatcher");
00144 if (mqdes < 0) {
00145 log(Error) <<"Invalid mqd_t given to MQueue Dispatcher." <<endlog();
00146 return;
00147 }
00148 log(Debug) <<"Dispatcher is monitoring mqdes "<< mqdes <<endlog();
00149 os::MutexLock lock(maplock);
00150
00151 if (mqmap.count(mqdes) == 0)
00152 refcount.inc();
00153 mqmap[mqdes] = chan;
00154 }
00155
00156 void removeQueue(mqd_t mqdes) {
00157 Logger::In in("Dispatcher");
00158 log(Debug) <<"Dispatcher drops mqdes "<< mqdes <<endlog();
00159 os::MutexLock lock(maplock);
00160 if (mqmap.count(mqdes)) {
00161 mqmap.erase( mqmap.find(mqdes) );
00162 refcount.dec();
00163 }
00164 }
00165
00166 bool initialize() {
00167 do_exit = false;
00168 return true;
00169 }
00170
00171 void loop() {
00172 struct timeval timeout;
00173 int readsocks;
00174 while (1) {
00175 build_select_list();
00176 timeout.tv_sec = 0;
00177 timeout.tv_usec = 50000;
00178
00179
00180
00181
00182 readsocks = select(highsock+1, &socks, (fd_set *) 0,
00183 (fd_set *) 0, &timeout);
00184
00185
00186
00187
00188
00189
00190
00191
00192
00193
00194 if (readsocks < 0) {
00195 log(Error) <<"Dispatcher failed to select on message queues. Stopped thread."<<endlog();
00196 return;
00197 }
00198 if (readsocks == 0) {
00199
00200 } else
00201 read_socks();
00202
00203 if ( do_exit )
00204 return;
00205 }
00206 }
00207
00208 bool breakLoop() {
00209 do_exit = true;
00210 return true;
00211 }
00212 };
00213 }
00214 }
00215