$search
00001 /*************************************************************************** 00002 tag: The SourceWorks Tue Sep 7 00:55:18 CEST 2010 MQSendRecv.cpp 00003 00004 MQSendRecv.cpp - description 00005 ------------------- 00006 begin : Tue September 07 2010 00007 copyright : (C) 2010 The SourceWorks 00008 email : peter@thesourceworks.com 00009 00010 *************************************************************************** 00011 * This library is free software; you can redistribute it and/or * 00012 * modify it under the terms of the GNU General Public * 00013 * License as published by the Free Software Foundation; * 00014 * version 2 of the License. * 00015 * * 00016 * As a special exception, you may use this file as part of a free * 00017 * software library without restriction. Specifically, if other files * 00018 * instantiate templates or use macros or inline functions from this * 00019 * file, or you compile this file and link it with other files to * 00020 * produce an executable, this file does not by itself cause the * 00021 * resulting executable to be covered by the GNU General Public * 00022 * License. This exception does not however invalidate any other * 00023 * reasons why the executable file might be covered by the GNU General * 00024 * Public License. * 00025 * * 00026 * This library is distributed in the hope that it will be useful, * 00027 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 00028 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * 00029 * Lesser General Public License for more details. * 00030 * * 00031 * You should have received a copy of the GNU General Public * 00032 * License along with this library; if not, write to the Free Software * 00033 * Foundation, Inc., 59 Temple Place, * 00034 * Suite 330, Boston, MA 02111-1307 USA * 00035 * * 00036 ***************************************************************************/ 00037 00038 00039 #include <fcntl.h> 00040 #include <sys/stat.h> 00041 #include <mqueue.h> 00042 #include <sys/types.h> 00043 #include <unistd.h> 00044 #include <sstream> 00045 #include <cassert> 00046 #include <stdexcept> 00047 #include <errno.h> 00048 00049 #include "MQSendRecv.hpp" 00050 #include "../../types/TypeTransporter.hpp" 00051 #include "../../types/TypeMarshaller.hpp" 00052 #include "../../Logger.hpp" 00053 #include "Dispatcher.hpp" 00054 #include "../../base/PortInterface.hpp" 00055 #include "../../DataFlowInterface.hpp" 00056 #include "../../TaskContext.hpp" 00057 00058 using namespace RTT; 00059 using namespace RTT::detail; 00060 using namespace RTT::mqueue; 00061 00062 00063 MQSendRecv::MQSendRecv(types::TypeMarshaller const& transport) : 00064 mtransport(transport), marshaller_cookie(0), buf(0), mis_sender(false), minit_done(false), max_size(0), mdata_size(0) 00065 { 00066 } 00067 00068 void MQSendRecv::setupStream(base::DataSourceBase::shared_ptr ds, base::PortInterface* port, ConnPolicy const& policy, 00069 bool is_sender) 00070 { 00071 Logger::In in("MQSendRecv"); 00072 00073 mdata_size = policy.data_size; 00074 max_size = policy.data_size ? policy.data_size : mtransport.getSampleSize(ds); 00075 marshaller_cookie = mtransport.createCookie(); 00076 mis_sender = is_sender; 00077 00078 std::stringstream namestr; 00079 namestr << '/' << port->getInterface()->getOwner()->getName() << '.' << port->getName() << '.' << this << '@' << getpid(); 00080 00081 if (policy.name_id.empty()) 00082 policy.name_id = namestr.str(); 00083 00084 struct mq_attr mattr; 00085 mattr.mq_maxmsg = policy.size ? policy.size : 10; 00086 mattr.mq_msgsize = max_size; 00087 assert( max_size ); 00088 if (policy.name_id[0] != '/') 00089 throw std::runtime_error("Could not open message queue with wrong name. Names must start with '/' and contain no more '/' after the first one."); 00090 if (max_size <= 0) 00091 throw std::runtime_error("Could not open message queue with zero message size."); 00092 int oflag = O_CREAT; 00093 if (mis_sender) 00094 oflag |= O_WRONLY | O_NONBLOCK; 00095 else 00096 oflag |= O_RDONLY; //reading is always blocking (see mqReady() ) 00097 mqdes = mq_open(policy.name_id.c_str(), oflag, S_IREAD | S_IWRITE, &mattr); 00098 00099 if (mqdes < 0) 00100 { 00101 int the_error = errno; 00102 log(Error) << "FAILED opening '" << policy.name_id << "' with message size " << mattr.mq_msgsize << ", buffer size " << mattr.mq_maxmsg << " for " 00103 << (is_sender ? "writing :" : "reading :") << endlog(); 00104 // these are copied from the man page. They are more informative than the plain perrno() text. 00105 switch (the_error) 00106 { 00107 case EACCES: 00108 log(Error) << "The queue exists, but the caller does not have permission to open it in the specified mode." << endlog(); 00109 break; 00110 case EINVAL: 00111 // or the name is wrong... 00112 log(Error) << "Wrong mqueue name given OR, In a process that is unprivileged (does not have the " 00113 << "CAP_SYS_RESOURCE capability), attr->mq_maxmsg must be less than or equal to the msg_max limit, and attr->mq_msgsize must be less than or equal to the msgsize_max limit. In addition, even in a privileged process, " 00114 << "attr->mq_maxmsg cannot exceed the HARD_MAX limit. (See mq_overview(7) for details of these limits.)" << endlog(); 00115 break; 00116 case EMFILE: 00117 log(Error) << "The process already has the maximum number of files and message queues open." << endlog(); 00118 break; 00119 case ENAMETOOLONG: 00120 log(Error) << "Name was too long." << endlog(); 00121 break; 00122 case ENFILE: 00123 log(Error) << "The system limit on the total number of open files and message queues has been reached." << endlog(); 00124 break; 00125 case ENOSPC: 00126 log(Error) 00127 << "Insufficient space for the creation of a new message queue. This probably occurred because the queues_max limit was encountered; see mq_overview(7)." 00128 << endlog(); 00129 break; 00130 case ENOMEM: 00131 log(Error) << "Insufficient memory." << endlog(); 00132 break; 00133 default: 00134 log(Error) << "Submit a bug report. An unexpected mq error occured with errno=" << errno << ": " << strerror(errno) << endlog(); 00135 } 00136 throw std::runtime_error("Could not open message queue: mq_open returned -1."); 00137 } 00138 00139 log(Debug) << "Opened '" << policy.name_id << "' with mqdes='" << mqdes << "', msg size='"<<mattr.mq_msgsize<<"' an queue length='"<<mattr.mq_maxmsg<<"' for " << (is_sender ? "writing." : "reading.") << endlog(); 00140 00141 buf = new char[max_size]; 00142 memset(buf, 0, max_size); // necessary to trick valgrind 00143 mqname = policy.name_id; 00144 } 00145 00146 MQSendRecv::~MQSendRecv() 00147 { 00148 if ( mqdes > 0) 00149 mq_close(mqdes); 00150 } 00151 00152 void MQSendRecv::cleanupStream() 00153 { 00154 if (!mis_sender) 00155 { 00156 if (minit_done) 00157 { 00158 Dispatcher::Instance()->removeQueue(mqdes); 00159 minit_done = false; 00160 } 00161 } 00162 else 00163 { 00164 // sender unlinks to avoid future re-use of new readers. 00165 mq_unlink(mqname.c_str()); 00166 } 00167 // both sender and receiver close their end. 00168 mq_close( mqdes); 00169 00170 if (marshaller_cookie) 00171 mtransport.deleteCookie(marshaller_cookie); 00172 00173 if (buf) 00174 { 00175 delete[] buf; 00176 buf = 0; 00177 } 00178 } 00179 00180 00181 void MQSendRecv::mqNewSample(RTT::base::DataSourceBase::shared_ptr ds) 00182 { 00183 // only deduce if user did not specify it explicitly: 00184 if (mdata_size == 0) 00185 max_size = mtransport.getSampleSize(ds); 00186 delete[] buf; 00187 buf = new char[max_size]; 00188 memset(buf, 0, max_size); // necessary to trick valgrind 00189 } 00190 00191 bool MQSendRecv::mqReady(base::DataSourceBase::shared_ptr ds, base::ChannelElementBase* chan) 00192 { 00193 if (minit_done) 00194 return true; 00195 00196 if (!mis_sender) 00197 { 00198 // Try to get the initial sample 00199 // 00200 // The output port implementation guarantees that there will be one 00201 // after the connection is ready 00202 struct timespec abs_timeout; 00203 clock_gettime(CLOCK_REALTIME, &abs_timeout); 00204 abs_timeout.tv_nsec += Seconds_to_nsecs(0.5); 00205 abs_timeout.tv_sec += abs_timeout.tv_nsec / (1000*1000*1000); 00206 abs_timeout.tv_nsec = abs_timeout.tv_nsec % (1000*1000*1000); 00207 //abs_timeout.tv_sec +=1; 00208 ssize_t ret = mq_timedreceive(mqdes, buf, max_size, 0, &abs_timeout); 00209 if (ret != -1) 00210 { 00211 if (mtransport.updateFromBlob((void*) buf, ret, ds, marshaller_cookie)) 00212 { 00213 minit_done = true; 00214 // ok, now we can add the dispatcher. 00215 Dispatcher::Instance()->addQueue(mqdes, chan); 00216 return true; 00217 } 00218 else 00219 { 00220 log(Error) << "Failed to initialize MQ Channel Element with initial data sample." << endlog(); 00221 return false; 00222 } 00223 } 00224 else 00225 { 00226 log(Error) << "Failed to receive initial data sample for MQ Channel Element: " << strerror(errno) << endlog(); 00227 return false; 00228 } 00229 } 00230 else 00231 { 00232 assert( !mis_sender ); // we must be receiver. we can only receive inputReady when we're on the input port side of the MQ. 00233 return false; 00234 } 00235 return true; 00236 } 00237 00238 00239 bool MQSendRecv::mqRead(RTT::base::DataSourceBase::shared_ptr ds) 00240 { 00241 int bytes = 0; 00242 if ((bytes = mq_receive(mqdes, buf, max_size, 0)) == -1) 00243 { 00244 //log(Debug) << "Tried read on empty mq!" <<endlog(); 00245 return false; 00246 } 00247 if (mtransport.updateFromBlob((void*) buf, bytes, ds, marshaller_cookie)) 00248 { 00249 return true; 00250 } 00251 return false; 00252 } 00253 00254 bool MQSendRecv::mqWrite(RTT::base::DataSourceBase::shared_ptr ds) 00255 { 00256 std::pair<void const*, int> blob = mtransport.fillBlob(ds, buf, max_size, marshaller_cookie); 00257 if (blob.first == 0) 00258 { 00259 log(Error) << "MQChannel: failed to marshal sample" << endlog(); 00260 return false; 00261 } 00262 00263 char* lbuf = (char*) blob.first; 00264 if (mq_send(mqdes, lbuf, blob.second, 0) == -1) 00265 { 00266 if (errno == EAGAIN) 00267 return true; 00268 00269 log(Error) << "MQChannel "<< mqdes << " became invalid (mq length="<<max_size<<", msg length="<<blob.second<<"): " << strerror(errno) << endlog(); 00270 return false; 00271 } 00272 return true; 00273 } 00274