mq_send_recv.cpp
Go to the documentation of this file.
00001 /* 
00002  *  mq_send_recv.cpp - micros message queue transport methods
00003  *  Copyright (C) 2015 Zaile Jiang
00004  *  
00005  *  This program is free software; you can redistribute it and/or
00006  *  modify it under the terms of the GNU General Public License
00007  *  as published by the Free Software Foundation; either version 2
00008  *  of the License, or (at your option) any later version.
00009  *  
00010  *  This program is distributed in the hope that it will be useful,
00011  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00012  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00013  *  GNU General Public License for more details.
00014  *  
00015  *  You should have received a copy of the GNU General Public License
00016  *  along with this program; if not, write to the Free Software
00017  *  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
00018 */
00019 
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <sys/types.h>
#include <unistd.h>
#include <errno.h>

#include <cassert>
#include <cstring>
#include <sstream>
#include <stdexcept>
00029 
00030 #include "micros_rtt/common.h"
00031 #include "micros_rtt/oro/mq_send_recv.hpp"
00032 
00033 
00034 namespace micros_rtt
00035 {
00036 
00037 
00038 MQSendRecv::MQSendRecv() : buf(0), 
00039   mis_sender(false), minit_done(false), max_size(0), mdata_size(0)
00040 {
00041 }
00042 
00043 void MQSendRecv::setupStream(ConnectionBasePtr connection, int data_size,
00044                              bool is_sender)
00045 {
00046   mdata_size = data_size;
00047   max_size = data_size * 10;
00048   mis_sender = is_sender;
00049 
00050   std::stringstream namestr;
00051   namestr << '/' << connection->getTopic();
00052 
00053   struct mq_attr mattr;
00054   mattr.mq_maxmsg = data_size;
00055   mattr.mq_msgsize = max_size;
00056   assert( max_size );
00057   if (max_size <= 0)
00058     throw std::runtime_error("Could not open message queue with zero message size.");
00059 
00060 
00061   // set message queue flag and create stream
00062   int oflag = O_CREAT;
00063   if (mis_sender)
00064       oflag |= O_WRONLY | O_NONBLOCK;
00065   else
00066       oflag |= O_RDONLY | O_NONBLOCK;
00067   mqdes = mq_open(namestr.str().c_str(), oflag, S_IREAD | S_IWRITE, &mattr);
00068 
00069   if (mqdes < 0)
00070   {
00071     int the_error = errno;
00072     ROS_WARN("micros FAILED opening message queue %s with message size %d, buffer size %d.", namestr.str().c_str(), (int)mattr.mq_msgsize, (int)mattr.mq_maxmsg);
00073     // these are copied from the man page. They are more informative than the plain perrno() text.
00074     switch (the_error)
00075     {
00076     case EACCES:
00077       ROS_WARN("The queue exists, but the caller does not have permission to open it in the specified mode.");
00078       break;
00079     case EINVAL:
00080         // or the name is wrong...
00081       ROS_WARN("Wrong mqueue name given OR, In a process  that  is  unprivileged (does  not  have  the CAP_SYS_RESOURCE  capability),  attr->mq_maxmsg must be less than or equal to the msg_max limit, and attr->mq_msgsize must be less than or equal to the msgsize_max limit.  In addition, even in a privileged process, attr->mq_maxmsg cannot exceed the HARD_MAX limit. (See mq_overview(7) for details of these limits.");
00082       break;
00083     case EMFILE:
00084       ROS_WARN("The process already has the maximum number of files and message queues open.");
00085       break;
00086     case ENAMETOOLONG:
00087       ROS_WARN("Name was too long.");
00088       break;
00089     case ENFILE:
00090       ROS_WARN("The system limit on the total number of open files and message queues has been reached.");
00091       break;
00092     case ENOSPC:
00093       ROS_WARN( "Insufficient space for the creation of a new message queue. This probably occurred because the queues_max limit was encountered; see mq_overview(7).");
00094       break;
00095     case ENOMEM:
00096       ROS_WARN("Insufficient memory.");
00097       break;
00098     default:
00099       ROS_WARN("Submit a bug report. An unexpected mq error occured with errno= %x : %s", errno, strerror(errno));
00100     }
00101     throw std::runtime_error("Could not open message queue: mq_open returned -1.");
00102   }
00103 
00104   buf = new char[max_size];
00105   memset(buf, 0, max_size); // necessary to trick valgrind
00106   mqname = namestr.str();
00107   
00108   ROS_DEBUG("micros open Mqueue with name %s", mqname.c_str());
00109 }
00110 
00111 MQSendRecv::~MQSendRecv()
00112 {
00113   if (mqdes > 0)
00114     mq_close(mqdes);
00115 }
00116 
00117 
00118 void MQSendRecv::cleanupStream()
00119 {
00120   ROS_DEBUG("cleanupStream");
00121   if (!mis_sender)
00122   {
00123     if (minit_done)
00124     {
00125       minit_done = false;
00126     }
00127   }
00128   else
00129   {
00130     // sender unlinks to avoid future re-use of new readers.
00131     mq_unlink(mqname.c_str());
00132   }
00133   // both sender and receiver close their end.
00134   mq_close( mqdes);
00135 
00136   if (buf)
00137   {
00138     delete[] buf;
00139     buf = 0;
00140   }
00141 }
00142 
00143 
00144 void MQSendRecv::mqNewSample(int size)
00145 {
00146     // only deduce if user did not specify it explicitly:
00147     if (mdata_size == 0)
00148       max_size = size;
00149     delete[] buf;
00150     buf = new char[max_size];
00151     memset(buf, 0, max_size); // necessary to trick valgrind
00152 }
00153 
00154 bool MQSendRecv::mqReady(ChannelElementBase* chan)
00155 {
00156   if (minit_done)
00157     return true;
00158 
00159   if (!mis_sender)
00160   {
00161     // Try to get the initial sample
00162     // The output port implementation guarantees that there will be one
00163     // after the connection is ready
00164     ROS_DEBUG("micros message queue ready.");
00165     return true;
00166   }
00167   else
00168   {
00169     assert( !mis_sender ); // we must be receiver. we can only receive inputReady when we're on the input port side of the MQ.
00170     return false;
00171   }
00172 }
00173 
00174 bool MQSendRecv::mqRead(SerializedMessage& m)
00175 {
00176   int bytes = 0;
00177   if ((bytes = mq_receive(mqdes, buf, max_size, 0)) == -1)
00178   {
00179     //ROS_WARN("micros message queue tried to read on empty mq!");
00180     return false;
00181   }
00182   else
00183   {
00184     ROS_DEBUG("micros message queue received %d bytes.", bytes);
00185     if (!m.buf)
00186       m.buf.reset((unsigned char *)buf);
00187     m.num_bytes = bytes;
00188     m.message_start = ((unsigned char *)buf + 4);
00189     return true;
00190   }
00191 }
00192 
00193 bool MQSendRecv::mqWrite(SerializedMessage& m)
00194 {
00195   if (!m.buf.get())
00196     ROS_WARN("micros message queue write null buf");
00197   if (mq_send(mqdes, (char *)m.buf.get(), (uint32_t)m.num_bytes, 0) == -1)
00198   {
00199     ROS_WARN("micros message queue send error number:%d.", errno);
00200     if (errno == EAGAIN)
00201                 {
00202                         ROS_WARN("EAGAIN, message queue full.");
00203       return true;
00204     }
00205     return false;
00206   }
00207   ROS_DEBUG("micros message queue write successfully.");
00208   return true;
00209 }
00210 
00211 }


micros_rtt
Author(s): Zaile Jiang , Xiaodong Yi , Minglong Li
autogenerated on Sat Jun 8 2019 19:02:21