$search
00001 /*************************************************************************** 00002 tag: Peter Soetens Thu Oct 22 11:59:08 CEST 2009 ConnFactory.hpp 00003 00004 ConnFactory.hpp - description 00005 ------------------- 00006 begin : Thu October 22 2009 00007 copyright : (C) 2009 Peter Soetens 00008 email : peter@thesourcworks.com 00009 00010 *************************************************************************** 00011 * This library is free software; you can redistribute it and/or * 00012 * modify it under the terms of the GNU General Public * 00013 * License as published by the Free Software Foundation; * 00014 * version 2 of the License. * 00015 * * 00016 * As a special exception, you may use this file as part of a free * 00017 * software library without restriction. Specifically, if other files * 00018 * instantiate templates or use macros or inline functions from this * 00019 * file, or you compile this file and link it with other files to * 00020 * produce an executable, this file does not by itself cause the * 00021 * resulting executable to be covered by the GNU General Public * 00022 * License. This exception does not however invalidate any other * 00023 * reasons why the executable file might be covered by the GNU General * 00024 * Public License. * 00025 * * 00026 * This library is distributed in the hope that it will be useful, * 00027 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 00028 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * 00029 * Lesser General Public License for more details. * 00030 * * 00031 * You should have received a copy of the GNU General Public * 00032 * License along with this library; if not, write to the Free Software * 00033 * Foundation, Inc., 59 Temple Place, * 00034 * Suite 330, Boston, MA 02111-1307 USA * 00035 * * 00036 ***************************************************************************/ 00037 00038 00039 #ifndef ORO_CONN_FACTORY_HPP 00040 #define ORO_CONN_FACTORY_HPP 00041 00042 #include <string> 00043 #include "Channels.hpp" 00044 #include "ConnInputEndPoint.hpp" 00045 #include "ConnOutputEndPoint.hpp" 00046 #include "../base/PortInterface.hpp" 00047 #include "../base/InputPortInterface.hpp" 00048 #include "../base/OutputPortInterface.hpp" 00049 #include "../DataFlowInterface.hpp" 00050 00051 #include "../base/DataObject.hpp" 00052 #include "../base/DataObjectUnSync.hpp" 00053 #include "../base/Buffer.hpp" 00054 #include "../base/BufferUnSync.hpp" 00055 #include "../Logger.hpp" 00056 00057 namespace RTT 00058 { namespace internal { 00059 00063 struct LocalConnID : public ConnID 00064 { 00065 base::PortInterface const* ptr; 00066 LocalConnID(base::PortInterface const* obj) 00067 : ptr(obj) {} 00068 virtual ConnID* clone() const; 00069 virtual bool isSameID(ConnID const& id) const; 00070 }; 00071 00075 struct RTT_API StreamConnID : public ConnID 00076 { 00077 std::string name_id; 00078 StreamConnID(const std::string& name) 00079 : name_id(name) {} 00080 virtual ConnID* clone() const; 00081 virtual bool isSameID(ConnID const& id) const; 00082 }; 00083 00084 00091 class RTT_API ConnFactory 00092 { 00093 public: 00094 virtual ~ConnFactory() {} 00095 00100 virtual base::InputPortInterface* inputPort(std::string const& name) const = 0; 00101 00106 virtual base::OutputPortInterface* outputPort(std::string const& name) const = 0; 00107 00114 virtual base::ChannelElementBase::shared_ptr buildDataStorage(ConnPolicy const& policy) const = 0; 00115 00122 virtual base::ChannelElementBase::shared_ptr buildChannelOutput(base::InputPortInterface& port) const = 0; 00129 virtual base::ChannelElementBase::shared_ptr buildChannelInput(base::OutputPortInterface& port) const = 0; 00130 00138 template<typename T> 00139 static base::ChannelElementBase* buildDataStorage(ConnPolicy const& policy, const T& initial_value = T()) 00140 { 00141 if (policy.type == ConnPolicy::DATA) 00142 { 00143 typename base::DataObjectInterface<T>::shared_ptr data_object; 00144 switch (policy.lock_policy) 00145 { 00146 #ifndef OROBLD_OS_NO_ASM 00147 case ConnPolicy::LOCK_FREE: 00148 data_object.reset( new base::DataObjectLockFree<T>(initial_value) ); 00149 break; 00150 #else 00151 case ConnPolicy::LOCK_FREE: 00152 RTT::log(Warning) << "lock free connection policy is unavailable on this system, defaulting to LOCKED" << RTT::endlog(); 00153 #endif 00154 case ConnPolicy::LOCKED: 00155 data_object.reset( new base::DataObjectLocked<T>(initial_value) ); 00156 break; 00157 case ConnPolicy::UNSYNC: 00158 data_object.reset( new base::DataObjectUnSync<T>(initial_value) ); 00159 break; 00160 } 00161 00162 ChannelDataElement<T>* result = new ChannelDataElement<T>(data_object); 00163 return result; 00164 } 00165 else if (policy.type == ConnPolicy::BUFFER || policy.type == ConnPolicy::CIRCULAR_BUFFER) 00166 { 00167 base::BufferInterface<T>* buffer_object = 0; 00168 switch (policy.lock_policy) 00169 { 00170 #ifndef OROBLD_OS_NO_ASM 00171 case ConnPolicy::LOCK_FREE: 00172 buffer_object = new base::BufferLockFree<T>(policy.size, initial_value, policy.type == ConnPolicy::CIRCULAR_BUFFER); 00173 break; 00174 #else 00175 case ConnPolicy::LOCK_FREE: 00176 RTT::log(Warning) << "lock free connection policy is unavailable on this system, defaulting to LOCKED" << RTT::endlog(); 00177 #endif 00178 case ConnPolicy::LOCKED: 00179 buffer_object = new base::BufferLocked<T>(policy.size, initial_value, policy.type == ConnPolicy::CIRCULAR_BUFFER); 00180 break; 00181 case ConnPolicy::UNSYNC: 00182 buffer_object = new base::BufferUnSync<T>(policy.size, initial_value, policy.type == ConnPolicy::CIRCULAR_BUFFER); 00183 break; 00184 } 00185 return new ChannelBufferElement<T>(typename base::BufferInterface<T>::shared_ptr(buffer_object)); 00186 } 00187 return NULL; 00188 } 00189 00198 template<typename T> 00199 static base::ChannelElementBase::shared_ptr buildChannelInput(OutputPort<T>& port, ConnID* conn_id, base::ChannelElementBase::shared_ptr output_channel) 00200 { 00201 assert(conn_id); 00202 base::ChannelElementBase::shared_ptr endpoint = new ConnInputEndpoint<T>(&port, conn_id); 00203 if (output_channel) 00204 endpoint->setOutput(output_channel); 00205 return endpoint; 00206 } 00207 00218 template<typename T> 00219 static base::ChannelElementBase::shared_ptr buildBufferedChannelInput(OutputPort<T>& port, ConnID* conn_id, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr output_channel) 00220 { 00221 assert(conn_id); 00222 base::ChannelElementBase::shared_ptr endpoint = new ConnInputEndpoint<T>(&port, conn_id); 00223 base::ChannelElementBase::shared_ptr data_object = buildDataStorage<T>(policy, port.getLastWrittenValue() ); 00224 endpoint->setOutput(data_object); 00225 if (output_channel) 00226 data_object->setOutput(output_channel); 00227 return endpoint; 00228 } 00229 00237 template<typename T> 00238 static base::ChannelElementBase::shared_ptr buildChannelOutput(InputPort<T>& port, ConnID* conn_id) 00239 { 00240 assert(conn_id); 00241 base::ChannelElementBase::shared_ptr endpoint = new ConnOutputEndpoint<T>(&port, conn_id); 00242 return endpoint; 00243 } 00244 00254 template<typename T> 00255 static base::ChannelElementBase::shared_ptr buildBufferedChannelOutput(InputPort<T>& port, ConnID* conn_id, ConnPolicy const& policy, T const& initial_value = T() ) 00256 { 00257 assert(conn_id); 00258 base::ChannelElementBase::shared_ptr endpoint = new ConnOutputEndpoint<T>(&port, conn_id); 00259 base::ChannelElementBase::shared_ptr data_object = buildDataStorage<T>(policy, initial_value); 00260 data_object->setOutput(endpoint); 00261 return data_object; 00262 } 00263 00273 template<typename T> 00274 static bool createConnection(OutputPort<T>& output_port, base::InputPortInterface& input_port, ConnPolicy const& policy) 00275 { 00276 if ( !output_port.isLocal() ) { 00277 log(Error) << "Need a local OutputPort to create connections." <<endlog(); 00278 return false; 00279 } 00280 00281 InputPort<T>* input_p = dynamic_cast<InputPort<T>*>(&input_port); 00282 00283 // This is the input channel element of the output half 00284 base::ChannelElementBase::shared_ptr output_half = 0; 00285 if (input_port.isLocal() && policy.transport == 0) 00286 { 00287 // Local connection 00288 if (!input_p) 00289 { 00290 log(Error) << "Port " << input_port.getName() << " is not compatible with " << output_port.getName() << endlog(); 00291 return false; 00292 } 00293 // local ports, create buffer here. 00294 output_half = buildBufferedChannelOutput<T>(*input_p, output_port.getPortID(), policy, output_port.getLastWrittenValue()); 00295 } 00296 else 00297 { 00298 // if the input is not local, this is a pure remote connection, 00299 // if the input *is* local, the user requested to use a different transport 00300 // than plain memory, rare case, but we accept it. The unit tests use this for example 00301 // to test the OOB transports. 00302 if ( !input_port.isLocal() ) { 00303 output_half = createRemoteConnection( output_port, input_port, policy); 00304 } else 00305 output_half = createOutOfBandConnection<T>( output_port, *input_p, policy); 00306 } 00307 00308 if (!output_half) 00309 return false; 00310 00311 // Since output is local, buildChannelInput is local as well. 00312 // This this the input channel element of the whole connection 00313 base::ChannelElementBase::shared_ptr channel_input = 00314 buildChannelInput<T>(output_port, input_port.getPortID(), output_half); 00315 00316 return createAndCheckConnection(output_port, input_port, channel_input, policy ); 00317 } 00318 00326 template<class T> 00327 static bool createStream(OutputPort<T>& output_port, ConnPolicy const& policy) 00328 { 00329 StreamConnID *sid = new StreamConnID(policy.name_id); 00330 RTT::base::ChannelElementBase::shared_ptr chan = buildChannelInput( output_port, sid, base::ChannelElementBase::shared_ptr() ); 00331 return createAndCheckStream(output_port, policy, chan, sid); 00332 } 00333 00335 static bool createAndCheckStream(base::OutputPortInterface& output_port, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr chan, StreamConnID* conn_id); 00336 00344 template<class T> 00345 static bool createStream(InputPort<T>& input_port, ConnPolicy const& policy) 00346 { 00347 StreamConnID *sid = new StreamConnID(policy.name_id); 00348 RTT::base::ChannelElementBase::shared_ptr outhalf = buildChannelOutput( input_port, sid ); 00349 if ( createAndCheckStream(input_port, policy, outhalf, sid) ) 00350 return true; 00351 input_port.removeConnection(sid); 00352 return false; 00353 } 00354 00355 protected: 00356 static bool createAndCheckConnection(base::OutputPortInterface& output_port, base::InputPortInterface& input_port, base::ChannelElementBase::shared_ptr channel_input, ConnPolicy policy); 00357 00358 static bool createAndCheckStream(base::InputPortInterface& input_port, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr outhalf, StreamConnID* conn_id); 00359 00360 static base::ChannelElementBase::shared_ptr createRemoteConnection(base::OutputPortInterface& output_port, base::InputPortInterface& input_port, ConnPolicy const& policy); 00361 00369 template<class T> 00370 static base::ChannelElementBase::shared_ptr createOutOfBandConnection(OutputPort<T>& output_port, InputPort<T>& input_port, ConnPolicy const& policy) { 00371 StreamConnID* conn_id = new StreamConnID(policy.name_id); 00372 RTT::base::ChannelElementBase::shared_ptr output_half = ConnFactory::buildChannelOutput<T>(input_port, conn_id); 00373 return createAndCheckOutOfBandConnection( output_port, input_port, policy, output_half, conn_id); 00374 } 00375 00376 static base::ChannelElementBase::shared_ptr createAndCheckOutOfBandConnection( base::OutputPortInterface& output_port, 00377 base::InputPortInterface& input_port, 00378 ConnPolicy const& policy, 00379 base::ChannelElementBase::shared_ptr output_half, 00380 StreamConnID* conn_id); 00381 }; 00382 00383 typedef boost::shared_ptr<ConnFactory> ConnFactoryPtr; 00384 00385 00386 } 00387 } 00388 00389 #endif 00390