00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #include "RemotePorts.hpp"
00040 #include "CorbaTypeTransporter.hpp"
00041 #include "DataFlowI.h"
00042 #include "../../DataFlowInterface.hpp"
00043 #include <cassert>
00044 #include "CorbaConnPolicy.hpp"
00045 #include "CorbaLib.hpp"
00046 #include "RemoteConnID.hpp"
00047 #include "../../internal/ConnID.hpp"
00048 #include "../../rtt-detail-fwd.hpp"
00049
00050
00051 using namespace std;
00052 using namespace RTT::detail;
00053
00054 template<typename BaseClass>
00055 RemotePort<BaseClass>::RemotePort(RTT::types::TypeInfo const* type_info,
00056 CDataFlowInterface_ptr dataflow,
00057 std::string const& name,
00058 PortableServer::POA_ptr poa)
00059 : BaseClass(name)
00060 , type_info(type_info)
00061 , dataflow(CDataFlowInterface::_duplicate(dataflow))
00062 , mpoa(PortableServer::POA::_duplicate(poa)) { }
00063
00064 template<typename BaseClass>
00065 CDataFlowInterface_ptr RemotePort<BaseClass>::getDataFlowInterface() const
00066 { return CDataFlowInterface::_duplicate(dataflow); }
00067 template<typename BaseClass>
00068 RTT::types::TypeInfo const* RemotePort<BaseClass>::getTypeInfo() const { return type_info; }
00069 template<typename BaseClass>
00070 int RemotePort<BaseClass>::serverProtocol() const { return ORO_CORBA_PROTOCOL_ID; }
00071 template<typename BaseClass>
00072 bool RemotePort<BaseClass>::connected() const
00073 {
00074 return dataflow->isConnected(this->getName().c_str());
00075 }
00076 template<typename BaseClass>
00077 void RemotePort<BaseClass>::disconnect()
00078 {
00079 dataflow->disconnectPort(this->getName().c_str());
00080 }
00081 template<typename BaseClass>
00082 bool RemotePort<BaseClass>::disconnect(PortInterface* port)
00083 {
00084 Logger::In in("RemotePort::disconnect(PortInterface& port)");
00085 log(Error) << "Disconnecting a single port not yet supported." <<endlog();
00086 return false;
00087 }
00088 template<typename BaseClass>
00089 PortableServer::POA_ptr RemotePort<BaseClass>::_default_POA()
00090 { return PortableServer::POA::_duplicate(mpoa); }
00091
00092 template<typename BaseClass>
00093 RTT::internal::ConnID* RemotePort<BaseClass>::getPortID() const
00094 { return new RemoteConnID(dataflow, this->getName()); }
00095
00096 template<typename BaseClass>
00097 bool RemotePort<BaseClass>::createStream( const RTT::ConnPolicy& policy )
00098 {
00099 log(Error) << "Can't create a data stream on a remote port !" <<endlog();
00100 return false;
00101 }
00102
00103 template<typename BaseClass>
00104 bool RemotePort<BaseClass>::addConnection(RTT::internal::ConnID* port_id, ChannelElementBase::shared_ptr channel_input, RTT::ConnPolicy const& policy)
00105 {
00106 assert(false && "Can/Should not add connection to remote port object !");
00107 return false;
00108 }
00109
00110
00111 RemoteInputPort::RemoteInputPort(RTT::types::TypeInfo const* type_info,
00112 CDataFlowInterface_ptr dataflow, std::string const& reader_port,
00113 PortableServer::POA_ptr poa)
00114 : RemotePort< RTT::base::InputPortInterface >(type_info, dataflow, reader_port, poa)
00115 {}
00116
00117 RTT::base::DataSourceBase* RemoteInputPort::getDataSource()
00118 { throw std::runtime_error("InputPort::getDataSource() is not supported in CORBA port proxies"); }
00119
00120 RTT::base::ChannelElementBase::shared_ptr RemoteInputPort::buildRemoteChannelOutput(
00121 RTT::base::OutputPortInterface& output_port,
00122 RTT::types::TypeInfo const* type,
00123 RTT::base::InputPortInterface& reader_,
00124 RTT::ConnPolicy const& policy)
00125 {
00126
00127 Logger::In in("RemoteInputPort::buildRemoteChannelOutput");
00128
00129
00130
00131 CRemoteChannelElement_var remote;
00132 RTT::base::ChannelElementBase::shared_ptr buf;
00133 try {
00134 CConnPolicy cpolicy = toCORBA(policy);
00135 CChannelElement_var ret = dataflow->buildChannelOutput(getName().c_str(), cpolicy);
00136 if ( CORBA::is_nil(ret) ) {
00137 return 0;
00138 }
00139 remote = CRemoteChannelElement::_narrow( ret.in() );
00140 policy.name_id = toRTT(cpolicy).name_id;
00141 }
00142 catch(CORBA::Exception& e)
00143 {
00144 log(Error) << "Caught CORBA exception while creating a remote channel output:" << endlog();
00145 log(Error) << CORBA_EXCEPTION_INFO( e ) <<endlog();
00146 return NULL;
00147 }
00148
00149
00150
00151 CRemoteChannelElement_i* local =
00152 static_cast<CorbaTypeTransporter*>(type->getProtocol(ORO_CORBA_PROTOCOL_ID))
00153 ->createChannelElement_i(output_port.getInterface(), mpoa, policy.pull);
00154
00155 CRemoteChannelElement_var proxy = local->_this();
00156 local->setRemoteSide(remote);
00157 remote->setRemoteSide(proxy.in());
00158 local->_remove_ref();
00159
00160 RTT::base::ChannelElementBase::shared_ptr corba_ceb = dynamic_cast<RTT::base::ChannelElementBase*>(local);
00161
00162
00163
00164
00165
00166 if ( policy.transport != 0 && policy.transport != ORO_CORBA_PROTOCOL_ID ) {
00167
00168 string name = policy.name_id ;
00169 if ( type->getProtocol(policy.transport) == 0 ) {
00170 log(Error) << "Could not create out-of-band transport for port "<< name << " with transport id " << policy.transport <<endlog();
00171 log(Error) << "No such transport registered. Check your policy.transport settings or add the transport for type "<< type->getTypeName() <<endlog();
00172 }
00173 RTT::base::ChannelElementBase::shared_ptr ceb = type->getProtocol(policy.transport)->createStream(this, policy, true);
00174 if (ceb) {
00175
00176 ceb->setOutput( corba_ceb );
00177 corba_ceb = ceb;
00178 log(Info) <<"Redirecting data for port "<<name << " to out-of-band protocol "<< policy.transport << endlog();
00179 } else {
00180 log(Error) << "The type transporter for type "<<type->getTypeName()<< " failed to create a dual channel for port " << name<<endlog();
00181 }
00182 } else {
00183
00184 buf = type->buildDataStorage(policy);
00185 assert(buf);
00186 buf->setOutput( corba_ceb );
00187 corba_ceb = buf;
00188 }
00189
00190
00191
00192 channel_map[ corba_ceb->getOutputEndPoint().get() ] = CChannelElement::_duplicate( remote );
00193
00194 return corba_ceb;
00195 }
00196
00197 RTT::internal::ConnFactory* RemoteInputPort::getConnFactory() { return this; }
00198
00199 RTT::base::PortInterface* RemoteInputPort::clone() const
00200 { return type_info->inputPort(getName()); }
00201
00202 RTT::base::PortInterface* RemoteInputPort::antiClone() const
00203 { return type_info->outputPort(getName()); }
00204
00205
00206 bool RemoteInputPort::channelReady(RTT::base::ChannelElementBase::shared_ptr channel) {
00207 if (! channel_map.count( channel.get() ) ) {
00208 log(Error) <<"No such channel found in "<< getName() <<".channelReady( channel ): aborting connection."<<endlog();
00209 return false;
00210 }
00211 try {
00212 CChannelElement_ptr cce = channel_map[ channel.get() ];
00213 assert( cce );
00214 return dataflow->channelReady( this->getName().c_str(), cce );
00215 }
00216 catch(CORBA::Exception& e)
00217 {
00218 log(Error) <<"Remote call to "<< getName() <<".channelReady( channel ) failed with a CORBA exception: aborting connection."<<endlog();
00219 log(Error) << CORBA_EXCEPTION_INFO( e ) <<endlog();
00220 return false;
00221 }
00222 }
00223
00224 RemoteOutputPort::RemoteOutputPort(RTT::types::TypeInfo const* type_info,
00225 CDataFlowInterface_ptr dataflow, std::string const& reader_port,
00226 PortableServer::POA_ptr poa)
00227 : RemotePort< RTT::base::OutputPortInterface >(type_info, dataflow, reader_port, poa)
00228 {}
00229
00230 bool RemoteOutputPort::keepsLastWrittenValue() const
00231 { return false; }
00232
00233 void RemoteOutputPort::keepLastWrittenValue(bool new_flag)
00234 { throw std::runtime_error("OutputPort::keepLastWrittenValue() is not supported in CORBA port proxies"); }
00235
00236 DataSourceBase::shared_ptr RemoteOutputPort::getDataSource() const
00237 {
00238 return DataSourceBase::shared_ptr();
00239 }
00240
00241 bool RemoteOutputPort::createConnection( RTT::base::InputPortInterface& sink, RTT::ConnPolicy const& policy )
00242 {
00243 try {
00244 CConnPolicy cpolicy = toCORBA(policy);
00245
00246 RemoteInputPort* rip = dynamic_cast<RemoteInputPort*>(&sink);
00247 if ( rip ){
00248 CDataFlowInterface_var cdfi = rip->getDataFlowInterface();
00249 if ( dataflow->createConnection( this->getName().c_str(), cdfi.in() , sink.getName().c_str(), cpolicy ) ) {
00250 policy.name_id = cpolicy.name_id;
00251 return true;
00252 } else
00253 return false;
00254 }
00255
00256
00257 CDataFlowInterface_ptr cdfi = CDataFlowInterface_i::getRemoteInterface( sink.getInterface(), mpoa.in() );
00258 if ( dataflow->createConnection( this->getName().c_str(), cdfi , sink.getName().c_str(), cpolicy ) ) {
00259 policy.name_id = cpolicy.name_id;
00260 return true;
00261 }
00262 }
00263 catch(CORBA::Exception& e)
00264 {
00265 log(Error) <<"Remote call to "<< getName() <<".createConnection() failed with a CORBA exception: aborting connection."<<endlog();
00266 log(Error) << CORBA_EXCEPTION_INFO( e ) <<endlog();
00267 return false;
00268 }
00269 return false;
00270 }
00271
00272 RTT::base::PortInterface* RemoteOutputPort::clone() const
00273 { return type_info->outputPort(getName()); }
00274
00275 RTT::base::PortInterface* RemoteOutputPort::antiClone() const
00276 { return type_info->inputPort(getName()); }
00277