42 #include <sys/types.h>    48 #include <boost/algorithm/string.hpp>    51 #include "../../types/TypeTransporter.hpp"    52 #include "../../types/TypeMarshaller.hpp"    53 #include "../../Logger.hpp"    55 #include "../../base/PortInterface.hpp"    56 #include "../../DataFlowInterface.hpp"    57 #include "../../TaskContext.hpp"    65     mtransport(transport), marshaller_cookie(0), buf(0), mis_sender(false), minit_done(false), max_size(0), mdata_size(0)
    82             throw std::runtime_error(
"MQ name_id not set, and the port is either not attached to a task, or said task has no name. Cannot create a reasonably unique MQ name automatically");
    84         std::stringstream name_stream;
    86         std::string name = name_stream.str();
    87         boost::algorithm::replace_all(name, 
"/", 
"_");
    92     mattr.mq_maxmsg = policy.
size ? policy.
size : 10;
    96         throw std::runtime_error(
"Could not open message queue with wrong name. Names must start with '/' and contain no more '/' after the first one.");
    98         throw std::runtime_error(
"Could not open message queue with zero message size.");
   101         oflag |= O_WRONLY | O_NONBLOCK;
   104     mqdes = mq_open(policy.
name_id.c_str(), oflag, S_IREAD | S_IWRITE, &mattr);
   108         int the_error = errno;
   109         log(
Error) << 
"FAILED opening '" << policy.
name_id << 
"' with message size " << mattr.mq_msgsize << 
", buffer size " << mattr.mq_maxmsg << 
" for "   110                 << (is_sender ? 
"writing :" : 
"reading :") << 
endlog();
   115             log(
Error) << 
"The queue exists, but the caller does not have permission to open it in the specified mode." << 
endlog();
   119             log(
Error) << 
"Wrong mqueue name given OR, In a process  that  is  unprivileged  (does  not  have  the "   120                     << 
"CAP_SYS_RESOURCE  capability),  attr->mq_maxmsg  must  be  less than or equal to the msg_max limit, and attr->mq_msgsize must be less than or equal to the msgsize_max limit.  In addition, even in a privileged process, "   121                     << 
"attr->mq_maxmsg cannot exceed the HARD_MAX limit.  (See mq_overview(7) for details of these limits.)" << 
endlog();
   124             log(
Error) << 
"The process already has the maximum number of files and message queues open." << 
endlog();
   130             log(
Error) << 
"The system limit on the total number of open files and message queues has been reached." << 
endlog();
   134                     << 
"Insufficient space for the creation of a new message queue.  This probably occurred because the queues_max limit was encountered; see mq_overview(7)."   141             log(
Error) << 
"Submit a bug report. An unexpected mq error occured with errno=" << errno << 
": " << strerror(errno) << 
endlog();
   143         throw std::runtime_error(
"Could not open message queue: mq_open returned -1.");
   146     log(
Debug) << 
"Opened '" << policy.
name_id << 
"' with mqdes='" << 
mqdes << 
"', msg size='"<<mattr.mq_msgsize<<
"' an queue length='"<<mattr.mq_maxmsg<<
"' for " << (is_sender ? 
"writing." : 
"reading.") << 
endlog();
   172         mq_unlink(
mqname.c_str());
   209         struct timespec abs_timeout;
   212         abs_timeout.tv_sec += abs_timeout.tv_nsec / (1000*1000*1000);
   213         abs_timeout.tv_nsec = abs_timeout.tv_nsec % (1000*1000*1000);
   227                 log(
Error) << 
"Failed to initialize MQ Channel Element with initial data sample." << 
endlog();
   233             log(
Error) << 
"Failed to receive initial data sample for MQ Channel Element: " << strerror(errno) << 
endlog();
   249     struct timespec abs_timeout;
   252     abs_timeout.tv_sec += abs_timeout.tv_nsec / (1000*1000*1000);
   253     abs_timeout.tv_nsec = abs_timeout.tv_nsec % (1000*1000*1000);
   261         log(
Error) << 
"Failed to read from MQ Channel Element: no data received within 500ms!" <<
endlog();
   280     char* lbuf = (
char*) blob.first;
   281     if (mq_send(
mqdes, lbuf, blob.second, 0) == -1)
   286         log(
Error) << 
"MQChannel "<< 
mqdes << 
" became invalid (mq length="<<
max_size<<
", msg length="<<blob.second<<
"): " << strerror(errno) << 
endlog();
 virtual bool updateFromBlob(const void *blob, int size, base::DataSourceBase::shared_ptr target, void *cookie=0) const =0
types::TypeMarshaller const & mtransport
virtual void * createCookie() const 
virtual void mqNewSample(base::DataSourceBase::shared_ptr ds)
bool mqRead(base::DataSourceBase::shared_ptr ds)
void setupStream(base::DataSourceBase::shared_ptr ds, base::PortInterface *port, ConnPolicy const &policy, bool is_sender)
const std::string & getName() const 
bool mqWrite(base::DataSourceBase::shared_ptr ds)
virtual bool mqReady(base::DataSourceBase::shared_ptr ds, base::ChannelElementBase *chan)
DataFlowInterface * getInterface() const 
virtual void deleteCookie(void *cookie) const 
virtual std::pair< void const *, int > fillBlob(base::DataSourceBase::shared_ptr source, void *blob, int size, void *cookie=0) const =0
MQSendRecv(types::TypeMarshaller const &transport)
static int clock_gettime(int clk_id, struct timespec *tp)
virtual unsigned int getSampleSize(base::DataSourceBase::shared_ptr sample, void *cookie=0) const =0
nsecs Seconds_to_nsecs(const Seconds s)
boost::intrusive_ptr< DataSourceBase > shared_ptr
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute. 
static Logger::LogFunction endlog()
virtual const std::string & getName() const 
TaskContext * getOwner() const 
static Dispatcher::shared_ptr Instance()