Go to the documentation of this file.00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 
00033 
00034 
00035 
00036 
00037 
00038 
00039 
#include "../../os/MutexLock.hpp"
#include "../../Activity.hpp"
#include "../../base/ChannelElementBase.hpp"
#include "../../Logger.hpp"
#include <cerrno>
#include <map>
#include <utility>
#include <sys/select.h>
#include <mqueue.h>
00047 
00048 namespace RTT { namespace mqueue { class Dispatcher; } }
00049 
00050 namespace RTT {
00051     namespace mqueue {
00052         RTT_API void intrusive_ptr_add_ref(const RTT::mqueue::Dispatcher* p );
00053         RTT_API void intrusive_ptr_release(const RTT::mqueue::Dispatcher* p );
00054 
00062         class Dispatcher : public Activity
00063         {
00064             friend void intrusive_ptr_add_ref(const RTT::mqueue::Dispatcher* p );
00065             friend void intrusive_ptr_release(const RTT::mqueue::Dispatcher* p );
00066             mutable os::AtomicInt refcount;
00067             static Dispatcher* DispatchI;
00068 
00069             typedef std::map<mqd_t,base::ChannelElementBase*> MQMap;
00070             MQMap mqmap;
00071 
00072             fd_set socks;        
00073 
00074             int highsock;        
00075 
00076             bool do_exit;
00077 
00078             os::Mutex maplock;
00079 
00080             Dispatcher( const std::string& name)
00081             : Activity(ORO_SCHED_RT, os::HighestPriority, 0.0, 0, name),
00082               highsock(0), do_exit(false)
00083               {}
00084 
00085             ~Dispatcher() {
00086                 Logger::In in("Dispatcher");
00087                 log(Info) << "Dispacher cleans up: no more work."<<endlog();
00088                 stop();
00089                 DispatchI = 0;
00090             }
00091 
00092             void build_select_list() {
00093 
00094                 
00095 
00096 
00097 
00098 
00099 
00100                 
00101 
00102 
00103                 FD_ZERO(&socks);
00104                 highsock = 0;
00105 
00106                 
00107 
00108                 os::MutexLock lock(maplock);
00109                 for (MQMap::const_iterator it = mqmap.begin(); it != mqmap.end(); ++it) {
00110                     FD_SET( it->first, &socks);
00111                     if ( int(it->first) > highsock)
00112                         highsock = int(it->first);
00113                 }
00114             }
00115 
00116             void read_socks() {
00117                 
00118 
00119 
00120                 
00121 
00122                 os::MutexLock lock(maplock);
00123                 for (MQMap::iterator it = mqmap.begin(); it != mqmap.end(); ++it) {
00124                     if ( FD_ISSET( it->first, &socks) ) {
00125                         
00126                         it->second->signal();
00127                     }
00128                 }
00129             }
00130 
00131         public:
00132             typedef boost::intrusive_ptr<Dispatcher> shared_ptr;
00133 
00134             static Dispatcher::shared_ptr Instance() {
00135                 if ( DispatchI == 0) {
00136                     DispatchI = new Dispatcher("MQueueDispatch");
00137                     DispatchI->start();
00138                 }
00139                 return DispatchI;
00140             }
00141 
00142             void addQueue( mqd_t mqdes, base::ChannelElementBase* chan ) {
00143                 Logger::In in("Dispatcher");
00144                 if (mqdes < 0) {
00145                     log(Error) <<"Invalid mqd_t given to MQueue Dispatcher." <<endlog();
00146                     return;
00147                 }
00148                 log(Debug) <<"Dispatcher is monitoring mqdes "<< mqdes <<endlog();
00149                 os::MutexLock lock(maplock);
00150                 
00151                 if (mqmap.count(mqdes) == 0)
00152                     refcount.inc();
00153                 mqmap[mqdes] = chan;
00154             }
00155 
00156             void removeQueue(mqd_t mqdes) {
00157                 Logger::In in("Dispatcher");
00158                 log(Debug) <<"Dispatcher drops mqdes "<< mqdes <<endlog();
00159                 os::MutexLock lock(maplock);
00160                 if (mqmap.count(mqdes)) {
00161                     mqmap.erase( mqmap.find(mqdes) );
00162                     refcount.dec();
00163                 }
00164             }
00165 
00166             bool initialize() {
00167                 do_exit = false;
00168                 return true;
00169             }
00170 
00171             void loop() {
00172                 struct timeval timeout;  
00173                 int readsocks;       
00174                 while (1) { 
00175                     build_select_list();
00176                     timeout.tv_sec = 0;
00177                     timeout.tv_usec = 50000;
00178 
00179                     
00180 
00181 
00182                     readsocks = select(highsock+1, &socks, (fd_set *) 0,
00183                       (fd_set *) 0, &timeout);
00184 
00185                     
00186 
00187 
00188                     
00189 
00190 
00191 
00192 
00193 
00194                     if (readsocks < 0) {
00195                         log(Error) <<"Dispatcher failed to select on message queues. Stopped thread."<<endlog();
00196                         return;
00197                     }
00198                     if (readsocks == 0) {
00199                         
00200                     } else 
00201                         read_socks();
00202 
00203                     if ( do_exit )
00204                         return;
00205                 } 
00206             }
00207 
00208             bool breakLoop() {
00209                 do_exit = true;
00210                 return true;
00211             }
00212         };
00213     }
00214 }
00215