00001 /* 00002 * subscription.hpp - micros sub connection 00003 * Copyright (C) 2015 Zaile Jiang 00004 * 00005 * This program is free software; you can redistribute it and/or 00006 * modify it under the terms of the GNU General Public License 00007 * as published by the Free Software Foundation; either version 2 00008 * of the License, or (at your option) any later version. 00009 * 00010 * This program is distributed in the hope that it will be useful, 00011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00013 * GNU General Public License for more details. 00014 * 00015 * You should have received a copy of the GNU General Public License 00016 * along with this program; if not, write to the Free Software 00017 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 00018 */ 00019 #ifndef MICROSRTT_SUBSCRIPTION_HPP 00020 #define MICROSRTT_SUBSCRIPTION_HPP 00021 00022 #include "ros/ros.h" 00023 #include "boost/function.hpp" 00024 #include "micros_rtt/connection_base.h" 00025 #include "micros_rtt/oro/channel_data_element.hpp" 00026 00027 namespace micros_rtt 00028 { 00029 00030 template<class M> 00031 class Subscription : public ConnectionBase 00032 { 00033 public: 00034 //typedef boost::intrusive_ptr< Subscription<M> > shared_ptr; 00035 Subscription(const std::string& topic) : ConnectionBase(topic) {}; 00036 Subscription(const std::string& topic, boost::function<void(M)> fp) : ConnectionBase(topic) 00037 { 00038 callback = fp; 00039 } 00040 ~Subscription() {}; 00041 00042 void setCallback(boost::function<void(M)> fp) 00043 { 00044 callback = fp; 00045 } 00046 00047 bool channelReady( ChannelElementBase::shared_ptr channel) 00048 { 00049 if (channel && channel->inputReady()) 00050 { 00051 addConnection(channel); 00052 return true; 00053 } 00054 return false; 00055 } 00056 00057 bool mqChannelReady( ChannelElementBase::shared_ptr channel) 00058 { 00059 if (channel && channel->inputReady()) 00060 { 00061 addMQConnection(channel); 00062 return true; 00063 } 00064 return false; 00065 } 00066 00067 bool call() 00068 { 00069 FlowStatus result; 00070 M sample; 00071 M mq_sample; 00072 typename ChannelElement<M>::shared_ptr input = static_cast< ChannelElement<M>* >( this->getChannelElement().get() ); 00073 typename ChannelElement<M>::shared_ptr mq_input = static_cast< ChannelElement<M>* >( this->getMQChannelElement().get() ); 00074 00075 if (!(mq_input || input)) 00076 { 00077 return false; 00078 } 00079 //if ( input ) 00080 //{ 00081 // FlowStatus tresult = input->read(sample, false); 00082 // // the result trickery is for not overwriting OldData with NoData. 00083 // if (tresult == NewData) 00084 // { 00085 // result = tresult; 00086 // callback(sample); 00087 // } 00088 // // stores OldData result 00089 // if (tresult > result) 00090 // result = tresult; 00091 //} 00092 00093 if ( mq_input ) 00094 { 00095 FlowStatus tresult = mq_input->read(mq_sample, false); 00096 // the result trickery is for not overwriting OldData with NoData. 00097 if (tresult == NewData) 00098 { 00099 result = tresult; 00100 callback(mq_sample); 00101 } 00102 // stores OldData result 00103 if (tresult > result) 00104 result = tresult; 00105 } 00106 return result; 00107 00108 } 00109 private: 00110 boost::function<void(M)> callback; 00111 }; 00112 00113 00114 } 00115 00116 #endif