Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #include "RemotePorts.hpp"
00040 #include "CorbaTypeTransporter.hpp"
00041 #include "DataFlowI.h"
00042 #include "../../DataFlowInterface.hpp"
00043 #include <cassert>
00044 #include "CorbaConnPolicy.hpp"
00045 #include "CorbaLib.hpp"
00046 #include "RemoteConnID.hpp"
00047 #include "../../internal/ConnID.hpp"
00048 #include "../../rtt-detail-fwd.hpp"
00049
00050
00051 using namespace std;
00052 using namespace RTT::detail;
00053
00054 template<typename BaseClass>
00055 RemotePort<BaseClass>::RemotePort(RTT::types::TypeInfo const* type_info,
00056 CDataFlowInterface_ptr dataflow,
00057 std::string const& name,
00058 PortableServer::POA_ptr poa)
00059 : BaseClass(name)
00060 , type_info(type_info)
00061 , dataflow(CDataFlowInterface::_duplicate(dataflow))
00062 , mpoa(PortableServer::POA::_duplicate(poa)) { }
00063
00064 template<typename BaseClass>
00065 CDataFlowInterface_ptr RemotePort<BaseClass>::getDataFlowInterface() const
00066 { return CDataFlowInterface::_duplicate(dataflow); }
00067 template<typename BaseClass>
00068 RTT::types::TypeInfo const* RemotePort<BaseClass>::getTypeInfo() const { return type_info; }
00069 template<typename BaseClass>
00070 int RemotePort<BaseClass>::serverProtocol() const { return ORO_CORBA_PROTOCOL_ID; }
00071 template<typename BaseClass>
00072 bool RemotePort<BaseClass>::connected() const
00073 {
00074 return dataflow->isConnected(this->getName().c_str());
00075 }
00076 template<typename BaseClass>
00077 void RemotePort<BaseClass>::disconnect()
00078 {
00079 dataflow->disconnectPort(this->getName().c_str());
00080 }
00081 template<typename BaseClass>
00082 bool RemotePort<BaseClass>::disconnect(PortInterface* port)
00083 {
00084 Logger::In in("RemotePort::disconnect(PortInterface& port)");
00085 log(Error) << "Disconnecting a single port not yet supported." <<endlog();
00086 return false;
00087 }
00088 template<typename BaseClass>
00089 PortableServer::POA_ptr RemotePort<BaseClass>::_default_POA()
00090 { return PortableServer::POA::_duplicate(mpoa); }
00091
00092 template<typename BaseClass>
00093 RTT::internal::ConnID* RemotePort<BaseClass>::getPortID() const
00094 { return new RemoteConnID(dataflow, this->getName()); }
00095
00096 template<typename BaseClass>
00097 bool RemotePort<BaseClass>::createStream( const RTT::ConnPolicy& policy )
00098 {
00099 log(Error) << "Can't create a data stream on a remote port !" <<endlog();
00100 return false;
00101 }
00102
00103 template<typename BaseClass>
00104 bool RemotePort<BaseClass>::addConnection(RTT::internal::ConnID* port_id, ChannelElementBase::shared_ptr channel_input, RTT::ConnPolicy const& policy)
00105 {
00106 assert(false && "Can/Should not add connection to remote port object !");
00107 return false;
00108 }
00109
00110
00111 RemoteInputPort::RemoteInputPort(RTT::types::TypeInfo const* type_info,
00112 CDataFlowInterface_ptr dataflow, std::string const& reader_port,
00113 PortableServer::POA_ptr poa)
00114 : RemotePort< RTT::base::InputPortInterface >(type_info, dataflow, reader_port, poa)
00115 {}
00116
00117 RTT::base::DataSourceBase* RemoteInputPort::getDataSource()
00118 { throw std::runtime_error("InputPort::getDataSource() is not supported in CORBA port proxies"); }
00119
00120 RTT::base::ChannelElementBase::shared_ptr RemoteInputPort::buildRemoteChannelOutput(
00121 RTT::base::OutputPortInterface& output_port,
00122 RTT::types::TypeInfo const* type,
00123 RTT::base::InputPortInterface& reader_,
00124 RTT::ConnPolicy const& policy)
00125 {
00126
00127 Logger::In in("RemoteInputPort::buildRemoteChannelOutput");
00128
00129
00130
00131 CRemoteChannelElement_var remote;
00132 RTT::base::ChannelElementBase::shared_ptr buf;
00133 try {
00134 CConnPolicy cpolicy = toCORBA(policy);
00135 CChannelElement_var ret = dataflow->buildChannelOutput(getName().c_str(), cpolicy);
00136 if ( CORBA::is_nil(ret) ) {
00137 return 0;
00138 }
00139 remote = CRemoteChannelElement::_narrow( ret.in() );
00140 policy.name_id = toRTT(cpolicy).name_id;
00141 }
00142 catch(CORBA::Exception& e)
00143 {
00144 log(Error) << "Caught CORBA exception while creating a remote channel output:" << endlog();
00145 log(Error) << CORBA_EXCEPTION_INFO( e ) <<endlog();
00146 return NULL;
00147 }
00148
00149
00150
00151 CRemoteChannelElement_i* local =
00152 static_cast<CorbaTypeTransporter*>(type->getProtocol(ORO_CORBA_PROTOCOL_ID))
00153 ->createChannelElement_i(output_port.getInterface(), mpoa, policy.pull);
00154
00155 CRemoteChannelElement_var proxy = local->_this();
00156 local->setRemoteSide(remote);
00157 remote->setRemoteSide(proxy.in());
00158 local->_remove_ref();
00159
00160 RTT::base::ChannelElementBase::shared_ptr corba_ceb = dynamic_cast<RTT::base::ChannelElementBase*>(local);
00161
00162
00163
00164
00165
00166 if ( policy.transport != 0 && policy.transport != ORO_CORBA_PROTOCOL_ID ) {
00167
00168 string name = policy.name_id ;
00169 if ( type->getProtocol(policy.transport) == 0 ) {
00170 log(Error) << "Could not create out-of-band transport for port "<< name << " with transport id " << policy.transport <<endlog();
00171 log(Error) << "No such transport registered. Check your policy.transport settings or add the transport for type "<< type->getTypeName() <<endlog();
00172 }
00173 RTT::base::ChannelElementBase::shared_ptr ceb = type->getProtocol(policy.transport)->createStream(this, policy, true);
00174 if (ceb) {
00175
00176 ceb->setOutput( corba_ceb );
00177 corba_ceb = ceb;
00178 log(Info) <<"Redirecting data for port "<<name << " to out-of-band protocol "<< policy.transport << endlog();
00179 } else {
00180 log(Error) << "The type transporter for type "<<type->getTypeName()<< " failed to create a dual channel for port " << name<<endlog();
00181 }
00182 } else {
00183
00184 buf = type->buildDataStorage(policy);
00185 assert(buf);
00186 buf->setOutput( corba_ceb );
00187 corba_ceb = buf;
00188 }
00189
00190
00191
00192 channel_map[ corba_ceb->getOutputEndPoint().get() ] = CChannelElement::_duplicate( remote );
00193
00194 return corba_ceb;
00195 }
00196
00197 RTT::base::PortInterface* RemoteInputPort::clone() const
00198 { return type_info->inputPort(getName()); }
00199
00200 RTT::base::PortInterface* RemoteInputPort::antiClone() const
00201 { return type_info->outputPort(getName()); }
00202
00203
00204 bool RemoteInputPort::channelReady(RTT::base::ChannelElementBase::shared_ptr channel, RTT::ConnPolicy const& policy) {
00205 if (! channel_map.count( channel.get() ) ) {
00206 log(Error) <<"No such channel found in "<< getName() <<".channelReady( channel ): aborting connection."<<endlog();
00207 return false;
00208 }
00209 try {
00210 CChannelElement_ptr cce = channel_map[ channel.get() ];
00211 assert( cce );
00212 CConnPolicy cpolicy = toCORBA(policy);
00213 return dataflow->channelReady( this->getName().c_str(), cce, cpolicy );
00214 }
00215 catch(CORBA::Exception& e)
00216 {
00217 log(Error) <<"Remote call to "<< getName() <<".channelReady( channel ) failed with a CORBA exception: aborting connection."<<endlog();
00218 log(Error) << CORBA_EXCEPTION_INFO( e ) <<endlog();
00219 return false;
00220 }
00221 }
00222
00223 RemoteOutputPort::RemoteOutputPort(RTT::types::TypeInfo const* type_info,
00224 CDataFlowInterface_ptr dataflow, std::string const& reader_port,
00225 PortableServer::POA_ptr poa)
00226 : RemotePort< RTT::base::OutputPortInterface >(type_info, dataflow, reader_port, poa)
00227 {}
00228
00229 bool RemoteOutputPort::keepsLastWrittenValue() const
00230 { return false; }
00231
00232 void RemoteOutputPort::keepLastWrittenValue(bool new_flag)
00233 { throw std::runtime_error("OutputPort::keepLastWrittenValue() is not supported in CORBA port proxies"); }
00234
00235 DataSourceBase::shared_ptr RemoteOutputPort::getDataSource() const
00236 {
00237 return DataSourceBase::shared_ptr();
00238 }
00239
00240 bool RemoteOutputPort::createConnection( RTT::base::InputPortInterface& sink, RTT::ConnPolicy const& policy )
00241 {
00242 try {
00243 CConnPolicy cpolicy = toCORBA(policy);
00244
00245 RemoteInputPort* rip = dynamic_cast<RemoteInputPort*>(&sink);
00246 if ( rip ){
00247 CDataFlowInterface_var cdfi = rip->getDataFlowInterface();
00248 if ( dataflow->createConnection( this->getName().c_str(), cdfi.in() , sink.getName().c_str(), cpolicy ) ) {
00249 policy.name_id = cpolicy.name_id;
00250 return true;
00251 } else
00252 return false;
00253 }
00254
00255
00256 if(sink.getInterface() == 0){
00257 log(Error)<<"RemotePort connection is only possible if the local port '"<<sink.getName()<<"' is added to a DataFlowInterface. Use addPort for this."<<endlog();
00258 return false;
00259 }
00260 CDataFlowInterface_ptr cdfi = CDataFlowInterface_i::getRemoteInterface( sink.getInterface(), mpoa.in() );
00261 if ( dataflow->createConnection( this->getName().c_str(), cdfi , sink.getName().c_str(), cpolicy ) ) {
00262 policy.name_id = cpolicy.name_id;
00263 return true;
00264 }
00265 }
00266 catch(CORBA::Exception& e)
00267 {
00268 log(Error) <<"Remote call to "<< getName() <<".createConnection() failed with a CORBA exception: aborting connection."<<endlog();
00269 log(Error) << CORBA_EXCEPTION_INFO( e ) <<endlog();
00270 return false;
00271 }
00272 return false;
00273 }
00274
00275 RTT::base::PortInterface* RemoteOutputPort::clone() const
00276 { return type_info->outputPort(getName()); }
00277
00278 RTT::base::PortInterface* RemoteOutputPort::antiClone() const
00279 { return type_info->inputPort(getName()); }
00280