00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #ifndef ORO_CONN_FACTORY_HPP
00040 #define ORO_CONN_FACTORY_HPP
00041
00042 #include <string>
00043 #include "Channels.hpp"
00044 #include "ConnInputEndPoint.hpp"
00045 #include "ConnOutputEndPoint.hpp"
00046 #include "../base/PortInterface.hpp"
00047 #include "../base/InputPortInterface.hpp"
00048 #include "../base/OutputPortInterface.hpp"
00049 #include "../DataFlowInterface.hpp"
00050
00051 #include "../base/DataObject.hpp"
00052 #include "../base/DataObjectUnSync.hpp"
00053 #include "../base/Buffer.hpp"
00054 #include "../base/BufferUnSync.hpp"
00055
00056 namespace RTT
00057 { namespace internal {
00058
00062 struct LocalConnID : public ConnID
00063 {
00064 base::PortInterface const* ptr;
00065 LocalConnID(base::PortInterface const* obj)
00066 : ptr(obj) {}
00067 virtual ConnID* clone() const;
00068 virtual bool isSameID(ConnID const& id) const;
00069 };
00070
00074 struct RTT_API StreamConnID : public ConnID
00075 {
00076 std::string name_id;
00077 StreamConnID(const std::string& name)
00078 : name_id(name) {}
00079 virtual ConnID* clone() const;
00080 virtual bool isSameID(ConnID const& id) const;
00081 };
00082
00083
00090 class RTT_API ConnFactory
00091 {
00092 public:
00093
00099 virtual base::ChannelElementBase::shared_ptr buildRemoteChannelOutput(
00100 base::OutputPortInterface& output_port,
00101 types::TypeInfo const* type_info,
00102 base::InputPortInterface& input, const ConnPolicy& policy) = 0;
00103
00111 template<typename T>
00112 static base::ChannelElementBase* buildDataStorage(ConnPolicy const& policy, const T& initial_value = T())
00113 {
00114 if (policy.type == ConnPolicy::DATA)
00115 {
00116 typename base::DataObjectInterface<T>::shared_ptr data_object;
00117 switch (policy.lock_policy)
00118 {
00119 #ifndef OROBLD_OS_NO_ASM
00120 case ConnPolicy::LOCK_FREE:
00121 data_object.reset( new base::DataObjectLockFree<T>(initial_value) );
00122 break;
00123 #else
00124 case ConnPolicy::LOCK_FREE:
00125 RTT::log(Warning) << "lock free connection policy is unavailable on this system, defaulting to LOCKED" << RTT::endlog();
00126 #endif
00127 case ConnPolicy::LOCKED:
00128 data_object.reset( new base::DataObjectLocked<T>(initial_value) );
00129 break;
00130 case ConnPolicy::UNSYNC:
00131 data_object.reset( new base::DataObjectUnSync<T>(initial_value) );
00132 break;
00133 }
00134
00135 ChannelDataElement<T>* result = new ChannelDataElement<T>(data_object);
00136 return result;
00137 }
00138 else if (policy.type == ConnPolicy::BUFFER)
00139 {
00140 base::BufferInterface<T>* buffer_object = 0;
00141 switch (policy.lock_policy)
00142 {
00143 #ifndef OROBLD_OS_NO_ASM
00144 case ConnPolicy::LOCK_FREE:
00145 buffer_object = new base::BufferLockFree<T>(policy.size, initial_value);
00146 break;
00147 #else
00148 case ConnPolicy::LOCK_FREE:
00149 RTT::log(Warning) << "lock free connection policy is unavailable on this system, defaulting to LOCKED" << RTT::endlog();
00150 #endif
00151 case ConnPolicy::LOCKED:
00152 buffer_object = new base::BufferLocked<T>(policy.size, initial_value);
00153 break;
00154 case ConnPolicy::UNSYNC:
00155 buffer_object = new base::BufferUnSync<T>(policy.size, initial_value);
00156 break;
00157 }
00158 return new ChannelBufferElement<T>(typename base::BufferInterface<T>::shared_ptr(buffer_object));
00159 }
00160 return NULL;
00161 }
00162
00171 template<typename T>
00172 static base::ChannelElementBase::shared_ptr buildChannelInput(OutputPort<T>& port, ConnID* conn_id, base::ChannelElementBase::shared_ptr output_channel)
00173 {
00174 assert(conn_id);
00175 base::ChannelElementBase::shared_ptr endpoint = new ConnInputEndpoint<T>(&port, conn_id);
00176 if (output_channel)
00177 endpoint->setOutput(output_channel);
00178 return endpoint;
00179 }
00180
00191 template<typename T>
00192 static base::ChannelElementBase::shared_ptr buildBufferedChannelInput(OutputPort<T>& port, ConnID* conn_id, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr output_channel)
00193 {
00194 assert(conn_id);
00195 base::ChannelElementBase::shared_ptr endpoint = new ConnInputEndpoint<T>(&port, conn_id);
00196 base::ChannelElementBase::shared_ptr data_object = buildDataStorage<T>(policy, port.getLastWrittenValue() );
00197 endpoint->setOutput(data_object);
00198 if (output_channel)
00199 data_object->setOutput(output_channel);
00200 return endpoint;
00201 }
00202
00210 template<typename T>
00211 static base::ChannelElementBase::shared_ptr buildChannelOutput(InputPort<T>& port, ConnID* conn_id)
00212 {
00213 assert(conn_id);
00214 base::ChannelElementBase::shared_ptr endpoint = new ConnOutputEndpoint<T>(&port, conn_id);
00215 return endpoint;
00216 }
00217
00227 template<typename T>
00228 static base::ChannelElementBase::shared_ptr buildBufferedChannelOutput(InputPort<T>& port, ConnID* conn_id, ConnPolicy const& policy, T const& initial_value = T() )
00229 {
00230 assert(conn_id);
00231 base::ChannelElementBase::shared_ptr endpoint = new ConnOutputEndpoint<T>(&port, conn_id);
00232 base::ChannelElementBase::shared_ptr data_object = buildDataStorage<T>(policy, initial_value);
00233 data_object->setOutput(endpoint);
00234 return data_object;
00235 }
00236
00246 template<typename T>
00247 static bool createConnection(OutputPort<T>& output_port, base::InputPortInterface& input_port, ConnPolicy const& policy)
00248 {
00249 if ( !output_port.isLocal() ) {
00250 log(Error) << "Need a local OutputPort to create connections." <<endlog();
00251 return false;
00252 }
00253
00254 InputPort<T>* input_p = dynamic_cast<InputPort<T>*>(&input_port);
00255
00256
00257 base::ChannelElementBase::shared_ptr output_half = 0;
00258 if (input_port.isLocal() && policy.transport == 0)
00259 {
00260
00261 if (!input_p)
00262 {
00263 log(Error) << "Port " << input_port.getName() << " is not compatible with " << output_port.getName() << endlog();
00264 return false;
00265 }
00266
00267 output_half = buildBufferedChannelOutput<T>(*input_p, output_port.getPortID(), policy, output_port.getLastWrittenValue());
00268 }
00269 else
00270 {
00271
00272
00273
00274
00275 if ( !input_port.isLocal() ) {
00276 output_half = createRemoteConnection( output_port, input_port, policy);
00277 } else
00278 output_half = createOutOfBandConnection<T>( output_port, *input_p, policy);
00279 }
00280
00281 if (!output_half)
00282 return false;
00283
00284
00285
00286 base::ChannelElementBase::shared_ptr channel_input =
00287 buildChannelInput<T>(output_port, input_port.getPortID(), output_half);
00288
00289 return createAndCheckConnection(output_port, input_port, channel_input, policy );
00290 }
00291
00299 template<class T>
00300 static bool createStream(OutputPort<T>& output_port, ConnPolicy const& policy)
00301 {
00302 StreamConnID *sid = new StreamConnID(policy.name_id);
00303 RTT::base::ChannelElementBase::shared_ptr chan = buildChannelInput( output_port, sid, base::ChannelElementBase::shared_ptr() );
00304 return createAndCheckStream(output_port, policy, chan, sid);
00305 }
00306
00308 static bool createAndCheckStream(base::OutputPortInterface& output_port, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr chan, StreamConnID* conn_id);
00309
00317 template<class T>
00318 static bool createStream(InputPort<T>& input_port, ConnPolicy const& policy)
00319 {
00320 StreamConnID *sid = new StreamConnID(policy.name_id);
00321 RTT::base::ChannelElementBase::shared_ptr outhalf = buildChannelOutput( input_port, sid );
00322 if ( createAndCheckStream(input_port, policy, outhalf, sid) )
00323 return true;
00324 input_port.removeConnection(sid);
00325 return false;
00326 }
00327
00328 protected:
00329 static bool createAndCheckConnection(base::OutputPortInterface& output_port, base::InputPortInterface& input_port, base::ChannelElementBase::shared_ptr channel_input, ConnPolicy policy);
00330
00331 static bool createAndCheckStream(base::InputPortInterface& input_port, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr outhalf, StreamConnID* conn_id);
00332
00333 static base::ChannelElementBase::shared_ptr createRemoteConnection(base::OutputPortInterface& output_port, base::InputPortInterface& input_port, ConnPolicy const& policy);
00334
00342 template<class T>
00343 static base::ChannelElementBase::shared_ptr createOutOfBandConnection(OutputPort<T>& output_port, InputPort<T>& input_port, ConnPolicy const& policy) {
00344 StreamConnID* conn_id = new StreamConnID(policy.name_id);
00345 RTT::base::ChannelElementBase::shared_ptr output_half = ConnFactory::buildChannelOutput<T>(input_port, conn_id);
00346 return createAndCheckOutOfBandConnection( output_port, input_port, policy, output_half, conn_id);
00347 }
00348
00349 static base::ChannelElementBase::shared_ptr createAndCheckOutOfBandConnection( base::OutputPortInterface& output_port,
00350 base::InputPortInterface& input_port,
00351 ConnPolicy const& policy,
00352 base::ChannelElementBase::shared_ptr output_half,
00353 StreamConnID* conn_id);
00354 };
00355
00356 }}
00357
00358 #endif
00359