39 #include "../Port.hpp" 41 #include "../base/InputPortInterface.hpp" 42 #include "../DataFlowInterface.hpp" 43 #include "../types/TypeMarshaller.hpp" 49 bool LocalConnID::isSameID(
ConnID const&
id)
const 54 else return real_id->
ptr == this->ptr;
61 bool StreamConnID::isSameID(
ConnID const&
id)
const 66 else return real_id->
name_id == this->name_id;
69 ConnID* StreamConnID::clone()
const {
80 if (!type_info || input_port.
getTypeInfo() != type_info)
82 log(
Error) <<
"Type of port " << output_port.
getName() <<
" is not registered into the type system, cannot marshal it into the right transporter" <<
endlog();
88 log(
Error) <<
"Type " << type_info->
getTypeName() <<
" cannot be marshalled into the requested transporter (id:"<< transport<<
")." <<
endlog();
95 buildRemoteChannelOutput(output_port, type_info, input_port, policy);
102 if (!channel_input->connectTo(channel_output, policy.
mandatory)) {
103 channel_input->disconnect(channel_output,
true);
104 channel_output->disconnect(channel_input,
false);
112 next_hop = channel_input;
113 while(next_hop->getInput() && next_hop->getInput() != output_port.
getEndpoint()) {
114 next_hop = next_hop->getInput();
120 <<
" could not successfully use the connection to input port " << input_port.
getName() <<
endlog();
121 channel_input->disconnect(channel_output,
true);
126 if ( !channel_output->channelReady( channel_input, policy, output_port.
getPortID() ) ) {
128 <<
" could not successfully read from the connection from output port " << output_port.
getName() <<
endlog();
130 channel_output->disconnect(channel_input,
false);
141 log(
Error) <<
"Need a transport for creating streams." <<
endlog();
147 log(
Error) <<
"No such transport registered. Check your policy.transport settings or add the transport for type "<< type->
getTypeName() <<
endlog();
159 if ( !chan_stream ) {
160 log(
Error) <<
"Transport failed to create remote channel for output stream of port "<<output_port.
getName() <<
endlog();
165 channel_input->connectTo( chan_stream, policy.
mandatory );
167 if ( !output_port.
addConnection( conn_id, chan_stream, policy ) ) {
169 channel_input->disconnect( chan_stream,
true );
180 log(
Error) <<
"Need a transport for creating streams." <<
endlog();
186 log(
Error) <<
"No such transport registered. Check your policy.transport settings or add the transport for type "<< type->
getTypeName() <<
endlog();
195 log(
Error) <<
"Transport failed to create remote channel for input stream of port " << input_port.
getName() <<
endlog();
199 chan = chan->getOutputEndPoint();
202 chan->connectTo( outhalf, policy.
mandatory );
203 if ( !outhalf->channelReady(chan, policy, conn_id) ) {
205 chan->disconnect(
true);
218 return createAndCheckSharedConnection(output_port, input_port, shared_connection, policy);
223 if (!shared_connection)
return false;
228 (shared_connection->getConnPolicy()->type != policy.
type) ||
229 (shared_connection->getConnPolicy()->size != policy.
size) ||
230 (shared_connection->getConnPolicy()->lock_policy != policy.
lock_policy)
233 log(
Error) <<
"You mixed incompatible connection policies for shared connection '" << shared_connection->getName() <<
"': " 234 <<
"The new connection requests a " << policy <<
" connection, " 235 <<
"but the existing connection is of type " << *(shared_connection->getConnPolicy()) <<
"." <<
endlog();
240 policy.
name_id = shared_connection->getName();
244 if ( !output_port->
addConnection( shared_connection->getConnID(), shared_connection, policy ) ) {
247 <<
" could not successfully connect to shared connection '" << shared_connection->getName() <<
"'." <<
endlog();
256 if ( !input_port->
addConnection( shared_connection->getConnID(), shared_connection, policy ) ) {
259 <<
" could not successfully connect to shared connection '" << shared_connection->getName() <<
"'." <<
endlog();
271 shared_connection.reset();
278 if (!shared_connection) {
285 if (shared_connection == input_ports_shared_connection) {
288 }
else if (input_ports_shared_connection) {
290 shared_connection.reset();
297 if (!shared_connection) {
299 shared_connection = SharedConnectionRepository::Instance()->get(policy.
name_id);
300 }
else if (shared_connection->getName() != policy.
name_id) {
301 RTT::log(
RTT::Error) <<
"At least one of the given ports is already connected to shared connection '" << shared_connection->getName() <<
"' but you requested to connect to '" << policy.
name_id <<
"'!" <<
RTT::endlog();
302 shared_connection.reset();
307 return bool(shared_connection);
virtual const types::TypeInfo * getTypeInfo() const =0
boost::intrusive_ptr< SharedConnectionBase > shared_ptr
static base::ChannelElementBase::shared_ptr buildRemoteChannelOutput(base::OutputPortInterface &output_port, base::InputPortInterface &input_port, ConnPolicy const &policy)
base::PortInterface const * ptr
virtual bool isLocal() const
const std::string & getTypeName() const
virtual internal::SharedConnectionBase::shared_ptr getSharedConnection() const
const std::string & getName() const
virtual DataSourceBase::shared_ptr getDataSource() const =0
virtual ChannelElementBase * getEndpoint() const =0
virtual bool connectTo(ChannelElementBase::shared_ptr const &output, bool mandatory=true)
virtual base::ChannelElementBase::shared_ptr createStream(base::PortInterface *port, const ConnPolicy &policy, bool is_sender) const =0
virtual bool addConnection(internal::ConnID *port_id, ChannelElementBase::shared_ptr channel_input, ConnPolicy const &policy)
virtual int serverProtocol() const
boost::intrusive_ptr< ChannelElementBase > shared_ptr
TypeTransporter * getProtocol(int protocol_id) const
virtual unsigned int getSampleSize(base::DataSourceBase::shared_ptr sample, void *cookie=0) const =0
virtual void disconnect()
virtual internal::ConnID * getPortID() const
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
static Logger::LogFunction endlog()