00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #ifndef ORO_CONN_FACTORY_HPP
00040 #define ORO_CONN_FACTORY_HPP
00041
00042 #include <string>
00043 #include "Channels.hpp"
00044 #include "ConnInputEndPoint.hpp"
00045 #include "ConnOutputEndPoint.hpp"
00046 #include "../base/PortInterface.hpp"
00047 #include "../base/InputPortInterface.hpp"
00048 #include "../base/OutputPortInterface.hpp"
00049 #include "../DataFlowInterface.hpp"
00050
00051 #include "../base/DataObject.hpp"
00052 #include "../base/DataObjectUnSync.hpp"
00053 #include "../base/Buffer.hpp"
00054 #include "../base/BufferUnSync.hpp"
00055 #include "../Logger.hpp"
00056
00057 namespace RTT
00058 { namespace internal {
00059
00063 struct LocalConnID : public ConnID
00064 {
00065 base::PortInterface const* ptr;
00066 LocalConnID(base::PortInterface const* obj)
00067 : ptr(obj) {}
00068 virtual ConnID* clone() const;
00069 virtual bool isSameID(ConnID const& id) const;
00070 };
00071
00075 struct RTT_API StreamConnID : public ConnID
00076 {
00077 std::string name_id;
00078 StreamConnID(const std::string& name)
00079 : name_id(name) {}
00080 virtual ConnID* clone() const;
00081 virtual bool isSameID(ConnID const& id) const;
00082 };
00083
00084
00091 class RTT_API ConnFactory
00092 {
00093 public:
00094 virtual ~ConnFactory() {}
00095
00100 virtual base::InputPortInterface* inputPort(std::string const& name) const = 0;
00101
00106 virtual base::OutputPortInterface* outputPort(std::string const& name) const = 0;
00107
00114 virtual base::ChannelElementBase::shared_ptr buildDataStorage(ConnPolicy const& policy) const = 0;
00115
00122 virtual base::ChannelElementBase::shared_ptr buildChannelOutput(base::InputPortInterface& port) const = 0;
00129 virtual base::ChannelElementBase::shared_ptr buildChannelInput(base::OutputPortInterface& port) const = 0;
00130
00138 template<typename T>
00139 static base::ChannelElementBase* buildDataStorage(ConnPolicy const& policy, const T& initial_value = T())
00140 {
00141 if (policy.type == ConnPolicy::DATA)
00142 {
00143 typename base::DataObjectInterface<T>::shared_ptr data_object;
00144 switch (policy.lock_policy)
00145 {
00146 #ifndef OROBLD_OS_NO_ASM
00147 case ConnPolicy::LOCK_FREE:
00148 data_object.reset( new base::DataObjectLockFree<T>(initial_value) );
00149 break;
00150 #else
00151 case ConnPolicy::LOCK_FREE:
00152 RTT::log(Warning) << "lock free connection policy is unavailable on this system, defaulting to LOCKED" << RTT::endlog();
00153 #endif
00154 case ConnPolicy::LOCKED:
00155 data_object.reset( new base::DataObjectLocked<T>(initial_value) );
00156 break;
00157 case ConnPolicy::UNSYNC:
00158 data_object.reset( new base::DataObjectUnSync<T>(initial_value) );
00159 break;
00160 }
00161
00162 ChannelDataElement<T>* result = new ChannelDataElement<T>(data_object);
00163 return result;
00164 }
00165 else if (policy.type == ConnPolicy::BUFFER || policy.type == ConnPolicy::CIRCULAR_BUFFER)
00166 {
00167 base::BufferInterface<T>* buffer_object = 0;
00168 switch (policy.lock_policy)
00169 {
00170 #ifndef OROBLD_OS_NO_ASM
00171 case ConnPolicy::LOCK_FREE:
00172 buffer_object = new base::BufferLockFree<T>(policy.size, initial_value, policy.type == ConnPolicy::CIRCULAR_BUFFER);
00173 break;
00174 #else
00175 case ConnPolicy::LOCK_FREE:
00176 RTT::log(Warning) << "lock free connection policy is unavailable on this system, defaulting to LOCKED" << RTT::endlog();
00177 #endif
00178 case ConnPolicy::LOCKED:
00179 buffer_object = new base::BufferLocked<T>(policy.size, initial_value, policy.type == ConnPolicy::CIRCULAR_BUFFER);
00180 break;
00181 case ConnPolicy::UNSYNC:
00182 buffer_object = new base::BufferUnSync<T>(policy.size, initial_value, policy.type == ConnPolicy::CIRCULAR_BUFFER);
00183 break;
00184 }
00185 return new ChannelBufferElement<T>(typename base::BufferInterface<T>::shared_ptr(buffer_object));
00186 }
00187 return NULL;
00188 }
00189
00198 template<typename T>
00199 static base::ChannelElementBase::shared_ptr buildChannelInput(OutputPort<T>& port, ConnID* conn_id, base::ChannelElementBase::shared_ptr output_channel)
00200 {
00201 assert(conn_id);
00202 base::ChannelElementBase::shared_ptr endpoint = new ConnInputEndpoint<T>(&port, conn_id);
00203 if (output_channel)
00204 endpoint->setOutput(output_channel);
00205 return endpoint;
00206 }
00207
00218 template<typename T>
00219 static base::ChannelElementBase::shared_ptr buildBufferedChannelInput(OutputPort<T>& port, ConnID* conn_id, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr output_channel)
00220 {
00221 assert(conn_id);
00222 base::ChannelElementBase::shared_ptr endpoint = new ConnInputEndpoint<T>(&port, conn_id);
00223 base::ChannelElementBase::shared_ptr data_object = buildDataStorage<T>(policy, port.getLastWrittenValue() );
00224 endpoint->setOutput(data_object);
00225 if (output_channel)
00226 data_object->setOutput(output_channel);
00227 return endpoint;
00228 }
00229
00237 template<typename T>
00238 static base::ChannelElementBase::shared_ptr buildChannelOutput(InputPort<T>& port, ConnID* conn_id)
00239 {
00240 assert(conn_id);
00241 base::ChannelElementBase::shared_ptr endpoint = new ConnOutputEndpoint<T>(&port, conn_id);
00242 return endpoint;
00243 }
00244
00254 template<typename T>
00255 static base::ChannelElementBase::shared_ptr buildBufferedChannelOutput(InputPort<T>& port, ConnID* conn_id, ConnPolicy const& policy, T const& initial_value = T() )
00256 {
00257 assert(conn_id);
00258 base::ChannelElementBase::shared_ptr endpoint = new ConnOutputEndpoint<T>(&port, conn_id);
00259 base::ChannelElementBase::shared_ptr data_object = buildDataStorage<T>(policy, initial_value);
00260 data_object->setOutput(endpoint);
00261 return data_object;
00262 }
00263
00273 template<typename T>
00274 static bool createConnection(OutputPort<T>& output_port, base::InputPortInterface& input_port, ConnPolicy const& policy)
00275 {
00276 if ( !output_port.isLocal() ) {
00277 log(Error) << "Need a local OutputPort to create connections." <<endlog();
00278 return false;
00279 }
00280
00281 InputPort<T>* input_p = dynamic_cast<InputPort<T>*>(&input_port);
00282
00283
00284 base::ChannelElementBase::shared_ptr output_half = 0;
00285 if (input_port.isLocal() && policy.transport == 0)
00286 {
00287
00288 if (!input_p)
00289 {
00290 log(Error) << "Port " << input_port.getName() << " is not compatible with " << output_port.getName() << endlog();
00291 return false;
00292 }
00293
00294 output_half = buildBufferedChannelOutput<T>(*input_p, output_port.getPortID(), policy, output_port.getLastWrittenValue());
00295 }
00296 else
00297 {
00298
00299
00300
00301
00302 if ( !input_port.isLocal() ) {
00303 output_half = createRemoteConnection( output_port, input_port, policy);
00304 } else
00305 output_half = createOutOfBandConnection<T>( output_port, *input_p, policy);
00306 }
00307
00308 if (!output_half)
00309 return false;
00310
00311
00312
00313 base::ChannelElementBase::shared_ptr channel_input =
00314 buildChannelInput<T>(output_port, input_port.getPortID(), output_half);
00315
00316 return createAndCheckConnection(output_port, input_port, channel_input, policy );
00317 }
00318
00326 template<class T>
00327 static bool createStream(OutputPort<T>& output_port, ConnPolicy const& policy)
00328 {
00329 StreamConnID *sid = new StreamConnID(policy.name_id);
00330 RTT::base::ChannelElementBase::shared_ptr chan = buildChannelInput( output_port, sid, base::ChannelElementBase::shared_ptr() );
00331 return createAndCheckStream(output_port, policy, chan, sid);
00332 }
00333
00335 static bool createAndCheckStream(base::OutputPortInterface& output_port, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr chan, StreamConnID* conn_id);
00336
00344 template<class T>
00345 static bool createStream(InputPort<T>& input_port, ConnPolicy const& policy)
00346 {
00347 StreamConnID *sid = new StreamConnID(policy.name_id);
00348 RTT::base::ChannelElementBase::shared_ptr outhalf = buildChannelOutput( input_port, sid );
00349 if ( createAndCheckStream(input_port, policy, outhalf, sid) )
00350 return true;
00351 input_port.removeConnection(sid);
00352 return false;
00353 }
00354
00355 protected:
00356 static bool createAndCheckConnection(base::OutputPortInterface& output_port, base::InputPortInterface& input_port, base::ChannelElementBase::shared_ptr channel_input, ConnPolicy policy);
00357
00358 static bool createAndCheckStream(base::InputPortInterface& input_port, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr outhalf, StreamConnID* conn_id);
00359
00360 static base::ChannelElementBase::shared_ptr createRemoteConnection(base::OutputPortInterface& output_port, base::InputPortInterface& input_port, ConnPolicy const& policy);
00361
00369 template<class T>
00370 static base::ChannelElementBase::shared_ptr createOutOfBandConnection(OutputPort<T>& output_port, InputPort<T>& input_port, ConnPolicy const& policy) {
00371 StreamConnID* conn_id = new StreamConnID(policy.name_id);
00372 RTT::base::ChannelElementBase::shared_ptr output_half = ConnFactory::buildChannelOutput<T>(input_port, conn_id);
00373 return createAndCheckOutOfBandConnection( output_port, input_port, policy, output_half, conn_id);
00374 }
00375
00376 static base::ChannelElementBase::shared_ptr createAndCheckOutOfBandConnection( base::OutputPortInterface& output_port,
00377 base::InputPortInterface& input_port,
00378 ConnPolicy const& policy,
00379 base::ChannelElementBase::shared_ptr output_half,
00380 StreamConnID* conn_id);
00381 };
00382
00383 typedef boost::shared_ptr<ConnFactory> ConnFactoryPtr;
00384
00385
00386 }
00387 }
00388
00389 #endif
00390