40 #include "../../os/MutexLock.hpp" 41 #include "../../Activity.hpp" 42 #include "../../base/ChannelElementBase.hpp" 43 #include "../../Logger.hpp" 45 #include <sys/select.h> 48 namespace RTT {
namespace mqueue {
class Dispatcher; } }
69 typedef std::map<mqd_t,base::ChannelElementBase*>
MQMap;
82 highsock(0), do_exit(false)
109 for (MQMap::const_iterator it = mqmap.begin(); it != mqmap.end(); ++it) {
110 FD_SET( it->first, &socks);
111 if (
int(it->first) > highsock)
112 highsock = int(it->first);
123 for (MQMap::iterator it = mqmap.begin(); it != mqmap.end(); ++it) {
124 if ( FD_ISSET( it->first, &socks) ) {
126 it->second->signal();
135 if ( DispatchI == 0) {
145 log(
Error) <<
"Invalid mqd_t given to MQueue Dispatcher." <<
endlog();
148 log(
Debug) <<
"Dispatcher is monitoring mqdes "<< mqdes <<
endlog();
151 if (mqmap.count(mqdes) == 0)
160 if (mqmap.count(mqdes)) {
161 mqmap.erase( mqmap.find(mqdes) );
172 struct timeval timeout;
177 timeout.tv_usec = 50000;
182 readsocks = select(highsock+1, &socks, (fd_set *) 0,
183 (fd_set *) 0, &timeout);
197 log(
Error) <<
"Dispatcher failed to select on message queues. Stopped thread. error: "<<strerror(errno)<<
endlog();
201 else if (readsocks == 0) {
Dispatcher(const std::string &name)
void intrusive_ptr_release(const RTT::mqueue::Dispatcher *p)
std::map< mqd_t, base::ChannelElementBase * > MQMap
boost::intrusive_ptr< Dispatcher > shared_ptr
void removeQueue(mqd_t mqdes)
const int HighestPriority
void intrusive_ptr_add_ref(const RTT::mqueue::Dispatcher *p)
An Activity executes a RunnableInterface object in a (periodic) thread.
void addQueue(mqd_t mqdes, base::ChannelElementBase *chan)
An object-oriented wrapper around a non-recursive mutex.
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
static Dispatcher * DispatchI
static Logger::LogFunction endlog()
MutexLock is a scope-based Monitor, protecting critical sections with a Mutex object through locking and unlocking it.
static Dispatcher::shared_ptr Instance()