00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #ifndef MQ_CHANNEL_ELEMENT_H
00040 #define MQ_CHANNEL_ELEMENT_H
00041
00042 #include "MQSendRecv.hpp"
00043 #include "../../Logger.hpp"
00044 #include "../../base/ChannelElement.hpp"
00045 #include "../../internal/DataSource.hpp"
00046 #include "../../internal/DataSources.hpp"
00047 #include <stdexcept>
00048
00049 namespace RTT
00050 {
00051 namespace mqueue
00052 {
00061 template<typename T>
00062 class MQChannelElement: public base::ChannelElement<T>, public MQSendRecv
00063 {
00065 typename internal::ValueDataSource<T>::shared_ptr read_sample;
00067 typename internal::LateConstReferenceDataSource<T>::shared_ptr write_sample;
00068
00069 public:
00074 MQChannelElement(base::PortInterface* port, types::TypeMarshaller const& transport,
00075 const ConnPolicy& policy, bool is_sender)
00076 : MQSendRecv(transport)
00077 , read_sample(new internal::ValueDataSource<T>)
00078 , write_sample(new internal::LateConstReferenceDataSource<T>)
00079
00080 {
00081 Logger::In in("MQChannelElement");
00082 setupStream(read_sample, port, policy, is_sender);
00083 }
00084
00085 ~MQChannelElement() {
00086 cleanupStream();
00087 }
00088
00089 virtual bool inputReady() {
00090 if ( mqReady(read_sample, this) ) {
00091 typename base::ChannelElement<T>::shared_ptr output =
00092 this->getOutput();
00093 assert(output);
00094 output->data_sample(read_sample->rvalue());
00095 return true;
00096 }
00097 return false;
00098 }
00099
00100 virtual bool data_sample(typename base::ChannelElement<T>::param_t sample)
00101 {
00102
00103 if (mis_sender) {
00104 typename base::ChannelElement<T>::shared_ptr output =
00105 this->getOutput();
00106
00107 write_sample->setPointer(&sample);
00108
00109 mqNewSample(write_sample);
00110 return mqWrite(write_sample);
00111 }
00112 return false;
00113 }
00114
00132 bool signal()
00133 {
00134
00135 if (mis_sender) {
00136
00137
00138 typename base::ChannelElement<T>::shared_ptr input =
00139 this->getInput();
00140 if( input && input->read(read_sample->set(), false) == NewData )
00141 return this->write(read_sample->rvalue());
00142 } else {
00143 typename base::ChannelElement<T>::shared_ptr output =
00144 this->getOutput();
00145 if (output && mqRead(read_sample))
00146 return output->write(read_sample->rvalue());
00147 }
00148 return false;
00149 }
00150
00156 FlowStatus read(typename base::ChannelElement<T>::reference_t sample, bool copy_old_data)
00157 {
00158 throw std::runtime_error("not implemented");
00159 }
00160
00166 bool write(typename base::ChannelElement<T>::param_t sample)
00167 {
00168 write_sample->setPointer(&sample);
00169 return mqWrite(write_sample);
00170 }
00171
00172 };
00173 }
00174 }
00175
00176 #endif
00177