00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #include "../Port.hpp"
00040 #include "ConnFactory.hpp"
00041 #include "../base/InputPortInterface.hpp"
00042 #include "../DataFlowInterface.hpp"
00043 #include "../types/TypeMarshaller.hpp"
00044
00045 using namespace std;
00046 using namespace RTT;
00047 using namespace RTT::internal;
00048
00049 bool LocalConnID::isSameID(ConnID const& id) const
00050 {
00051 LocalConnID const* real_id = dynamic_cast<LocalConnID const*>(&id);
00052 if (!real_id)
00053 return false;
00054 else return real_id->ptr == this->ptr;
00055 }
00056
00057 ConnID* LocalConnID::clone() const {
00058 return new LocalConnID(this->ptr);
00059 }
00060
00061 bool StreamConnID::isSameID(ConnID const& id) const
00062 {
00063 StreamConnID const* real_id = dynamic_cast<StreamConnID const*>(&id);
00064 if (!real_id)
00065 return false;
00066 else return real_id->name_id == this->name_id;
00067 }
00068
00069 ConnID* StreamConnID::clone() const {
00070 return new StreamConnID(this->name_id);
00071 }
00072
00073 base::ChannelElementBase::shared_ptr RTT::internal::ConnFactory::createRemoteConnection(base::OutputPortInterface& output_port, base::InputPortInterface& input_port, const ConnPolicy& policy)
00074 {
00075
00076
00077
00078 int transport = policy.transport == 0 ? input_port.serverProtocol() : policy.transport;
00079 types::TypeInfo const* type_info = output_port.getTypeInfo();
00080 if (!type_info || input_port.getTypeInfo() != type_info)
00081 {
00082 log(Error) << "Type of port " << output_port.getName() << " is not registered into the type system, cannot marshal it into the right transporter" << endlog();
00083
00084 return base::ChannelElementBase::shared_ptr();
00085 }
00086 else if ( !type_info->getProtocol( transport ) )
00087 {
00088 log(Error) << "Type " << type_info->getTypeName() << " cannot be marshalled into the requested transporter (id:"<< transport<<")." << endlog();
00089
00090 return base::ChannelElementBase::shared_ptr();
00091 }
00092 else
00093 {
00094 assert( input_port.getConnFactory() );
00095 return input_port.
00096 getConnFactory()->buildRemoteChannelOutput(output_port, type_info, input_port, policy);
00097 }
00098 return base::ChannelElementBase::shared_ptr();
00099 }
00100
00101 bool ConnFactory::createAndCheckConnection(base::OutputPortInterface& output_port, base::InputPortInterface& input_port, base::ChannelElementBase::shared_ptr channel_input, ConnPolicy policy) {
00102
00103 if ( output_port.addConnection( input_port.getPortID(), channel_input, policy ) ) {
00104
00105 if ( input_port.channelReady( channel_input->getOutputEndPoint() ) == false ) {
00106 output_port.disconnect( &input_port );
00107 log(Error) << "The input port "<< input_port.getName()
00108 << " could not successfully read from the connection from output port " << output_port.getName() <<endlog();
00109
00110 return false;
00111 }
00112 log(Debug) << "Connected output port "<< output_port.getName()
00113 << " successfully to " << input_port.getName() <<endlog();
00114 return true;
00115 }
00116
00117 channel_input->disconnect(true);
00118 log(Error) << "The output port "<< output_port.getName()
00119 << " could not successfully use the connection to input port " << input_port.getName() <<endlog();
00120 return false;
00121 }
00122
00123 bool ConnFactory::createAndCheckStream(base::OutputPortInterface& output_port, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr chan, StreamConnID* conn_id) {
00124 if (policy.transport == 0 ) {
00125 log(Error) << "Need a transport for creating streams." <<endlog();
00126 return false;
00127 }
00128 const types::TypeInfo* type = output_port.getTypeInfo();
00129 if ( type->getProtocol(policy.transport) == 0 ) {
00130 log(Error) << "Could not create transport stream for port "<< output_port.getName() << " with transport id " << policy.transport <<endlog();
00131 log(Error) << "No such transport registered. Check your policy.transport settings or add the transport for type "<< type->getTypeName() <<endlog();
00132 return false;
00133 }
00134 types::TypeMarshaller* ttt = dynamic_cast<types::TypeMarshaller*> ( type->getProtocol(policy.transport) );
00135 if (ttt) {
00136 int size_hint = ttt->getSampleSize( output_port.getDataSource() );
00137 policy.data_size = size_hint;
00138 } else {
00139 log(Debug) <<"Could not determine sample size for type " << type->getTypeName() << endlog();
00140 }
00141 RTT::base::ChannelElementBase::shared_ptr chan_stream = type->getProtocol(policy.transport)->createStream(&output_port, policy, true);
00142
00143 if ( !chan_stream ) {
00144 log(Error) << "Transport failed to create remote channel for output stream of port "<<output_port.getName() << endlog();
00145 return false;
00146 }
00147 chan->setOutput( chan_stream );
00148
00149 if ( output_port.addConnection( new StreamConnID(policy.name_id), chan, policy) ) {
00150 log(Info) << "Created output stream for output port "<< output_port.getName() <<endlog();
00151 return true;
00152 }
00153
00154 log(Error) << "Failed to create output stream for output port "<< output_port.getName() <<endlog();
00155 return false;
00156 }
00157
00158 bool ConnFactory::createAndCheckStream(base::InputPortInterface& input_port, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr outhalf, StreamConnID* conn_id) {
00159 if (policy.transport == 0 ) {
00160 log(Error) << "Need a transport for creating streams." <<endlog();
00161 return false;
00162 }
00163 const types::TypeInfo* type = input_port.getTypeInfo();
00164 if ( type->getProtocol(policy.transport) == 0 ) {
00165 log(Error) << "Could not create transport stream for port "<< input_port.getName() << " with transport id " << policy.transport <<endlog();
00166 log(Error) << "No such transport registered. Check your policy.transport settings or add the transport for type "<< type->getTypeName() <<endlog();
00167 return false;
00168 }
00169
00170
00171
00172 RTT::base::ChannelElementBase::shared_ptr chan = type->getProtocol(policy.transport)->createStream(&input_port,policy, false);
00173
00174 if ( !chan ) {
00175 log(Error) << "Transport failed to create remote channel for input stream of port "<<input_port.getName() << endlog();
00176 return false;
00177 }
00178
00179
00180
00181 ConnPolicy policy2 = policy;
00182 policy2.pull = false;
00183
00184 policy.name_id = policy2.name_id;
00185 conn_id->name_id = policy2.name_id;
00186
00187 chan->getOutputEndPoint()->setOutput( outhalf );
00188 if ( input_port.channelReady( chan->getOutputEndPoint() ) == true ) {
00189 log(Info) << "Created input stream for input port "<< input_port.getName() <<endlog();
00190 return true;
00191 }
00192
00193 chan = 0;
00194 log(Error) << "Failed to create input stream for input port "<< input_port.getName() <<endlog();
00195 return false;
00196 }
00197
00198 base::ChannelElementBase::shared_ptr ConnFactory::createAndCheckOutOfBandConnection( base::OutputPortInterface& output_port,
00199 base::InputPortInterface& input_port,
00200 ConnPolicy const& policy,
00201 base::ChannelElementBase::shared_ptr output_half,
00202 StreamConnID* conn_id)
00203 {
00204
00205 const types::TypeInfo* type = output_port.getTypeInfo();
00206 if ( type->getProtocol(policy.transport) == 0 ) {
00207 log(Error) << "Could not create out-of-band transport for port "<< output_port.getName() << " with transport id " << policy.transport <<endlog();
00208 log(Error) << "No such transport registered. Check your policy.transport settings or add the transport for type "<< type->getTypeName() <<endlog();
00209 return 0;
00210 }
00211
00212
00213 ConnPolicy policy2 = policy;
00214 policy2.pull = false;
00215 conn_id->name_id = policy2.name_id;
00216
00217
00218 types::TypeMarshaller* ttt = dynamic_cast<types::TypeMarshaller*>( type->getProtocol(policy.transport) );
00219 if (ttt) {
00220 policy2.data_size = ttt->getSampleSize( output_port.getDataSource() );
00221 } else {
00222 log(Debug) <<"Could not determine sample size for type " << type->getTypeName() << endlog();
00223 }
00224
00225 if ( input_port.isLocal() ) {
00226 RTT::base::ChannelElementBase::shared_ptr ceb_input = type->getProtocol(policy.transport)->createStream(&input_port, policy2, false);
00227 if (ceb_input) {
00228 log(Info) <<"Receiving data for port "<<input_port.getName() << " from out-of-band protocol "<< policy.transport << " with id "<< policy2.name_id<<endlog();
00229 } else {
00230 log(Error) << "The type transporter for type "<<type->getTypeName()<< " failed to create a remote channel for port " << input_port.getName()<<endlog();
00231 return 0;
00232 }
00233 ceb_input->getOutputEndPoint()->setOutput(output_half);
00234 output_half = ceb_input;
00235 }
00236
00237
00238 if ( output_port.isLocal() ) {
00239
00240 RTT::base::ChannelElementBase::shared_ptr ceb_output = type->getProtocol(policy.transport)->createStream(&output_port, policy2, true);
00241 if (ceb_output) {
00242 log(Info) <<"Redirecting data for port "<< output_port.getName() << " to out-of-band protocol "<< policy.transport << " with id "<< policy2.name_id <<endlog();
00243 } else {
00244 log(Error) << "The type transporter for type "<<type->getTypeName()<< " failed to create a remote channel for port " << output_port.getName()<<endlog();
00245 return 0;
00246 }
00247
00248
00249 ceb_output->getOutputEndPoint()->setOutput(output_half);
00250 output_half = ceb_output;
00251 }
00252
00253 policy.name_id = policy2.name_id;
00254 conn_id->name_id = policy2.name_id;
00255
00256 return output_half;
00257
00258 }
00259