$search
00001 /*************************************************************************** 00002 tag: Peter Soetens Thu Oct 22 11:59:08 CEST 2009 OutputPort.hpp 00003 00004 OutputPort.hpp - description 00005 ------------------- 00006 begin : Thu October 22 2009 00007 copyright : (C) 2009 Sylvain Joyeux 00008 email : sylvain.joyeux@m4x.org 00009 00010 *************************************************************************** 00011 * This library is free software; you can redistribute it and/or * 00012 * modify it under the terms of the GNU General Public * 00013 * License as published by the Free Software Foundation; * 00014 * version 2 of the License. * 00015 * * 00016 * As a special exception, you may use this file as part of a free * 00017 * software library without restriction. Specifically, if other files * 00018 * instantiate templates or use macros or inline functions from this * 00019 * file, or you compile this file and link it with other files to * 00020 * produce an executable, this file does not by itself cause the * 00021 * resulting executable to be covered by the GNU General Public * 00022 * License. This exception does not however invalidate any other * 00023 * reasons why the executable file might be covered by the GNU General * 00024 * Public License. * 00025 * * 00026 * This library is distributed in the hope that it will be useful, * 00027 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 00028 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * 00029 * Lesser General Public License for more details. * 00030 * * 00031 * You should have received a copy of the GNU General Public * 00032 * License along with this library; if not, write to the Free Software * 00033 * Foundation, Inc., 59 Temple Place, * 00034 * Suite 330, Boston, MA 02111-1307 USA * 00035 * * 00036 ***************************************************************************/ 00037 00038 00039 #ifndef ORO_OUTPUT_PORT_HPP 00040 #define ORO_OUTPUT_PORT_HPP 00041 00042 #include "base/OutputPortInterface.hpp" 00043 #include "base/DataObject.hpp" 00044 #include "internal/DataObjectDataSource.hpp" 00045 #include "internal/Channels.hpp" 00046 #include "internal/ConnFactory.hpp" 00047 #include "Service.hpp" 00048 #include "OperationCaller.hpp" 00049 00050 #include "InputPort.hpp" 00051 00052 namespace RTT 00053 { 00069 template<typename T> 00070 class OutputPort : public base::OutputPortInterface 00071 { 00072 friend class internal::ConnInputEndpoint<T>; 00073 00074 bool do_write(typename base::ChannelElement<T>::param_t sample, const internal::ConnectionManager::ChannelDescriptor& descriptor) 00075 { 00076 typename base::ChannelElement<T>::shared_ptr output 00077 = boost::static_pointer_cast< base::ChannelElement<T> >(descriptor.get<1>()); 00078 if (output->write(sample)) 00079 return false; 00080 else 00081 { 00082 log(Error) << "A channel of port " << getName() << " has been invalidated during write(), it will be removed" << endlog(); 00083 return true; 00084 } 00085 } 00086 00087 bool do_init(typename base::ChannelElement<T>::param_t sample, const internal::ConnectionManager::ChannelDescriptor& descriptor) 00088 { 00089 typename base::ChannelElement<T>::shared_ptr output 00090 = boost::static_pointer_cast< base::ChannelElement<T> >(descriptor.get<1>()); 00091 if (output->data_sample(sample)) 00092 return false; 00093 else 00094 { 00095 log(Error) << "A channel of port " << getName() << " has been invalidated during setDataSample(), it will be removed" << endlog(); 00096 return true; 00097 } 00098 } 00099 00100 virtual bool connectionAdded( base::ChannelElementBase::shared_ptr channel_input, ConnPolicy const& policy ) { 00101 // Initialize the new channel with last written data if requested 00102 // (and available) 00103 00104 // This this the input channel element of the whole connection 00105 typename base::ChannelElement<T>::shared_ptr channel_el_input = 00106 static_cast< base::ChannelElement<T>* >(channel_input.get()); 00107 00108 if (has_initial_sample) 00109 { 00110 T const& initial_sample = sample->Get(); 00111 if ( channel_el_input->data_sample(initial_sample) ) { 00112 if ( has_last_written_value && policy.init ) 00113 return channel_el_input->write(initial_sample); 00114 return true; 00115 } else { 00116 Logger::In in("OutputPort"); 00117 log(Error) << "Failed to pass data sample to data channel. Aborting connection."<<endlog(); 00118 return false; 00119 } 00120 } 00121 // even if we're not written, test the connection with a default sample. 00122 return channel_el_input->data_sample( T() ); 00123 } 00124 00126 bool has_last_written_value; 00128 // data_sample or by calling write() with keeps_next_written_value or 00129 // keeps_last_written_value to true 00130 bool has_initial_sample; 00132 // This is used to initialize connections with a known sample 00133 bool keeps_next_written_value; 00135 // This is used to allow the use of the 'init' connection policy option 00136 bool keeps_last_written_value; 00137 typename base::DataObjectInterface<T>::shared_ptr sample; 00138 00145 OutputPort( OutputPort const& orig ); 00146 OutputPort& operator=(OutputPort const& orig); 00147 00148 public: 00165 OutputPort(std::string const& name = "unnamed", bool keep_last_written_value = true) 00166 : base::OutputPortInterface(name) 00167 , has_last_written_value(false) 00168 , has_initial_sample(false) 00169 , keeps_next_written_value(false) 00170 , keeps_last_written_value(false) 00171 , sample( new base::DataObject<T>() ) 00172 { 00173 if (keep_last_written_value) 00174 keepLastWrittenValue(true); 00175 } 00176 00177 void keepNextWrittenValue(bool keep) 00178 { 00179 keeps_next_written_value = keep; 00180 } 00181 00182 void keepLastWrittenValue(bool keep) 00183 { 00184 keeps_last_written_value = keep; 00185 } 00186 00187 bool keepsLastWrittenValue() const { return keeps_last_written_value; } 00188 00194 T getLastWrittenValue() const 00195 { 00196 return sample->Get(); 00197 } 00198 00205 bool getLastWrittenValue(T& sample) const 00206 { 00207 if (has_last_written_value) 00208 { 00209 this->sample->Get(sample); 00210 return true; 00211 } 00212 return false; 00213 } 00214 00215 virtual base::DataSourceBase::shared_ptr getDataSource() const 00216 { 00217 // we create this on the fly. 00218 return new internal::DataObjectDataSource<T>( sample ); 00219 } 00220 00230 void setDataSample(const T& sample) 00231 { 00232 this->sample->Set(sample); 00233 has_initial_sample = true; 00234 has_last_written_value = false; 00235 00236 cmanager.delete_if( boost::bind( 00237 &OutputPort<T>::do_init, this, boost::ref(sample), _1) 00238 ); 00239 } 00240 00245 void write(const T& sample) 00246 { 00247 if (keeps_last_written_value || keeps_next_written_value) 00248 { 00249 keeps_next_written_value = false; 00250 has_initial_sample = true; 00251 this->sample->Set(sample); 00252 } 00253 has_last_written_value = keeps_last_written_value; 00254 00255 cmanager.delete_if( boost::bind( 00256 &OutputPort<T>::do_write, this, boost::ref(sample), boost::lambda::_1) 00257 ); 00258 } 00259 00260 void write(base::DataSourceBase::shared_ptr source) 00261 { 00262 typename internal::AssignableDataSource<T>::shared_ptr ds = 00263 boost::dynamic_pointer_cast< internal::AssignableDataSource<T> >(source); 00264 if (ds) 00265 write(ds->rvalue()); 00266 else 00267 { 00268 typename internal::DataSource<T>::shared_ptr ds = 00269 boost::dynamic_pointer_cast< internal::DataSource<T> >(source); 00270 if (ds) 00271 write(ds->get()); 00272 else 00273 log(Error) << "trying to write from an incompatible data source" << endlog(); 00274 } 00275 } 00276 00278 virtual const types::TypeInfo* getTypeInfo() const 00279 { return internal::DataSourceTypeInfo<T>::getTypeInfo(); } 00280 00284 virtual base::PortInterface* clone() const 00285 { return new OutputPort<T>(this->getName()); } 00286 00292 virtual base::PortInterface* antiClone() const 00293 { return new InputPort<T>(this->getName()); } 00294 00295 using base::OutputPortInterface::createConnection; 00296 00299 virtual bool createConnection(base::InputPortInterface& input_port, ConnPolicy const& policy) 00300 { 00301 return internal::ConnFactory::createConnection(*this, input_port, policy); 00302 } 00303 00304 virtual bool createStream(ConnPolicy const& policy) 00305 { 00306 return internal::ConnFactory::createStream(*this, policy); 00307 } 00308 00309 #ifndef ORO_DISABLE_PORT_DATA_SCRIPTING 00310 00314 virtual Service* createPortObject() 00315 { 00316 Service* object = base::OutputPortInterface::createPortObject(); 00317 // Force resolution on the overloaded write method 00318 typedef void (OutputPort<T>::*WriteSample)(T const&); 00319 WriteSample write_m = &OutputPort::write; 00320 typedef T (OutputPort<T>::*LastSample)() const; 00321 LastSample last_m = &OutputPort::getLastWrittenValue; 00322 object->addSynchronousOperation("write", write_m, this).doc("Writes a sample on the port.").arg("sample", ""); 00323 object->addSynchronousOperation("last", last_m, this).doc("Returns last written value to this port."); 00324 return object; 00325 } 00326 #endif 00327 }; 00328 00329 } 00330 00331 #endif 00332