00001 /* 00002 * mq_channel_element.hpp - micros message queue element for transport channel 00003 * Copyright (C) 2015 Zaile Jiang 00004 * 00005 * This program is free software; you can redistribute it and/or 00006 * modify it under the terms of the GNU General Public License 00007 * as published by the Free Software Foundation; either version 2 00008 * of the License, or (at your option) any later version. 00009 * 00010 * This program is distributed in the hope that it will be useful, 00011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00013 * GNU General Public License for more details. 00014 * 00015 * You should have received a copy of the GNU General Public License 00016 * along with this program; if not, write to the Free Software 00017 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 00018 */ 00019 #ifndef MICROSRTT_MQ_CHANNEL_ELEMENT_HPP 00020 #define MICROSRTT_MQ_CHANNEL_ELEMENT_HPP 00021 00022 #include "ros/serialization.h" 00023 #include "micros_rtt/oro/mq_send_recv.hpp" 00024 #include "micros_rtt/oro/channel_element.hpp" 00025 #include "micros_rtt/oro/data_lockfree.hpp" 00026 #include <stdexcept> 00027 00028 namespace micros_rtt 00029 { 00030 using namespace ros::serialization; 00039 template<typename M> 00040 class MQChannelElement: public ChannelElement<M>, public MQSendRecv 00041 { 00043 typename DataObjectLockFree<M>::shared_ptr read_sample; 00045 typename DataObjectLockFree<M>::shared_ptr write_sample; 00046 00047 00048 SerializedMessage read_m; 00049 00050 public: 00055 MQChannelElement(ConnectionBasePtr connection, bool is_sender) 00056 : MQSendRecv(), 00057 read_sample(new DataObjectLockFree<M>(M())) 00058 , write_sample(new DataObjectLockFree<M>(M())) 00059 00060 { 00061 setupStream(connection, sizeof(M), is_sender); 00062 } 00063 00064 ~MQChannelElement() 00065 { 00066 cleanupStream(); 00067 } 00068 00069 virtual bool inputReady() 00070 { 00071 if ( mqReady(this) ) 00072 { 00073 // typename ChannelElement<M>::shared_ptr output = this->getOutput(); 00074 // assert(output); 00075 // output->data_sample(read_sample->Get()); 00076 return true; 00077 } 00078 return false; 00079 } 00080 00081 virtual bool data_sample(typename ChannelElement<M>::param_t sample) 00082 { 00083 // send initial data sample to the other side using a plain write. 00084 if (mis_sender) 00085 { 00086 typename ChannelElement<M>::shared_ptr output = 00087 this->getOutput(); 00088 00089 write_sample->data_sample(sample); 00090 // update MQSendRecv buffer: 00091 // mqNewSample(sizeof(sample)); 00092 // return mqWrite(sample); 00093 00094 return true; 00095 } 00096 return false; 00097 } 00098 00116 // bool signal() 00117 // { 00118 // // copy messages into channel 00119 // if (mis_sender) 00120 // { 00121 // // this read should always succeed since signal() means 00122 // // 'data available in a data element'. 00123 // typename ChannelElement<T>::shared_ptr input = 00124 // this->getInput(); 00125 // if( input && input->read(read_sample->set(), false) == NewData ) 00126 // return this->write(read_sample->rvalue()); 00127 // } 00128 // else 00129 // { 00130 // typename ChannelElement<T>::shared_ptr output = 00131 // this->getOutput(); 00132 // if (output && mqRead()) 00133 // return output->write(read_sample->rvalue()); 00134 // ; 00135 // } 00136 // return false; 00137 // } 00138 00144 FlowStatus read(typename ChannelElement<M>::reference_t sample, bool copy_old_data) 00145 { 00146 ROS_DEBUG("micros message queue read."); 00147 //messages got from message queue need to be deserialize. 00148 00149 if (mqRead(read_m)) 00150 { 00151 deserializeMessage<M>(read_m, sample); 00152 return NewData; 00153 } 00154 00155 return NoData; 00156 } 00157 00163 bool write(typename ChannelElement<M>::param_t sample) 00164 { 00165 //messages sent through message queue need to be serialized first. 00166 SerializedMessage m = serializeMessage<M>(sample); 00167 return mqWrite(m); 00168 } 00169 00170 }; 00171 00172 } 00173 00174 #endif 00175