00001 #ifndef ROSRTT_SUBSCRIPTION_H 00002 #define ROSRTT_SUBSCRIPTION_H 00003 00004 #include "oro/channel_data_element.hpp" 00005 #include "boost/function.hpp" 00006 #include "connection_base.h" 00007 00008 namespace hpcl_rtt 00009 { 00010 00011 template<class M> 00012 class Subscription : public ConnectionBase 00013 { 00014 public: 00015 //typedef boost::intrusive_ptr< Subscription<M> > shared_ptr; 00016 Subscription(const std::string& topic) : ConnectionBase(topic) {}; 00017 Subscription(const std::string& topic, boost::function<void(M)> fp) : ConnectionBase(topic) 00018 { 00019 callback = fp; 00020 } 00021 ~Subscription() {}; 00022 00023 void setCallback(boost::function<void(M)> fp) 00024 { 00025 callback = fp; 00026 } 00027 00028 bool channelReady( ChannelElementBase::shared_ptr end_port) 00029 { 00030 if (end_port->inputReady ()) 00031 { 00032 return true; 00033 } 00034 return false; 00035 } 00036 00037 bool call() 00038 { 00039 FlowStatus result; 00040 M sample; 00041 typename ChannelElement<M>::shared_ptr input = static_cast< ChannelElement<M>* >( this->getChannelElement().get() ); 00042 if ( input ) 00043 { 00044 FlowStatus tresult = input->read(sample, false); 00045 // the result trickery is for not overwriting OldData with NoData. 00046 if (tresult == NewData) 00047 { 00048 result = tresult; 00049 callback(sample); 00050 return true; 00051 } 00052 // stores OldData result 00053 if (tresult > result) 00054 result = tresult; 00055 } 00056 return false; 00057 } 00058 private: 00059 boost::function<void(M)> callback; 00060 }; 00061 00062 00063 } 00064 00065 #endif