00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #ifndef ORO_OUTPUT_PORT_HPP
00040 #define ORO_OUTPUT_PORT_HPP
00041
00042 #include "base/OutputPortInterface.hpp"
00043 #include "base/DataObject.hpp"
00044 #include "internal/DataObjectDataSource.hpp"
00045 #include "internal/Channels.hpp"
00046 #include "internal/ConnFactory.hpp"
00047 #include "Service.hpp"
00048 #include "OperationCaller.hpp"
00049
00050 #include "InputPort.hpp"
00051
00052 namespace RTT
00053 {
00069 template<typename T>
00070 class OutputPort : public base::OutputPortInterface
00071 {
00072 friend class internal::ConnInputEndpoint<T>;
00073
00074 bool do_write(typename base::ChannelElement<T>::param_t sample, const internal::ConnectionManager::ChannelDescriptor& descriptor)
00075 {
00076 typename base::ChannelElement<T>::shared_ptr output
00077 = boost::static_pointer_cast< base::ChannelElement<T> >(descriptor.get<1>());
00078 if (output->write(sample))
00079 return false;
00080 else
00081 {
00082 log(Error) << "A channel of port " << getName() << " has been invalidated during write(), it will be removed" << endlog();
00083 return true;
00084 }
00085 }
00086
00087 bool do_init(typename base::ChannelElement<T>::param_t sample, const internal::ConnectionManager::ChannelDescriptor& descriptor)
00088 {
00089 typename base::ChannelElement<T>::shared_ptr output
00090 = boost::static_pointer_cast< base::ChannelElement<T> >(descriptor.get<1>());
00091 if (output->data_sample(sample))
00092 return false;
00093 else
00094 {
00095 log(Error) << "A channel of port " << getName() << " has been invalidated during setDataSample(), it will be removed" << endlog();
00096 return true;
00097 }
00098 }
00099
00100 virtual bool connectionAdded( base::ChannelElementBase::shared_ptr channel_input, ConnPolicy const& policy ) {
00101
00102
00103
00104
00105 typename base::ChannelElement<T>::shared_ptr channel_el_input =
00106 static_cast< base::ChannelElement<T>* >(channel_input.get());
00107
00108 if (has_initial_sample)
00109 {
00110 T const& initial_sample = sample->Get();
00111 if ( channel_el_input->data_sample(initial_sample) ) {
00112 if ( has_last_written_value && policy.init )
00113 return channel_el_input->write(initial_sample);
00114 return true;
00115 } else {
00116 Logger::In in("OutputPort");
00117 log(Error) << "Failed to pass data sample to data channel. Aborting connection."<<endlog();
00118 return false;
00119 }
00120 }
00121
00122 return channel_el_input->data_sample( T() );
00123 }
00124
00126 bool has_last_written_value;
00128
00129
00130 bool has_initial_sample;
00132
00133 bool keeps_next_written_value;
00135
00136 bool keeps_last_written_value;
00137 typename base::DataObjectInterface<T>::shared_ptr sample;
00138
00145 OutputPort( OutputPort const& orig );
00146 OutputPort& operator=(OutputPort const& orig);
00147
00148 public:
00165 OutputPort(std::string const& name = "unnamed", bool keep_last_written_value = true)
00166 : base::OutputPortInterface(name)
00167 , has_last_written_value(false)
00168 , has_initial_sample(false)
00169 , keeps_next_written_value(false)
00170 , keeps_last_written_value(false)
00171 , sample( new base::DataObject<T>() )
00172 {
00173 if (keep_last_written_value)
00174 keepLastWrittenValue(true);
00175 }
00176
00177 void keepNextWrittenValue(bool keep)
00178 {
00179 keeps_next_written_value = keep;
00180 }
00181
00182 void keepLastWrittenValue(bool keep)
00183 {
00184 keeps_last_written_value = keep;
00185 }
00186
00187 bool keepsLastWrittenValue() const { return keeps_last_written_value; }
00188
00194 T getLastWrittenValue() const
00195 {
00196 return sample->Get();
00197 }
00198
00205 bool getLastWrittenValue(T& sample) const
00206 {
00207 if (has_last_written_value)
00208 {
00209 this->sample->Get(sample);
00210 return true;
00211 }
00212 return false;
00213 }
00214
00215 virtual base::DataSourceBase::shared_ptr getDataSource() const
00216 {
00217
00218 return new internal::DataObjectDataSource<T>( sample );
00219 }
00220
00230 void setDataSample(const T& sample)
00231 {
00232 this->sample->Set(sample);
00233 has_initial_sample = true;
00234 has_last_written_value = false;
00235
00236 cmanager.delete_if( boost::bind(
00237 &OutputPort<T>::do_init, this, boost::ref(sample), _1)
00238 );
00239 }
00240
00245 void write(const T& sample)
00246 {
00247 if (keeps_last_written_value || keeps_next_written_value)
00248 {
00249 keeps_next_written_value = false;
00250 has_initial_sample = true;
00251 this->sample->Set(sample);
00252 }
00253 has_last_written_value = keeps_last_written_value;
00254
00255 cmanager.delete_if( boost::bind(
00256 &OutputPort<T>::do_write, this, boost::ref(sample), boost::lambda::_1)
00257 );
00258 }
00259
00260 void write(base::DataSourceBase::shared_ptr source)
00261 {
00262 typename internal::AssignableDataSource<T>::shared_ptr ds =
00263 boost::dynamic_pointer_cast< internal::AssignableDataSource<T> >(source);
00264 if (ds)
00265 write(ds->rvalue());
00266 else
00267 {
00268 typename internal::DataSource<T>::shared_ptr ds =
00269 boost::dynamic_pointer_cast< internal::DataSource<T> >(source);
00270 if (ds)
00271 write(ds->get());
00272 else
00273 log(Error) << "trying to write from an incompatible data source" << endlog();
00274 }
00275 }
00276
00278 virtual const types::TypeInfo* getTypeInfo() const
00279 { return internal::DataSourceTypeInfo<T>::getTypeInfo(); }
00280
00284 virtual base::PortInterface* clone() const
00285 { return new OutputPort<T>(this->getName()); }
00286
00292 virtual base::PortInterface* antiClone() const
00293 { return new InputPort<T>(this->getName()); }
00294
00295 using base::OutputPortInterface::createConnection;
00296
00299 virtual bool createConnection(base::InputPortInterface& input_port, ConnPolicy const& policy)
00300 {
00301 return internal::ConnFactory::createConnection(*this, input_port, policy);
00302 }
00303
00304 virtual bool createStream(ConnPolicy const& policy)
00305 {
00306 return internal::ConnFactory::createStream(*this, policy);
00307 }
00308
00313 virtual Service* createPortObject()
00314 {
00315 Service* object = base::OutputPortInterface::createPortObject();
00316
00317 typedef void (OutputPort<T>::*WriteSample)(T const&);
00318 WriteSample write_m = &OutputPort::write;
00319 typedef T (OutputPort<T>::*LastSample)() const;
00320 LastSample last_m = &OutputPort::getLastWrittenValue;
00321 object->addSynchronousOperation("write", write_m, this).doc("Writes a sample on the port.").arg("sample", "");
00322 object->addSynchronousOperation("last", last_m, this).doc("Returns last written value to this port.");
00323 return object;
00324 }
00325 };
00326
00327 }
00328
00329 #endif
00330