$search
00001 /*************************************************************************** 00002 tag: Peter Soetens Thu Oct 22 11:59:07 CEST 2009 MQChannelElement.hpp 00003 00004 MQChannelElement.hpp - description 00005 ------------------- 00006 begin : Thu October 22 2009 00007 copyright : (C) 2009 Peter Soetens 00008 email : peter@thesourcworks.com 00009 00010 *************************************************************************** 00011 * This library is free software; you can redistribute it and/or * 00012 * modify it under the terms of the GNU General Public * 00013 * License as published by the Free Software Foundation; * 00014 * version 2 of the License. * 00015 * * 00016 * As a special exception, you may use this file as part of a free * 00017 * software library without restriction. Specifically, if other files * 00018 * instantiate templates or use macros or inline functions from this * 00019 * file, or you compile this file and link it with other files to * 00020 * produce an executable, this file does not by itself cause the * 00021 * resulting executable to be covered by the GNU General Public * 00022 * License. This exception does not however invalidate any other * 00023 * reasons why the executable file might be covered by the GNU General * 00024 * Public License. * 00025 * * 00026 * This library is distributed in the hope that it will be useful, * 00027 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 00028 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * 00029 * Lesser General Public License for more details. * 00030 * * 00031 * You should have received a copy of the GNU General Public * 00032 * License along with this library; if not, write to the Free Software * 00033 * Foundation, Inc., 59 Temple Place, * 00034 * Suite 330, Boston, MA 02111-1307 USA * 00035 * * 00036 ***************************************************************************/ 00037 00038 00039 #ifndef MQ_CHANNEL_ELEMENT_H 00040 #define MQ_CHANNEL_ELEMENT_H 00041 00042 #include "MQSendRecv.hpp" 00043 #include "../../Logger.hpp" 00044 #include "../../base/ChannelElement.hpp" 00045 #include "../../internal/DataSource.hpp" 00046 #include "../../internal/DataSources.hpp" 00047 #include <stdexcept> 00048 00049 namespace RTT 00050 { 00051 namespace mqueue 00052 { 00061 template<typename T> 00062 class MQChannelElement: public base::ChannelElement<T>, public MQSendRecv 00063 { 00065 typename internal::ValueDataSource<T>::shared_ptr read_sample; 00067 typename internal::LateConstReferenceDataSource<T>::shared_ptr write_sample; 00068 00069 public: 00074 MQChannelElement(base::PortInterface* port, types::TypeMarshaller const& transport, 00075 const ConnPolicy& policy, bool is_sender) 00076 : MQSendRecv(transport) 00077 , read_sample(new internal::ValueDataSource<T>) 00078 , write_sample(new internal::LateConstReferenceDataSource<T>) 00079 00080 { 00081 Logger::In in("MQChannelElement"); 00082 setupStream(read_sample, port, policy, is_sender); 00083 } 00084 00085 ~MQChannelElement() { 00086 cleanupStream(); 00087 } 00088 00089 virtual bool inputReady() { 00090 if ( mqReady(read_sample, this) ) { 00091 typename base::ChannelElement<T>::shared_ptr output = 00092 this->getOutput(); 00093 assert(output); 00094 output->data_sample(read_sample->rvalue()); 00095 return true; 00096 } 00097 return false; 00098 } 00099 00100 virtual bool data_sample(typename base::ChannelElement<T>::param_t sample) 00101 { 00102 // send initial data sample to the other side using a plain write. 00103 if (mis_sender) { 00104 typename base::ChannelElement<T>::shared_ptr output = 00105 this->getOutput(); 00106 00107 write_sample->setPointer(&sample); 00108 // update MQSendRecv buffer: 00109 mqNewSample(write_sample); 00110 return mqWrite(write_sample); 00111 } 00112 return false; 00113 } 00114 00132 bool signal() 00133 { 00134 // copy messages into channel 00135 if (mis_sender) { 00136 // this read should always succeed since signal() means 00137 // 'data available in a data element'. 00138 typename base::ChannelElement<T>::shared_ptr input = 00139 this->getInput(); 00140 if( input && input->read(read_sample->set(), false) == NewData ) 00141 return this->write(read_sample->rvalue()); 00142 } else { 00143 typename base::ChannelElement<T>::shared_ptr output = 00144 this->getOutput(); 00145 if (output && mqRead(read_sample)) 00146 return output->write(read_sample->rvalue()); 00147 } 00148 return false; 00149 } 00150 00156 FlowStatus read(typename base::ChannelElement<T>::reference_t sample, bool copy_old_data) 00157 { 00158 throw std::runtime_error("not implemented"); 00159 } 00160 00166 bool write(typename base::ChannelElement<T>::param_t sample) 00167 { 00168 write_sample->setPointer(&sample); 00169 return mqWrite(write_sample); 00170 } 00171 00172 }; 00173 } 00174 } 00175 00176 #endif 00177