mq_channel_element.hpp
Go to the documentation of this file.
00001 /* 
00002  *  mq_channel_element.hpp - micros message queue element for transport channel
00003  *  Copyright (C) 2015 Zaile Jiang
00004  *  
00005  *  This program is free software; you can redistribute it and/or
00006  *  modify it under the terms of the GNU General Public License
00007  *  as published by the Free Software Foundation; either version 2
00008  *  of the License, or (at your option) any later version.
00009  *  
00010  *  This program is distributed in the hope that it will be useful,
00011  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00012  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00013  *  GNU General Public License for more details.
00014  *  
00015  *  You should have received a copy of the GNU General Public License
00016  *  along with this program; if not, write to the Free Software
00017  *  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
00018 */
00019 #ifndef MICROSRTT_MQ_CHANNEL_ELEMENT_HPP
00020 #define MICROSRTT_MQ_CHANNEL_ELEMENT_HPP
00021 
00022 #include "ros/serialization.h"
00023 #include "micros_rtt/oro/mq_send_recv.hpp"
00024 #include "micros_rtt/oro/channel_element.hpp"
00025 #include "micros_rtt/oro/data_lockfree.hpp"
00026 #include <stdexcept>
00027 
00028 namespace micros_rtt
00029 {
00030 using namespace ros::serialization;
00039 template<typename M>
00040 class MQChannelElement: public ChannelElement<M>, public MQSendRecv
00041 {
00043   typename DataObjectLockFree<M>::shared_ptr read_sample;
00045   typename DataObjectLockFree<M>::shared_ptr write_sample;
00046 
00047   
00048   SerializedMessage read_m;   
00049 
00050 public:
00055   MQChannelElement(ConnectionBasePtr connection, bool is_sender)
00056       : MQSendRecv(), 
00057         read_sample(new DataObjectLockFree<M>(M()))
00058         , write_sample(new DataObjectLockFree<M>(M()))
00059 
00060   {
00061     setupStream(connection, sizeof(M), is_sender);
00062   }
00063 
00064   ~MQChannelElement() 
00065   {
00066     cleanupStream();
00067   }
00068 
00069   virtual bool inputReady() 
00070   {
00071     if ( mqReady(this) ) 
00072     {
00073 //      typename ChannelElement<M>::shared_ptr output = this->getOutput();
00074 //      assert(output);
00075 //      output->data_sample(read_sample->Get());
00076       return true;
00077     }
00078     return false;
00079   }
00080 
00081   virtual bool data_sample(typename ChannelElement<M>::param_t sample)
00082   {
00083     // send initial data sample to the other side using a plain write.
00084     if (mis_sender) 
00085     {
00086       typename ChannelElement<M>::shared_ptr output =
00087           this->getOutput();
00088  
00089       write_sample->data_sample(sample);
00090       // update MQSendRecv buffer:
00091       // mqNewSample(sizeof(sample));
00092       // return mqWrite(sample);
00093       
00094       return true;
00095     }
00096     return false;
00097   }
00098 
00116 //  bool signal()
00117 //  {
00118 //    // copy messages into channel
00119 //    if (mis_sender) 
00120 //    {
00121 //      // this read should always succeed since signal() means
00122 //      // 'data available in a data element'.
00123 //      typename ChannelElement<T>::shared_ptr input =
00124 //          this->getInput();
00125 //      if( input && input->read(read_sample->set(), false) == NewData )
00126 //        return this->write(read_sample->rvalue());
00127 //    } 
00128 //    else 
00129 //    {
00130 //      typename ChannelElement<T>::shared_ptr output =
00131 //          this->getOutput();
00132 //      if (output && mqRead())
00133 //        return output->write(read_sample->rvalue());
00134 //      ;
00135 //    }
00136 //    return false;
00137 //  }
00138 
00144   FlowStatus read(typename ChannelElement<M>::reference_t sample, bool copy_old_data)
00145   {
00146     ROS_DEBUG("micros message queue read.");
00147     //messages got from message queue need to be deserialize.
00148         
00149     if (mqRead(read_m))
00150     {
00151       deserializeMessage<M>(read_m, sample);
00152       return NewData;
00153     }
00154     
00155     return NoData;
00156   }
00157 
00163   bool write(typename ChannelElement<M>::param_t sample)
00164   {
00165     //messages sent through message queue need to be serialized first.
00166     SerializedMessage m = serializeMessage<M>(sample);
00167     return mqWrite(m);
00168   }
00169 
00170 };
00171 
00172 }
00173 
00174 #endif
00175 


micros_rtt
Author(s): Zaile Jiang , Xiaodong Yi , Minglong Li
autogenerated on Sat Jun 8 2019 19:02:21