Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #ifndef ORO_INPUT_PORT_HPP
00040 #define ORO_INPUT_PORT_HPP
00041
00042 #include "base/InputPortInterface.hpp"
00043 #include "internal/Channels.hpp"
00044 #include "internal/InputPortSource.hpp"
00045 #include "Service.hpp"
00046 #include "OperationCaller.hpp"
00047
00048 #include "OutputPort.hpp"
00049
00050 namespace RTT
00051 {
00062 template<typename T>
00063 class InputPort : public base::InputPortInterface
00064 {
00065 friend class internal::ConnOutputEndpoint<T>;
00066
00067 virtual bool connectionAdded( base::ChannelElementBase::shared_ptr channel_input, ConnPolicy const& policy ) { return true; }
00068
00069 bool do_read(typename base::ChannelElement<T>::reference_t sample, FlowStatus& result, bool copy_old_data, const internal::ConnectionManager::ChannelDescriptor& descriptor)
00070 {
00071 typename base::ChannelElement<T>::shared_ptr input = static_cast< base::ChannelElement<T>* >( descriptor.get<1>().get() );
00072 assert( result != NewData );
00073 if ( input ) {
00074 FlowStatus tresult = input->read(sample, copy_old_data);
00075
00076 if (tresult == NewData) {
00077 result = tresult;
00078 return true;
00079 }
00080
00081 if (tresult > result)
00082 result = tresult;
00083 }
00084 return false;
00085 }
00086
00093 InputPort(InputPort const& orig);
00094 InputPort& operator=(InputPort const& orig);
00095 public:
00096 InputPort(std::string const& name = "unnamed", ConnPolicy const& default_policy = ConnPolicy())
00097 : base::InputPortInterface(name, default_policy)
00098 {}
00099
00100 virtual ~InputPort() { disconnect(); }
00101
00103 FlowStatus read(base::DataSourceBase::shared_ptr source)
00104 { return read(source, true); }
00105
00106 FlowStatus read(base::DataSourceBase::shared_ptr source, bool copy_old_data)
00107 {
00108 typename internal::AssignableDataSource<T>::shared_ptr ds =
00109 boost::dynamic_pointer_cast< internal::AssignableDataSource<T> >(source);
00110 if (! ds)
00111 {
00112 log(Error) << "trying to read to an incompatible data source" << endlog();
00113 return NoData;
00114 }
00115 return read(ds->set(), copy_old_data);
00116 }
00117
00124 FlowStatus readNewest(base::DataSourceBase::shared_ptr source, bool copy_old_data = true)
00125 {
00126 typename internal::AssignableDataSource<T>::shared_ptr ds =
00127 boost::dynamic_pointer_cast< internal::AssignableDataSource<T> >(source);
00128 if (! ds)
00129 {
00130 log(Error) << "trying to read to an incompatible data source" << endlog();
00131 return NoData;
00132 }
00133 return readNewest(ds->set(), copy_old_data);
00134 }
00135
00137 FlowStatus read(typename base::ChannelElement<T>::reference_t sample)
00138 { return read(sample, true); }
00139
00151 FlowStatus read(typename base::ChannelElement<T>::reference_t sample, bool copy_old_data)
00152 {
00153 FlowStatus result = NoData;
00154
00155 #ifdef USE_CPP11
00156 cmanager.select_reader_channel( bind( &InputPort::do_read, this, boost::ref(sample), boost::ref(result), _1, _2), copy_old_data );
00157 #else
00158 cmanager.select_reader_channel( boost::bind( &InputPort::do_read, this, boost::ref(sample), boost::ref(result), boost::lambda::_1, boost::lambda::_2), copy_old_data );
00159 #endif
00160 return result;
00161 }
00162
00163
00170 FlowStatus readNewest(typename base::ChannelElement<T>::reference_t sample, bool copy_old_data = true)
00171 {
00172 FlowStatus result = read(sample, copy_old_data);
00173 if (result != RTT::NewData)
00174 return result;
00175
00176 while (read(sample, false) == RTT::NewData);
00177 return RTT::NewData;
00178 }
00179
00188 void getDataSample(T& sample)
00189 {
00190 typename base::ChannelElement<T>::shared_ptr input = static_cast< base::ChannelElement<T>* >( cmanager.getCurrentChannel() );
00191 if ( input ) {
00192 sample = input->data_sample();
00193 }
00194 }
00195
00197 virtual const types::TypeInfo* getTypeInfo() const
00198 { return internal::DataSourceTypeInfo<T>::getTypeInfo(); }
00199
00203 virtual base::PortInterface* clone() const
00204 { return new InputPort<T>(this->getName()); }
00205
00211 virtual base::PortInterface* antiClone() const
00212 { return new OutputPort<T>(this->getName()); }
00213
00217 base::DataSourceBase* getDataSource()
00218 {
00219 return new internal::InputPortSource<T>(*this);
00220 }
00221
00222 virtual bool createStream(ConnPolicy const& policy)
00223 {
00224 return internal::ConnFactory::createStream(*this, policy);
00225 }
00226
00227 #ifndef ORO_DISABLE_PORT_DATA_SCRIPTING
00228
00232 virtual Service* createPortObject()
00233 {
00234 Service* object = base::InputPortInterface::createPortObject();
00235
00236 typedef FlowStatus (InputPort<T>::*ReadSample)(typename base::ChannelElement<T>::reference_t);
00237 ReadSample read_m = &InputPort<T>::read;
00238 object->addSynchronousOperation("read", read_m, this).doc("Reads a sample from the port.").arg("sample", "");
00239 object->addSynchronousOperation("clear", &InputPortInterface::clear, this).doc("Clears any remaining data in this port. After a clear, a read() will return NoData if no writes happened in between.");
00240 return object;
00241 }
00242 #endif
00243 };
00244 }
00245
00246 #endif
00247