Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #include "RemotePorts.hpp"
00040 #include "CorbaTypeTransporter.hpp"
00041 #include "DataFlowI.h"
00042 #include "../../DataFlowInterface.hpp"
00043 #include <cassert>
00044 #include "CorbaConnPolicy.hpp"
00045 #include "CorbaLib.hpp"
00046 #include "RemoteConnID.hpp"
00047 #include "../../internal/ConnID.hpp"
00048 #include "../../rtt-detail-fwd.hpp"
00049
00050
00051 using namespace std;
00052 using namespace RTT::detail;
00053
00054 template<typename BaseClass>
00055 RemotePort<BaseClass>::RemotePort(RTT::types::TypeInfo const* type_info,
00056 CDataFlowInterface_ptr dataflow,
00057 std::string const& name,
00058 PortableServer::POA_ptr poa)
00059 : BaseClass(name)
00060 , type_info(type_info)
00061 , dataflow(CDataFlowInterface::_duplicate(dataflow))
00062 , mpoa(PortableServer::POA::_duplicate(poa)) { }
00063
00064 template<typename BaseClass>
00065 CDataFlowInterface_ptr RemotePort<BaseClass>::getDataFlowInterface() const
00066 { return CDataFlowInterface::_duplicate(dataflow); }
00067 template<typename BaseClass>
00068 RTT::types::TypeInfo const* RemotePort<BaseClass>::getTypeInfo() const { return type_info; }
00069 template<typename BaseClass>
00070 int RemotePort<BaseClass>::serverProtocol() const { return ORO_CORBA_PROTOCOL_ID; }
00071 template<typename BaseClass>
00072 bool RemotePort<BaseClass>::connected() const
00073 {
00074 return dataflow->isConnected(this->getName().c_str());
00075 }
00076 template<typename BaseClass>
00077 void RemotePort<BaseClass>::disconnect()
00078 {
00079 dataflow->disconnectPort(this->getName().c_str());
00080 }
00081 template<typename BaseClass>
00082 bool RemotePort<BaseClass>::disconnect(PortInterface* port)
00083 {
00084 Logger::In in("RemotePort::disconnect(PortInterface& port)");
00085 log(Error) << "Disconnecting a single port not yet supported." <<endlog();
00086 return false;
00087 }
00088 template<typename BaseClass>
00089 PortableServer::POA_ptr RemotePort<BaseClass>::_default_POA()
00090 { return PortableServer::POA::_duplicate(mpoa); }
00091
00092 template<typename BaseClass>
00093 RTT::internal::ConnID* RemotePort<BaseClass>::getPortID() const
00094 { return new RemoteConnID(dataflow, this->getName()); }
00095
00096 template<typename BaseClass>
00097 bool RemotePort<BaseClass>::createStream( const RTT::ConnPolicy& policy )
00098 {
00099 log(Error) << "Can't create a data stream on a remote port !" <<endlog();
00100 return false;
00101 }
00102
00103 template<typename BaseClass>
00104 bool RemotePort<BaseClass>::addConnection(RTT::internal::ConnID* port_id, ChannelElementBase::shared_ptr channel_input, RTT::ConnPolicy const& policy)
00105 {
00106 assert(false && "Can/Should not add connection to remote port object !");
00107 return false;
00108 }
00109
00110
00111 RemoteInputPort::RemoteInputPort(RTT::types::TypeInfo const* type_info,
00112 CDataFlowInterface_ptr dataflow, std::string const& reader_port,
00113 PortableServer::POA_ptr poa)
00114 : RemotePort< RTT::base::InputPortInterface >(type_info, dataflow, reader_port, poa)
00115 {}
00116
00117 RTT::base::DataSourceBase* RemoteInputPort::getDataSource()
00118 { throw std::runtime_error("InputPort::getDataSource() is not supported in CORBA port proxies"); }
00119
00120 RTT::base::ChannelElementBase::shared_ptr RemoteInputPort::buildRemoteChannelOutput(
00121 RTT::base::OutputPortInterface& output_port,
00122 RTT::types::TypeInfo const* type,
00123 RTT::base::InputPortInterface& reader_,
00124 RTT::ConnPolicy const& policy)
00125 {
00126
00127 Logger::In in("RemoteInputPort::buildRemoteChannelOutput");
00128
00129
00130
00131 CRemoteChannelElement_var remote;
00132 RTT::base::ChannelElementBase::shared_ptr buf;
00133 try {
00134 CConnPolicy cpolicy = toCORBA(policy);
00135 CChannelElement_var ret = dataflow->buildChannelOutput(getName().c_str(), cpolicy);
00136 if ( CORBA::is_nil(ret) ) {
00137 return 0;
00138 }
00139 remote = CRemoteChannelElement::_narrow( ret.in() );
00140 policy.name_id = toRTT(cpolicy).name_id;
00141 }
00142 catch(CORBA::Exception& e)
00143 {
00144 log(Error) << "Caught CORBA exception while creating a remote channel output:" << endlog();
00145 log(Error) << CORBA_EXCEPTION_INFO( e ) <<endlog();
00146 return NULL;
00147 }
00148
00149
00150
00151 CRemoteChannelElement_i* local =
00152 static_cast<CorbaTypeTransporter*>(type->getProtocol(ORO_CORBA_PROTOCOL_ID))
00153 ->createChannelElement_i(output_port.getInterface(), mpoa, policy.pull);
00154
00155 CRemoteChannelElement_var proxy = local->_this();
00156 local->setRemoteSide(remote);
00157 remote->setRemoteSide(proxy.in());
00158 local->_remove_ref();
00159
00160 RTT::base::ChannelElementBase::shared_ptr corba_ceb = dynamic_cast<RTT::base::ChannelElementBase*>(local);
00161
00162
00163
00164
00165
00166 if ( policy.transport != 0 && policy.transport != ORO_CORBA_PROTOCOL_ID ) {
00167
00168 string name = policy.name_id ;
00169 if ( type->getProtocol(policy.transport) == 0 ) {
00170 log(Error) << "Could not create out-of-band transport for port "<< name << " with transport id " << policy.transport <<endlog();
00171 log(Error) << "No such transport registered. Check your policy.transport settings or add the transport for type "<< type->getTypeName() <<endlog();
00172 }
00173 RTT::base::ChannelElementBase::shared_ptr ceb = type->getProtocol(policy.transport)->createStream(this, policy, true);
00174 if (ceb) {
00175
00176 ceb->setOutput( corba_ceb );
00177 corba_ceb = ceb;
00178 log(Info) <<"Redirecting data for port "<<name << " to out-of-band protocol "<< policy.transport << endlog();
00179 } else {
00180 log(Error) << "The type transporter for type "<<type->getTypeName()<< " failed to create a dual channel for port " << name<<endlog();
00181 }
00182 } else {
00183
00184 buf = type->buildDataStorage(policy);
00185 assert(buf);
00186 buf->setOutput( corba_ceb );
00187 corba_ceb = buf;
00188 }
00189
00190
00191
00192 channel_map[ corba_ceb->getOutputEndPoint().get() ] = CChannelElement::_duplicate( remote );
00193
00194 return corba_ceb;
00195 }
00196
00197 RTT::base::PortInterface* RemoteInputPort::clone() const
00198 { return type_info->inputPort(getName()); }
00199
00200 RTT::base::PortInterface* RemoteInputPort::antiClone() const
00201 { return type_info->outputPort(getName()); }
00202
00203
00204 bool RemoteInputPort::channelReady(RTT::base::ChannelElementBase::shared_ptr channel) {
00205 if (! channel_map.count( channel.get() ) ) {
00206 log(Error) <<"No such channel found in "<< getName() <<".channelReady( channel ): aborting connection."<<endlog();
00207 return false;
00208 }
00209 try {
00210 CChannelElement_ptr cce = channel_map[ channel.get() ];
00211 assert( cce );
00212 return dataflow->channelReady( this->getName().c_str(), cce );
00213 }
00214 catch(CORBA::Exception& e)
00215 {
00216 log(Error) <<"Remote call to "<< getName() <<".channelReady( channel ) failed with a CORBA exception: aborting connection."<<endlog();
00217 log(Error) << CORBA_EXCEPTION_INFO( e ) <<endlog();
00218 return false;
00219 }
00220 }
00221
00222 RemoteOutputPort::RemoteOutputPort(RTT::types::TypeInfo const* type_info,
00223 CDataFlowInterface_ptr dataflow, std::string const& reader_port,
00224 PortableServer::POA_ptr poa)
00225 : RemotePort< RTT::base::OutputPortInterface >(type_info, dataflow, reader_port, poa)
00226 {}
00227
00228 bool RemoteOutputPort::keepsLastWrittenValue() const
00229 { return false; }
00230
00231 void RemoteOutputPort::keepLastWrittenValue(bool new_flag)
00232 { throw std::runtime_error("OutputPort::keepLastWrittenValue() is not supported in CORBA port proxies"); }
00233
00234 DataSourceBase::shared_ptr RemoteOutputPort::getDataSource() const
00235 {
00236 return DataSourceBase::shared_ptr();
00237 }
00238
00239 bool RemoteOutputPort::createConnection( RTT::base::InputPortInterface& sink, RTT::ConnPolicy const& policy )
00240 {
00241 try {
00242 CConnPolicy cpolicy = toCORBA(policy);
00243
00244 RemoteInputPort* rip = dynamic_cast<RemoteInputPort*>(&sink);
00245 if ( rip ){
00246 CDataFlowInterface_var cdfi = rip->getDataFlowInterface();
00247 if ( dataflow->createConnection( this->getName().c_str(), cdfi.in() , sink.getName().c_str(), cpolicy ) ) {
00248 policy.name_id = cpolicy.name_id;
00249 return true;
00250 } else
00251 return false;
00252 }
00253
00254
00255 CDataFlowInterface_ptr cdfi = CDataFlowInterface_i::getRemoteInterface( sink.getInterface(), mpoa.in() );
00256 if ( dataflow->createConnection( this->getName().c_str(), cdfi , sink.getName().c_str(), cpolicy ) ) {
00257 policy.name_id = cpolicy.name_id;
00258 return true;
00259 }
00260 }
00261 catch(CORBA::Exception& e)
00262 {
00263 log(Error) <<"Remote call to "<< getName() <<".createConnection() failed with a CORBA exception: aborting connection."<<endlog();
00264 log(Error) << CORBA_EXCEPTION_INFO( e ) <<endlog();
00265 return false;
00266 }
00267 return false;
00268 }
00269
00270 RTT::base::PortInterface* RemoteOutputPort::clone() const
00271 { return type_info->outputPort(getName()); }
00272
00273 RTT::base::PortInterface* RemoteOutputPort::antiClone() const
00274 { return type_info->inputPort(getName()); }
00275