00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #include <fcntl.h>
00040 #include <sys/stat.h>
00041 #include <mqueue.h>
00042 #include <sys/types.h>
00043 #include <unistd.h>
00044 #include <sstream>
00045 #include <cassert>
00046 #include <stdexcept>
00047 #include <errno.h>
00048
00049 #include "MQSendRecv.hpp"
00050 #include "../../types/TypeTransporter.hpp"
00051 #include "../../types/TypeMarshaller.hpp"
00052 #include "../../Logger.hpp"
00053 #include "Dispatcher.hpp"
00054 #include "../../base/PortInterface.hpp"
00055 #include "../../DataFlowInterface.hpp"
00056 #include "../../TaskContext.hpp"
00057
00058 using namespace RTT;
00059 using namespace RTT::detail;
00060 using namespace RTT::mqueue;
00061
00062
00063 MQSendRecv::MQSendRecv(types::TypeMarshaller const& transport) :
00064 mtransport(transport), marshaller_cookie(0), buf(0), mis_sender(false), minit_done(false), max_size(0), mdata_size(0)
00065 {
00066 }
00067
00068 void MQSendRecv::setupStream(base::DataSourceBase::shared_ptr ds, base::PortInterface* port, ConnPolicy const& policy,
00069 bool is_sender)
00070 {
00071 Logger::In in("MQSendRecv");
00072
00073 mdata_size = policy.data_size;
00074 max_size = policy.data_size ? policy.data_size : mtransport.getSampleSize(ds);
00075 marshaller_cookie = mtransport.createCookie();
00076 mis_sender = is_sender;
00077
00078 std::stringstream namestr;
00079 namestr << '/' << port->getInterface()->getOwner()->getName() << '.' << port->getName() << '.' << this << '@' << getpid();
00080
00081 if (policy.name_id.empty())
00082 policy.name_id = namestr.str();
00083
00084 struct mq_attr mattr;
00085 mattr.mq_maxmsg = policy.size ? policy.size : 10;
00086 mattr.mq_msgsize = max_size;
00087 assert( max_size );
00088 if (policy.name_id[0] != '/')
00089 throw std::runtime_error("Could not open message queue with wrong name. Names must start with '/' and contain no more '/' after the first one.");
00090 if (max_size <= 0)
00091 throw std::runtime_error("Could not open message queue with zero message size.");
00092 int oflag = O_CREAT;
00093 if (mis_sender)
00094 oflag |= O_WRONLY | O_NONBLOCK;
00095 else
00096 oflag |= O_RDONLY;
00097 mqdes = mq_open(policy.name_id.c_str(), oflag, S_IREAD | S_IWRITE, &mattr);
00098
00099 if (mqdes < 0)
00100 {
00101 int the_error = errno;
00102 log(Error) << "FAILED opening '" << policy.name_id << "' with message size " << mattr.mq_msgsize << ", buffer size " << mattr.mq_maxmsg << " for "
00103 << (is_sender ? "writing :" : "reading :") << endlog();
00104
00105 switch (the_error)
00106 {
00107 case EACCES:
00108 log(Error) << "The queue exists, but the caller does not have permission to open it in the specified mode." << endlog();
00109 break;
00110 case EINVAL:
00111
00112 log(Error) << "Wrong mqueue name given OR, In a process that is unprivileged (does not have the "
00113 << "CAP_SYS_RESOURCE capability), attr->mq_maxmsg must be less than or equal to the msg_max limit, and attr->mq_msgsize must be less than or equal to the msgsize_max limit. In addition, even in a privileged process, "
00114 << "attr->mq_maxmsg cannot exceed the HARD_MAX limit. (See mq_overview(7) for details of these limits.)" << endlog();
00115 break;
00116 case EMFILE:
00117 log(Error) << "The process already has the maximum number of files and message queues open." << endlog();
00118 break;
00119 case ENAMETOOLONG:
00120 log(Error) << "Name was too long." << endlog();
00121 break;
00122 case ENFILE:
00123 log(Error) << "The system limit on the total number of open files and message queues has been reached." << endlog();
00124 break;
00125 case ENOSPC:
00126 log(Error)
00127 << "Insufficient space for the creation of a new message queue. This probably occurred because the queues_max limit was encountered; see mq_overview(7)."
00128 << endlog();
00129 break;
00130 case ENOMEM:
00131 log(Error) << "Insufficient memory." << endlog();
00132 break;
00133 default:
00134 log(Error) << "Submit a bug report. An unexpected mq error occured with errno=" << errno << ": " << strerror(errno) << endlog();
00135 }
00136 throw std::runtime_error("Could not open message queue: mq_open returned -1.");
00137 }
00138
00139 log(Debug) << "Opened '" << policy.name_id << "' with mqdes='" << mqdes << "', msg size='"<<mattr.mq_msgsize<<"' an queue length='"<<mattr.mq_maxmsg<<"' for " << (is_sender ? "writing." : "reading.") << endlog();
00140
00141 buf = new char[max_size];
00142 memset(buf, 0, max_size);
00143 mqname = policy.name_id;
00144 }
00145
00146 MQSendRecv::~MQSendRecv()
00147 {
00148 if ( mqdes > 0)
00149 mq_close(mqdes);
00150 }
00151
00152 void MQSendRecv::cleanupStream()
00153 {
00154 if (!mis_sender)
00155 {
00156 if (minit_done)
00157 {
00158 Dispatcher::Instance()->removeQueue(mqdes);
00159 minit_done = false;
00160 }
00161 }
00162 else
00163 {
00164
00165 mq_unlink(mqname.c_str());
00166 }
00167
00168 mq_close( mqdes);
00169
00170 if (marshaller_cookie)
00171 mtransport.deleteCookie(marshaller_cookie);
00172
00173 if (buf)
00174 {
00175 delete[] buf;
00176 buf = 0;
00177 }
00178 }
00179
00180
00181 void MQSendRecv::mqNewSample(RTT::base::DataSourceBase::shared_ptr ds)
00182 {
00183
00184 if (mdata_size == 0)
00185 max_size = mtransport.getSampleSize(ds);
00186 delete[] buf;
00187 buf = new char[max_size];
00188 memset(buf, 0, max_size);
00189 }
00190
00191 bool MQSendRecv::mqReady(base::DataSourceBase::shared_ptr ds, base::ChannelElementBase* chan)
00192 {
00193 if (minit_done)
00194 return true;
00195
00196 if (!mis_sender)
00197 {
00198
00199
00200
00201
00202 struct timespec abs_timeout;
00203 clock_gettime(CLOCK_REALTIME, &abs_timeout);
00204 abs_timeout.tv_nsec += Seconds_to_nsecs(0.5);
00205 abs_timeout.tv_sec += abs_timeout.tv_nsec / (1000*1000*1000);
00206 abs_timeout.tv_nsec = abs_timeout.tv_nsec % (1000*1000*1000);
00207
00208 ssize_t ret = mq_timedreceive(mqdes, buf, max_size, 0, &abs_timeout);
00209 if (ret != -1)
00210 {
00211 if (mtransport.updateFromBlob((void*) buf, ret, ds, marshaller_cookie))
00212 {
00213 minit_done = true;
00214
00215 Dispatcher::Instance()->addQueue(mqdes, chan);
00216 return true;
00217 }
00218 else
00219 {
00220 log(Error) << "Failed to initialize MQ Channel Element with initial data sample." << endlog();
00221 return false;
00222 }
00223 }
00224 else
00225 {
00226 log(Error) << "Failed to receive initial data sample for MQ Channel Element: " << strerror(errno) << endlog();
00227 return false;
00228 }
00229 }
00230 else
00231 {
00232 assert( !mis_sender );
00233 return false;
00234 }
00235 return true;
00236 }
00237
00238
00239 bool MQSendRecv::mqRead(RTT::base::DataSourceBase::shared_ptr ds)
00240 {
00241 int bytes = 0;
00242 if ((bytes = mq_receive(mqdes, buf, max_size, 0)) == -1)
00243 {
00244
00245 return false;
00246 }
00247 if (mtransport.updateFromBlob((void*) buf, bytes, ds, marshaller_cookie))
00248 {
00249 return true;
00250 }
00251 return false;
00252 }
00253
00254 bool MQSendRecv::mqWrite(RTT::base::DataSourceBase::shared_ptr ds)
00255 {
00256 std::pair<void const*, int> blob = mtransport.fillBlob(ds, buf, max_size, marshaller_cookie);
00257 if (blob.first == 0)
00258 {
00259 log(Error) << "MQChannel: failed to marshal sample" << endlog();
00260 return false;
00261 }
00262
00263 char* lbuf = (char*) blob.first;
00264 if (mq_send(mqdes, lbuf, blob.second, 0) == -1)
00265 {
00266 if (errno == EAGAIN)
00267 return true;
00268
00269 log(Error) << "MQChannel "<< mqdes << " became invalid (mq length="<<max_size<<", msg length="<<blob.second<<"): " << strerror(errno) << endlog();
00270 return false;
00271 }
00272 return true;
00273 }
00274