Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include <fcntl.h>
00021 #include <sys/stat.h>
00022 #include <mqueue.h>
00023 #include <sys/types.h>
00024 #include <unistd.h>
00025 #include <sstream>
00026 #include <cassert>
00027 #include <stdexcept>
00028 #include <errno.h>
00029
00030 #include "micros_rtt/common.h"
00031 #include "micros_rtt/oro/mq_send_recv.hpp"
00032
00033
00034 namespace micros_rtt
00035 {
00036
00037
00038 MQSendRecv::MQSendRecv() : buf(0),
00039 mis_sender(false), minit_done(false), max_size(0), mdata_size(0)
00040 {
00041 }
00042
00043 void MQSendRecv::setupStream(ConnectionBasePtr connection, int data_size,
00044 bool is_sender)
00045 {
00046 mdata_size = data_size;
00047 max_size = data_size * 10;
00048 mis_sender = is_sender;
00049
00050 std::stringstream namestr;
00051 namestr << '/' << connection->getTopic();
00052
00053 struct mq_attr mattr;
00054 mattr.mq_maxmsg = data_size;
00055 mattr.mq_msgsize = max_size;
00056 assert( max_size );
00057 if (max_size <= 0)
00058 throw std::runtime_error("Could not open message queue with zero message size.");
00059
00060
00061
00062 int oflag = O_CREAT;
00063 if (mis_sender)
00064 oflag |= O_WRONLY | O_NONBLOCK;
00065 else
00066 oflag |= O_RDONLY | O_NONBLOCK;
00067 mqdes = mq_open(namestr.str().c_str(), oflag, S_IREAD | S_IWRITE, &mattr);
00068
00069 if (mqdes < 0)
00070 {
00071 int the_error = errno;
00072 ROS_WARN("micros FAILED opening message queue %s with message size %d, buffer size %d.", namestr.str().c_str(), (int)mattr.mq_msgsize, (int)mattr.mq_maxmsg);
00073
00074 switch (the_error)
00075 {
00076 case EACCES:
00077 ROS_WARN("The queue exists, but the caller does not have permission to open it in the specified mode.");
00078 break;
00079 case EINVAL:
00080
00081 ROS_WARN("Wrong mqueue name given OR, In a process that is unprivileged (does not have the CAP_SYS_RESOURCE capability), attr->mq_maxmsg must be less than or equal to the msg_max limit, and attr->mq_msgsize must be less than or equal to the msgsize_max limit. In addition, even in a privileged process, attr->mq_maxmsg cannot exceed the HARD_MAX limit. (See mq_overview(7) for details of these limits.");
00082 break;
00083 case EMFILE:
00084 ROS_WARN("The process already has the maximum number of files and message queues open.");
00085 break;
00086 case ENAMETOOLONG:
00087 ROS_WARN("Name was too long.");
00088 break;
00089 case ENFILE:
00090 ROS_WARN("The system limit on the total number of open files and message queues has been reached.");
00091 break;
00092 case ENOSPC:
00093 ROS_WARN( "Insufficient space for the creation of a new message queue. This probably occurred because the queues_max limit was encountered; see mq_overview(7).");
00094 break;
00095 case ENOMEM:
00096 ROS_WARN("Insufficient memory.");
00097 break;
00098 default:
00099 ROS_WARN("Submit a bug report. An unexpected mq error occured with errno= %x : %s", errno, strerror(errno));
00100 }
00101 throw std::runtime_error("Could not open message queue: mq_open returned -1.");
00102 }
00103
00104 buf = new char[max_size];
00105 memset(buf, 0, max_size);
00106 mqname = namestr.str();
00107
00108 ROS_DEBUG("micros open Mqueue with name %s", mqname.c_str());
00109 }
00110
00111 MQSendRecv::~MQSendRecv()
00112 {
00113 if (mqdes > 0)
00114 mq_close(mqdes);
00115 }
00116
00117
00118 void MQSendRecv::cleanupStream()
00119 {
00120 ROS_DEBUG("cleanupStream");
00121 if (!mis_sender)
00122 {
00123 if (minit_done)
00124 {
00125 minit_done = false;
00126 }
00127 }
00128 else
00129 {
00130
00131 mq_unlink(mqname.c_str());
00132 }
00133
00134 mq_close( mqdes);
00135
00136 if (buf)
00137 {
00138 delete[] buf;
00139 buf = 0;
00140 }
00141 }
00142
00143
00144 void MQSendRecv::mqNewSample(int size)
00145 {
00146
00147 if (mdata_size == 0)
00148 max_size = size;
00149 delete[] buf;
00150 buf = new char[max_size];
00151 memset(buf, 0, max_size);
00152 }
00153
00154 bool MQSendRecv::mqReady(ChannelElementBase* chan)
00155 {
00156 if (minit_done)
00157 return true;
00158
00159 if (!mis_sender)
00160 {
00161
00162
00163
00164 ROS_DEBUG("micros message queue ready.");
00165 return true;
00166 }
00167 else
00168 {
00169 assert( !mis_sender );
00170 return false;
00171 }
00172 }
00173
00174 bool MQSendRecv::mqRead(SerializedMessage& m)
00175 {
00176 int bytes = 0;
00177 if ((bytes = mq_receive(mqdes, buf, max_size, 0)) == -1)
00178 {
00179
00180 return false;
00181 }
00182 else
00183 {
00184 ROS_DEBUG("micros message queue received %d bytes.", bytes);
00185 if (!m.buf)
00186 m.buf.reset((unsigned char *)buf);
00187 m.num_bytes = bytes;
00188 m.message_start = ((unsigned char *)buf + 4);
00189 return true;
00190 }
00191 }
00192
00193 bool MQSendRecv::mqWrite(SerializedMessage& m)
00194 {
00195 if (!m.buf.get())
00196 ROS_WARN("micros message queue write null buf");
00197 if (mq_send(mqdes, (char *)m.buf.get(), (uint32_t)m.num_bytes, 0) == -1)
00198 {
00199 ROS_WARN("micros message queue send error number:%d.", errno);
00200 if (errno == EAGAIN)
00201 {
00202 ROS_WARN("EAGAIN, message queue full.");
00203 return true;
00204 }
00205 return false;
00206 }
00207 ROS_DEBUG("micros message queue write successfully.");
00208 return true;
00209 }
00210
00211 }