40 #include "../../os/MutexLock.hpp"    41 #include "../../Activity.hpp"    42 #include "../../base/ChannelElementBase.hpp"    43 #include "../../Logger.hpp"    45 #include <sys/select.h>    48 namespace RTT { 
namespace mqueue { 
class Dispatcher; } }
    69             typedef std::map<mqd_t,base::ChannelElementBase*> 
MQMap;
    82               highsock(0), do_exit(false)
   109                 for (MQMap::const_iterator it = mqmap.begin(); it != mqmap.end(); ++it) {
   110                     FD_SET( it->first, &socks);
   111                     if ( 
int(it->first) > highsock)
   112                         highsock = int(it->first);
   123                 for (MQMap::iterator it = mqmap.begin(); it != mqmap.end(); ++it) {
   124                     if ( FD_ISSET( it->first, &socks) ) {
   126                         it->second->signal();
   135                 if ( DispatchI == 0) {
   145                     log(
Error) <<
"Invalid mqd_t given to MQueue Dispatcher." <<
endlog();
   148                 log(
Debug) <<
"Dispatcher is monitoring mqdes "<< mqdes <<
endlog();
   151                 if (mqmap.count(mqdes) == 0)
   160                 if (mqmap.count(mqdes)) {
   161                     mqmap.erase( mqmap.find(mqdes) );
   172                 struct timeval timeout;  
   177                     timeout.tv_usec = 50000;
   182                     readsocks = select(highsock+1, &socks, (fd_set *) 0,
   183                       (fd_set *) 0, &timeout);
   197                             log(
Error) <<
"Dispatcher failed to select on message queues. Stopped thread. error: "<<strerror(errno)<<
endlog();
   201                     else if (readsocks == 0) {
 
Dispatcher(const std::string &name)
void intrusive_ptr_release(const RTT::mqueue::Dispatcher *p)
std::map< mqd_t, base::ChannelElementBase * > MQMap
boost::intrusive_ptr< Dispatcher > shared_ptr
void removeQueue(mqd_t mqdes)
const int HighestPriority
void intrusive_ptr_add_ref(const RTT::mqueue::Dispatcher *p)
An Activity executes a RunnableInterface object in a (periodic) thread. 
void addQueue(mqd_t mqdes, base::ChannelElementBase *chan)
An object oriented wrapper around a non recursive mutex. 
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute. 
static Dispatcher * DispatchI
static Logger::LogFunction endlog()
MutexLock is a scope based Monitor, protecting critical sections with a Mutex object through locking ...
static Dispatcher::shared_ptr Instance()