Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #ifndef ORO_INPUT_PORT_HPP
00040 #define ORO_INPUT_PORT_HPP
00041
00042 #include "base/InputPortInterface.hpp"
00043 #include "internal/Channels.hpp"
00044 #include "internal/InputPortSource.hpp"
00045 #include "Service.hpp"
00046 #include "OperationCaller.hpp"
00047
00048 #include "OutputPort.hpp"
00049
00050 namespace RTT
00051 {
00062 template<typename T>
00063 class InputPort : public base::InputPortInterface
00064 {
00065 friend class internal::ConnOutputEndpoint<T>;
00066
00067 virtual bool connectionAdded( base::ChannelElementBase::shared_ptr channel_input, ConnPolicy const& policy ) { return true; }
00068
00069 bool do_read(typename base::ChannelElement<T>::reference_t sample, FlowStatus& result, bool copy_old_data, const internal::ConnectionManager::ChannelDescriptor& descriptor)
00070 {
00071 typename base::ChannelElement<T>::shared_ptr input = static_cast< base::ChannelElement<T>* >( descriptor.get<1>().get() );
00072 assert( result != NewData );
00073 if ( input ) {
00074 FlowStatus tresult = input->read(sample, copy_old_data);
00075
00076 if (tresult == NewData) {
00077 result = tresult;
00078 return true;
00079 }
00080
00081 if (tresult > result)
00082 result = tresult;
00083 }
00084 return false;
00085 }
00086
00093 InputPort(InputPort const& orig);
00094 InputPort& operator=(InputPort const& orig);
00095 public:
00096 InputPort(std::string const& name = "unnamed", ConnPolicy const& default_policy = ConnPolicy())
00097 : base::InputPortInterface(name, default_policy)
00098 {}
00099
00100 virtual ~InputPort() { disconnect(); }
00101
00103 FlowStatus read(base::DataSourceBase::shared_ptr source)
00104 { return read(source, true); }
00105
00106 FlowStatus read(base::DataSourceBase::shared_ptr source, bool copy_old_data)
00107 {
00108 typename internal::AssignableDataSource<T>::shared_ptr ds =
00109 boost::dynamic_pointer_cast< internal::AssignableDataSource<T> >(source);
00110 if (! ds)
00111 {
00112 log(Error) << "trying to read to an incompatible data source" << endlog();
00113 return NoData;
00114 }
00115 return read(ds->set(), copy_old_data);
00116 }
00117
00124 FlowStatus readNewest(base::DataSourceBase::shared_ptr source, bool copy_old_data = true)
00125 {
00126 typename internal::AssignableDataSource<T>::shared_ptr ds =
00127 boost::dynamic_pointer_cast< internal::AssignableDataSource<T> >(source);
00128 if (! ds)
00129 {
00130 log(Error) << "trying to read to an incompatible data source" << endlog();
00131 return NoData;
00132 }
00133 return readNewest(ds->set(), copy_old_data);
00134 }
00135
00137 FlowStatus read(typename base::ChannelElement<T>::reference_t sample)
00138 { return read(sample, true); }
00139
00151 FlowStatus read(typename base::ChannelElement<T>::reference_t sample, bool copy_old_data)
00152 {
00153 FlowStatus result = NoData;
00154
00155 cmanager.select_reader_channel( boost::bind( &InputPort::do_read, this, boost::ref(sample), boost::ref(result), boost::lambda::_1, boost::lambda::_2), copy_old_data );
00156 return result;
00157 }
00158
00159
00166 FlowStatus readNewest(typename base::ChannelElement<T>::reference_t sample, bool copy_old_data = true)
00167 {
00168 FlowStatus result = read(sample, copy_old_data);
00169 if (result != RTT::NewData)
00170 return result;
00171
00172 while (read(sample, false) == RTT::NewData);
00173 return RTT::NewData;
00174 }
00175
00184 void getDataSample(T& sample)
00185 {
00186 typename base::ChannelElement<T>::shared_ptr input = static_cast< base::ChannelElement<T>* >( cmanager.getCurrentChannel() );
00187 if ( input ) {
00188 sample = input->data_sample();
00189 }
00190 }
00191
00193 virtual const types::TypeInfo* getTypeInfo() const
00194 { return internal::DataSourceTypeInfo<T>::getTypeInfo(); }
00195
00199 virtual base::PortInterface* clone() const
00200 { return new InputPort<T>(this->getName()); }
00201
00207 virtual base::PortInterface* antiClone() const
00208 { return new OutputPort<T>(this->getName()); }
00209
00213 base::DataSourceBase* getDataSource()
00214 {
00215 return new internal::InputPortSource<T>(*this);
00216 }
00217
00218 virtual bool createStream(ConnPolicy const& policy)
00219 {
00220 return internal::ConnFactory::createStream(*this, policy);
00221 }
00222
00223 #ifndef ORO_DISABLE_PORT_DATA_SCRIPTING
00224
00228 virtual Service* createPortObject()
00229 {
00230 Service* object = base::InputPortInterface::createPortObject();
00231
00232 typedef FlowStatus (InputPort<T>::*ReadSample)(typename base::ChannelElement<T>::reference_t);
00233 ReadSample read_m = &InputPort<T>::read;
00234 object->addSynchronousOperation("read", read_m, this).doc("Reads a sample from the port.").arg("sample", "");
00235 return object;
00236 }
00237 #endif
00238 };
00239 }
00240
00241 #endif
00242