39 #ifndef ORO_CONN_FACTORY_HPP 40 #define ORO_CONN_FACTORY_HPP 48 #include "../base/PortInterface.hpp" 49 #include "../base/InputPortInterface.hpp" 50 #include "../base/OutputPortInterface.hpp" 51 #include "../DataFlowInterface.hpp" 53 #include "../base/DataObject.hpp" 54 #include "../base/DataObjectUnSync.hpp" 55 #include "../base/Buffer.hpp" 56 #include "../base/BufferUnSync.hpp" 57 #include "../Logger.hpp" 59 #include "../rtt-config.h" 165 #ifndef OROBLD_OS_NO_ASM 187 #ifndef OROBLD_OS_NO_ASM 225 bool pull = policy.
pull;
234 log(
Error) <<
"You tried to create a shared output buffer connection for output port " << port.
getName() <<
", " 235 <<
"but the port already has at least one incompatible outgoing connection." <<
endlog();
252 (buffer_policy.type != policy.
type) ||
253 (buffer_policy.size != policy.
size) ||
257 log(
Error) <<
"You mixed incompatible connection policies for the shared output buffer of port " << port.
getName() <<
": " 258 <<
"The new connection requests a " << policy <<
" connection, " 259 <<
"but the port already has a " << buffer_policy <<
" buffer." <<
endlog();
270 log(
Error) <<
"You mixed incompatible connection policies for output port " << port.
getName() <<
": " 271 <<
"The new connection requests a " << policy <<
" connection, " 272 <<
"but the port already has a " << buffer_policy <<
" buffer." <<
endlog();
307 bool pull = policy.
pull;
316 log(
Error) <<
"You tried to create a shared input buffer connection for input port " << port.
getName() <<
", " 317 <<
"but the port already has at least one incompatible incoming connection." <<
endlog();
321 buffer = buildDataStorage<T>(policy, initial_value);
333 (buffer_policy.type != policy.
type) ||
334 (buffer_policy.size != policy.
size) ||
338 log(
Error) <<
"You mixed incompatible connection policies for the shared input buffer of port " << port.
getName() <<
": " 339 <<
"The new connection requests a " << policy <<
" connection, " 340 <<
"but the port already has a " << buffer_policy <<
" buffer." <<
endlog();
351 log(
Error) <<
"You mixed incompatible connection policies for input port " << port.
getName() <<
": " 352 <<
"The new connection requests a " << policy <<
" connection, " 353 <<
"but the port already has a " << buffer_policy <<
" buffer." <<
endlog();
359 buffer = buildDataStorage<T>(policy, initial_value);
391 template <
typename T>
398 if (findSharedConnection(output_port, input_port, policy, shared_connection) && !shared_connection) {
403 if (input_port && !input_port->
isLocal()) {
405 log(
Error) <<
"Cannot create a shared connection for a remote input port or a non-standard transport without knowing the local output port." <<
endlog();
410 if (!shared_connection) {
413 log(
Error) <<
"Could not create a shared remote connection for input port '" << input_port->
getName() <<
"'." <<
endlog();
431 log(
Error) <<
"The remote side refused to connect the input port '" << input_port->
getName() <<
"' to the existing shared connection '" << shared_connection->getName() <<
"'." <<
endlog();
438 if (!shared_connection) {
445 return shared_connection;
463 if ( !output_port.
isLocal() ) {
464 log(
Error) <<
"Need a local OutputPort to create connections." <<
endlog();
469 log(
Info) <<
"OutputPort " << output_port.
getName() <<
" is already connected to " << input_port.
getName() <<
", ignoring new connection." <<
endlog();
477 return createAndCheckSharedConnection(&output_port, &input_port, buildSharedConnection<T>(&output_port, &input_port, policy), policy);
501 output_half = buildRemoteChannelOutput( output_port, input_port, policy);
502 }
else if (input_p) {
503 return createOutOfBandConnection<T>( output_port, *input_p, policy);
516 channel_input = buildChannelInput<T>(output_port, policy);
518 if (!channel_input) {
519 output_half->disconnect(
true);
524 return createAndCheckConnection(output_port, input_port, channel_input, output_half, policy);
542 if (!chan)
return false;
543 if (!
bool(createAndCheckStream(output_port, policy, chan, sid))) {
544 chan->disconnect(
false);
564 if (!outhalf)
return false;
565 if (!
bool(createAndCheckStream(input_port, policy, outhalf, sid))) {
566 outhalf->disconnect(
true);
601 if (!channel_input)
return false;
604 if (!stream_input)
return false;
607 if (!channel_output)
return false;
610 if (!stream_output)
return false;
612 return stream_input->getOutputEndPoint()->connectTo(stream_output->getInputEndPoint(), policy.mandatory);
boost::intrusive_ptr< ChannelElement< T > > shared_ptr
boost::intrusive_ptr< SharedConnectionBase > shared_ptr
base::PortInterface const * ptr
virtual bool isLocal() const
virtual ConnID * clone() const
static base::ChannelElementBase::shared_ptr buildChannelOutput(InputPort< T > &port, ConnPolicy const &policy, T const &initial_value=T())
boost::shared_ptr< DataObjectInterface< T > > shared_ptr
static const int CIRCULAR_BUFFER
virtual base::ChannelElement< T >::shared_ptr getSharedBuffer()
virtual const ConnPolicy * getConnPolicy() const
const std::string & getName() const
virtual bool isSameID(ConnID const &id) const
virtual internal::ConnInputEndpoint< T > * getEndpoint() const
A class which provides unprotected (not thread-safe) access to one typed element of data...
boost::shared_ptr< ConnFactory > ConnFactoryPtr
StreamConnID(const std::string &name)
static bool createStream(InputPort< T > &input_port, ConnPolicy const &policy)
static base::ChannelElement< T > * buildDataStorage(ConnPolicy const &policy, const T &initial_value=T())
virtual bool connectTo(ChannelElementBase::shared_ptr const &output, bool mandatory=true)
virtual bool connectedTo(PortInterface *port)
This DataObject is a Lock-Free implementation, such that reads and writes can happen concurrently wit...
static bool createStream(OutputPort< T > &output_port, ConnPolicy const &policy)
static bool createOutOfBandConnection(OutputPort< T > &output_port, InputPort< T > &input_port, ConnPolicy const &policy)
static const int LOCK_FREE
boost::intrusive_ptr< ChannelElementBase > shared_ptr
A class which provides locked/protected access to one typed element of data.
virtual base::ChannelElement< T >::shared_ptr getSharedBuffer() const
LocalConnID(base::PortInterface const *obj)
static SharedConnectionBase::shared_ptr buildSharedConnection(OutputPort< T > *output_port, base::InputPortInterface *input_port, ConnPolicy const &policy)
boost::intrusive_ptr< ConnOutputEndpoint< T > > shared_ptr
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
static base::ChannelElementBase::shared_ptr buildChannelInput(OutputPort< T > &port, ConnPolicy const &policy, bool force_unbuffered=false)
T getLastWrittenValue() const
static bool createConnection(OutputPort< T > &output_port, base::InputPortInterface &input_port, ConnPolicy const &policy)
static Logger::LogFunction endlog()
boost::shared_ptr< BufferInterface< T > > shared_ptr