00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #ifndef ORO_INPUT_PORT_HPP
00040 #define ORO_INPUT_PORT_HPP
00041
00042 #include "base/InputPortInterface.hpp"
00043 #include "internal/Channels.hpp"
00044 #include "internal/InputPortSource.hpp"
00045 #include "Service.hpp"
00046 #include "OperationCaller.hpp"
00047
00048 #include "OutputPort.hpp"
00049
00050 namespace RTT
00051 {
00062 template<typename T>
00063 class InputPort : public base::InputPortInterface
00064 {
00065 friend class internal::ConnOutputEndpoint<T>;
00066 typename internal::InputPortSource<T>::shared_ptr data_source;
00067
00068 virtual bool connectionAdded( base::ChannelElementBase::shared_ptr channel_input, ConnPolicy const& policy ) { return true; }
00069
00070 bool do_read(typename base::ChannelElement<T>::reference_t sample, FlowStatus& result, bool copy_old_data, const internal::ConnectionManager::ChannelDescriptor& descriptor)
00071 {
00072 typename base::ChannelElement<T>::shared_ptr input = static_cast< base::ChannelElement<T>* >( descriptor.get<1>().get() );
00073 assert( result != NewData );
00074 if ( input ) {
00075 FlowStatus tresult = input->read(sample, copy_old_data);
00076
00077 if (tresult == NewData) {
00078 result = tresult;
00079 return true;
00080 }
00081
00082 if (tresult > result)
00083 result = tresult;
00084 }
00085 return false;
00086 }
00087
00094 InputPort(InputPort const& orig);
00095 InputPort& operator=(InputPort const& orig);
00096 public:
00097 InputPort(std::string const& name = "unnamed", ConnPolicy const& default_policy = ConnPolicy())
00098 : base::InputPortInterface(name, default_policy)
00099 , data_source(0) {}
00100
00101 virtual ~InputPort() { disconnect(); if (data_source) data_source->dropPort(); }
00102
00104 FlowStatus read(base::DataSourceBase::shared_ptr source)
00105 { return read(source, true); }
00106
00107 FlowStatus read(base::DataSourceBase::shared_ptr source, bool copy_old_data)
00108 {
00109 typename internal::AssignableDataSource<T>::shared_ptr ds =
00110 boost::dynamic_pointer_cast< internal::AssignableDataSource<T> >(source);
00111 if (! ds)
00112 {
00113 log(Error) << "trying to read to an incompatible data source" << endlog();
00114 return NoData;
00115 }
00116 return read(ds->set(), copy_old_data);
00117 }
00118
00125 FlowStatus readNewest(base::DataSourceBase::shared_ptr source, bool copy_old_data = true)
00126 {
00127 typename internal::AssignableDataSource<T>::shared_ptr ds =
00128 boost::dynamic_pointer_cast< internal::AssignableDataSource<T> >(source);
00129 if (! ds)
00130 {
00131 log(Error) << "trying to read to an incompatible data source" << endlog();
00132 return NoData;
00133 }
00134 return readNewest(ds->set(), copy_old_data);
00135 }
00136
00138 FlowStatus read(typename base::ChannelElement<T>::reference_t sample)
00139 { return read(sample, true); }
00140
00152 FlowStatus read(typename base::ChannelElement<T>::reference_t sample, bool copy_old_data)
00153 {
00154 FlowStatus result = NoData;
00155
00156 cmanager.select_reader_channel( boost::bind( &InputPort::do_read, this, boost::ref(sample), boost::ref(result), copy_old_data, boost::lambda::_1) );
00157 return result;
00158 }
00159
00160
00167 FlowStatus readNewest(typename base::ChannelElement<T>::reference_t sample, bool copy_old_data = true)
00168 {
00169 FlowStatus result = read(sample, copy_old_data);
00170 if (result != RTT::NewData)
00171 return result;
00172
00173 while (read(sample, false) == RTT::NewData);
00174 return RTT::NewData;
00175 }
00176
00178 virtual const types::TypeInfo* getTypeInfo() const
00179 { return internal::DataSourceTypeInfo<T>::getTypeInfo(); }
00180
00184 virtual base::PortInterface* clone() const
00185 { return new InputPort<T>(this->getName()); }
00186
00192 virtual base::PortInterface* antiClone() const
00193 { return new OutputPort<T>(this->getName()); }
00194
00199 base::DataSourceBase* getDataSource()
00200 {
00201 if (data_source) return data_source.get();
00202 data_source = new internal::InputPortSource<T>(*this);
00203 return data_source.get();
00204 }
00205
00206 virtual bool createStream(ConnPolicy const& policy)
00207 {
00208 return internal::ConnFactory::createStream(*this, policy);
00209 }
00210
00215 virtual Service* createPortObject()
00216 {
00217 Service* object = base::InputPortInterface::createPortObject();
00218
00219 typedef FlowStatus (InputPort<T>::*ReadSample)(typename base::ChannelElement<T>::reference_t);
00220 ReadSample read_m = &InputPort<T>::read;
00221 object->addSynchronousOperation("read", read_m, this).doc("Reads a sample from the port.").arg("sample", "");
00222 return object;
00223 }
00224 };
00225 }
00226
00227 #endif
00228