42 #include <sys/types.h> 48 #include <boost/algorithm/string.hpp> 51 #include "../../types/TypeTransporter.hpp" 52 #include "../../types/TypeMarshaller.hpp" 53 #include "../../Logger.hpp" 55 #include "../../base/PortInterface.hpp" 56 #include "../../DataFlowInterface.hpp" 57 #include "../../TaskContext.hpp" 65 mtransport(transport), marshaller_cookie(0), buf(0), mis_sender(false), minit_done(false), max_size(0), mdata_size(0)
82 throw std::runtime_error(
"MQ name_id not set, and the port is either not attached to a task, or said task has no name. Cannot create a reasonably unique MQ name automatically");
84 std::stringstream name_stream;
86 std::string name = name_stream.str();
87 boost::algorithm::replace_all(name,
"/",
"_");
92 mattr.mq_maxmsg = policy.
size ? policy.
size : 10;
96 throw std::runtime_error(
"Could not open message queue with wrong name. Names must start with '/' and contain no more '/' after the first one.");
98 throw std::runtime_error(
"Could not open message queue with zero message size.");
101 oflag |= O_WRONLY | O_NONBLOCK;
104 mqdes = mq_open(policy.
name_id.c_str(), oflag, S_IREAD | S_IWRITE, &mattr);
108 int the_error = errno;
109 log(
Error) <<
"FAILED opening '" << policy.
name_id <<
"' with message size " << mattr.mq_msgsize <<
", buffer size " << mattr.mq_maxmsg <<
" for " 110 << (is_sender ?
"writing :" :
"reading :") <<
endlog();
115 log(
Error) <<
"The queue exists, but the caller does not have permission to open it in the specified mode." <<
endlog();
119 log(
Error) <<
"Wrong mqueue name given OR, In a process that is unprivileged (does not have the " 120 <<
"CAP_SYS_RESOURCE capability), attr->mq_maxmsg must be less than or equal to the msg_max limit, and attr->mq_msgsize must be less than or equal to the msgsize_max limit. In addition, even in a privileged process, " 121 <<
"attr->mq_maxmsg cannot exceed the HARD_MAX limit. (See mq_overview(7) for details of these limits.)" <<
endlog();
124 log(
Error) <<
"The process already has the maximum number of files and message queues open." <<
endlog();
130 log(
Error) <<
"The system limit on the total number of open files and message queues has been reached." <<
endlog();
134 <<
"Insufficient space for the creation of a new message queue. This probably occurred because the queues_max limit was encountered; see mq_overview(7)." 141 log(
Error) <<
"Submit a bug report. An unexpected mq error occured with errno=" << errno <<
": " << strerror(errno) <<
endlog();
143 throw std::runtime_error(
"Could not open message queue: mq_open returned -1.");
146 log(
Debug) <<
"Opened '" << policy.
name_id <<
"' with mqdes='" <<
mqdes <<
"', msg size='"<<mattr.mq_msgsize<<
"' an queue length='"<<mattr.mq_maxmsg<<
"' for " << (is_sender ?
"writing." :
"reading.") <<
endlog();
172 mq_unlink(
mqname.c_str());
209 struct timespec abs_timeout;
212 abs_timeout.tv_sec += abs_timeout.tv_nsec / (1000*1000*1000);
213 abs_timeout.tv_nsec = abs_timeout.tv_nsec % (1000*1000*1000);
227 log(
Error) <<
"Failed to initialize MQ Channel Element with initial data sample." <<
endlog();
233 log(
Error) <<
"Failed to receive initial data sample for MQ Channel Element: " << strerror(errno) <<
endlog();
249 struct timespec abs_timeout;
252 abs_timeout.tv_sec += abs_timeout.tv_nsec / (1000*1000*1000);
253 abs_timeout.tv_nsec = abs_timeout.tv_nsec % (1000*1000*1000);
261 log(
Error) <<
"Failed to read from MQ Channel Element: no data received within 500ms!" <<
endlog();
280 char* lbuf = (
char*) blob.first;
281 if (mq_send(
mqdes, lbuf, blob.second, 0) == -1)
286 log(
Error) <<
"MQChannel "<<
mqdes <<
" became invalid (mq length="<<
max_size<<
", msg length="<<blob.second<<
"): " << strerror(errno) <<
endlog();
virtual bool updateFromBlob(const void *blob, int size, base::DataSourceBase::shared_ptr target, void *cookie=0) const =0
types::TypeMarshaller const & mtransport
virtual void * createCookie() const
virtual void mqNewSample(base::DataSourceBase::shared_ptr ds)
bool mqRead(base::DataSourceBase::shared_ptr ds)
void setupStream(base::DataSourceBase::shared_ptr ds, base::PortInterface *port, ConnPolicy const &policy, bool is_sender)
const std::string & getName() const
bool mqWrite(base::DataSourceBase::shared_ptr ds)
virtual bool mqReady(base::DataSourceBase::shared_ptr ds, base::ChannelElementBase *chan)
DataFlowInterface * getInterface() const
virtual void deleteCookie(void *cookie) const
virtual std::pair< void const *, int > fillBlob(base::DataSourceBase::shared_ptr source, void *blob, int size, void *cookie=0) const =0
MQSendRecv(types::TypeMarshaller const &transport)
static int clock_gettime(int clk_id, struct timespec *tp)
virtual unsigned int getSampleSize(base::DataSourceBase::shared_ptr sample, void *cookie=0) const =0
nsecs Seconds_to_nsecs(const Seconds s)
boost::intrusive_ptr< DataSourceBase > shared_ptr
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
static Logger::LogFunction endlog()
virtual const std::string & getName() const
TaskContext * getOwner() const
static Dispatcher::shared_ptr Instance()