$search
00001 /*************************************************************************** 00002 tag: Peter Soetens Thu Oct 22 11:59:07 CEST 2009 RemotePorts.cpp 00003 00004 RemotePorts.cpp - description 00005 ------------------- 00006 begin : Thu October 22 2009 00007 copyright : (C) 2009 Peter Soetens 00008 email : peter@thesourcworks.com 00009 00010 *************************************************************************** 00011 * This library is free software; you can redistribute it and/or * 00012 * modify it under the terms of the GNU General Public * 00013 * License as published by the Free Software Foundation; * 00014 * version 2 of the License. * 00015 * * 00016 * As a special exception, you may use this file as part of a free * 00017 * software library without restriction. Specifically, if other files * 00018 * instantiate templates or use macros or inline functions from this * 00019 * file, or you compile this file and link it with other files to * 00020 * produce an executable, this file does not by itself cause the * 00021 * resulting executable to be covered by the GNU General Public * 00022 * License. This exception does not however invalidate any other * 00023 * reasons why the executable file might be covered by the GNU General * 00024 * Public License. * 00025 * * 00026 * This library is distributed in the hope that it will be useful, * 00027 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 00028 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * 00029 * Lesser General Public License for more details. * 00030 * * 00031 * You should have received a copy of the GNU General Public * 00032 * License along with this library; if not, write to the Free Software * 00033 * Foundation, Inc., 59 Temple Place, * 00034 * Suite 330, Boston, MA 02111-1307 USA * 00035 * * 00036 ***************************************************************************/ 00037 00038 00039 #include "RemotePorts.hpp" 00040 #include "CorbaTypeTransporter.hpp" 00041 #include "DataFlowI.h" 00042 #include "../../DataFlowInterface.hpp" 00043 #include <cassert> 00044 #include "CorbaConnPolicy.hpp" 00045 #include "CorbaLib.hpp" 00046 #include "RemoteConnID.hpp" 00047 #include "../../internal/ConnID.hpp" 00048 #include "../../rtt-detail-fwd.hpp" 00049 00050 00051 using namespace std; 00052 using namespace RTT::detail; 00053 00054 template<typename BaseClass> 00055 RemotePort<BaseClass>::RemotePort(RTT::types::TypeInfo const* type_info, 00056 CDataFlowInterface_ptr dataflow, 00057 std::string const& name, 00058 PortableServer::POA_ptr poa) 00059 : BaseClass(name) 00060 , type_info(type_info) 00061 , dataflow(CDataFlowInterface::_duplicate(dataflow)) 00062 , mpoa(PortableServer::POA::_duplicate(poa)) { } 00063 00064 template<typename BaseClass> 00065 CDataFlowInterface_ptr RemotePort<BaseClass>::getDataFlowInterface() const 00066 { return CDataFlowInterface::_duplicate(dataflow); } 00067 template<typename BaseClass> 00068 RTT::types::TypeInfo const* RemotePort<BaseClass>::getTypeInfo() const { return type_info; } 00069 template<typename BaseClass> 00070 int RemotePort<BaseClass>::serverProtocol() const { return ORO_CORBA_PROTOCOL_ID; } 00071 template<typename BaseClass> 00072 bool RemotePort<BaseClass>::connected() const 00073 { 00074 return dataflow->isConnected(this->getName().c_str()); 00075 } 00076 template<typename BaseClass> 00077 void RemotePort<BaseClass>::disconnect() 00078 { 00079 dataflow->disconnectPort(this->getName().c_str()); 00080 } 00081 template<typename BaseClass> 00082 bool RemotePort<BaseClass>::disconnect(PortInterface* port) 00083 { 00084 Logger::In in("RemotePort::disconnect(PortInterface& port)"); 00085 log(Error) << "Disconnecting a single port not yet supported." <<endlog(); 00086 return false; 00087 } 00088 template<typename BaseClass> 00089 PortableServer::POA_ptr RemotePort<BaseClass>::_default_POA() 00090 { return PortableServer::POA::_duplicate(mpoa); } 00091 00092 template<typename BaseClass> 00093 RTT::internal::ConnID* RemotePort<BaseClass>::getPortID() const 00094 { return new RemoteConnID(dataflow, this->getName()); } 00095 00096 template<typename BaseClass> 00097 bool RemotePort<BaseClass>::createStream( const RTT::ConnPolicy& policy ) 00098 { 00099 log(Error) << "Can't create a data stream on a remote port !" <<endlog(); 00100 return false; 00101 } 00102 00103 template<typename BaseClass> 00104 bool RemotePort<BaseClass>::addConnection(RTT::internal::ConnID* port_id, ChannelElementBase::shared_ptr channel_input, RTT::ConnPolicy const& policy) 00105 { 00106 assert(false && "Can/Should not add connection to remote port object !"); 00107 return false; 00108 } 00109 00110 00111 RemoteInputPort::RemoteInputPort(RTT::types::TypeInfo const* type_info, 00112 CDataFlowInterface_ptr dataflow, std::string const& reader_port, 00113 PortableServer::POA_ptr poa) 00114 : RemotePort< RTT::base::InputPortInterface >(type_info, dataflow, reader_port, poa) 00115 {} 00116 00117 RTT::base::DataSourceBase* RemoteInputPort::getDataSource() 00118 { throw std::runtime_error("InputPort::getDataSource() is not supported in CORBA port proxies"); } 00119 00120 RTT::base::ChannelElementBase::shared_ptr RemoteInputPort::buildRemoteChannelOutput( 00121 RTT::base::OutputPortInterface& output_port, 00122 RTT::types::TypeInfo const* type, 00123 RTT::base::InputPortInterface& reader_, 00124 RTT::ConnPolicy const& policy) 00125 { 00126 // This is called by the createConnection()->createRemoteConnection() code of the ConnFactory. 00127 Logger::In in("RemoteInputPort::buildRemoteChannelOutput"); 00128 00129 // First we delegate this call to the remote side, which will create a corba channel element, 00130 // buffers and channel output and attach this to the real input port. 00131 CRemoteChannelElement_var remote; 00132 RTT::base::ChannelElementBase::shared_ptr buf; 00133 try { 00134 CConnPolicy cpolicy = toCORBA(policy); 00135 CChannelElement_var ret = dataflow->buildChannelOutput(getName().c_str(), cpolicy); 00136 if ( CORBA::is_nil(ret) ) { 00137 return 0; 00138 } 00139 remote = CRemoteChannelElement::_narrow( ret.in() ); 00140 policy.name_id = toRTT(cpolicy).name_id; 00141 } 00142 catch(CORBA::Exception& e) 00143 { 00144 log(Error) << "Caught CORBA exception while creating a remote channel output:" << endlog(); 00145 log(Error) << CORBA_EXCEPTION_INFO( e ) <<endlog(); 00146 return NULL; 00147 } 00148 00149 // Input side is now ok and waiting for us to complete. We build our corba channel element too 00150 // and connect it to the remote side and vice versa. 00151 CRemoteChannelElement_i* local = 00152 static_cast<CorbaTypeTransporter*>(type->getProtocol(ORO_CORBA_PROTOCOL_ID)) 00153 ->createChannelElement_i(output_port.getInterface(), mpoa, policy.pull); 00154 00155 CRemoteChannelElement_var proxy = local->_this(); 00156 local->setRemoteSide(remote); 00157 remote->setRemoteSide(proxy.in()); 00158 local->_remove_ref(); 00159 00160 RTT::base::ChannelElementBase::shared_ptr corba_ceb = dynamic_cast<RTT::base::ChannelElementBase*>(local); 00161 00162 // Note: this probably needs to factored out, see also DataFlowI.cpp:buildChannelOutput() for the counterpart of this code. 00163 // If the user specified OOB, we prepend the prefered transport. 00164 // This inserts a channel element before our corba channel element. 00165 // The remote input side will have done this too in the above step. 00166 if ( policy.transport != 0 && policy.transport != ORO_CORBA_PROTOCOL_ID ) { 00167 // create alternative path / out of band transport. 00168 string name = policy.name_id ; 00169 if ( type->getProtocol(policy.transport) == 0 ) { 00170 log(Error) << "Could not create out-of-band transport for port "<< name << " with transport id " << policy.transport <<endlog(); 00171 log(Error) << "No such transport registered. Check your policy.transport settings or add the transport for type "<< type->getTypeName() <<endlog(); 00172 } 00173 RTT::base::ChannelElementBase::shared_ptr ceb = type->getProtocol(policy.transport)->createStream(this, policy, true); 00174 if (ceb) { 00175 // insertion before corba. 00176 ceb->setOutput( corba_ceb ); 00177 corba_ceb = ceb; 00178 log(Info) <<"Redirecting data for port "<<name << " to out-of-band protocol "<< policy.transport << endlog(); 00179 } else { 00180 log(Error) << "The type transporter for type "<<type->getTypeName()<< " failed to create a dual channel for port " << name<<endlog(); 00181 } 00182 } else { 00183 // if no oob present, create a buffer at output port to guarantee RT delivery of data. (is always present in push&pull). 00184 buf = type->buildDataStorage(policy); 00185 assert(buf); 00186 buf->setOutput( corba_ceb ); 00187 corba_ceb = buf; 00188 } 00189 // store the object reference in a map, for future lookup in channelReady(). 00190 // this is coupled with the use of channelReady(). We assume the caller will always pass 00191 // chan->getOutputEndPoint() in that function. 00192 channel_map[ corba_ceb->getOutputEndPoint().get() ] = CChannelElement::_duplicate( remote ); 00193 // The ChannelElementBase object that represents reader_half on this side 00194 return corba_ceb; 00195 } 00196 00197 RTT::base::PortInterface* RemoteInputPort::clone() const 00198 { return type_info->inputPort(getName()); } 00199 00200 RTT::base::PortInterface* RemoteInputPort::antiClone() const 00201 { return type_info->outputPort(getName()); } 00202 00203 00204 bool RemoteInputPort::channelReady(RTT::base::ChannelElementBase::shared_ptr channel) { 00205 if (! channel_map.count( channel.get() ) ) { 00206 log(Error) <<"No such channel found in "<< getName() <<".channelReady( channel ): aborting connection."<<endlog(); 00207 return false; 00208 } 00209 try { 00210 CChannelElement_ptr cce = channel_map[ channel.get() ]; 00211 assert( cce ); 00212 return dataflow->channelReady( this->getName().c_str(), cce ); 00213 } 00214 catch(CORBA::Exception& e) 00215 { 00216 log(Error) <<"Remote call to "<< getName() <<".channelReady( channel ) failed with a CORBA exception: aborting connection."<<endlog(); 00217 log(Error) << CORBA_EXCEPTION_INFO( e ) <<endlog(); 00218 return false; 00219 } 00220 } 00221 00222 RemoteOutputPort::RemoteOutputPort(RTT::types::TypeInfo const* type_info, 00223 CDataFlowInterface_ptr dataflow, std::string const& reader_port, 00224 PortableServer::POA_ptr poa) 00225 : RemotePort< RTT::base::OutputPortInterface >(type_info, dataflow, reader_port, poa) 00226 {} 00227 00228 bool RemoteOutputPort::keepsLastWrittenValue() const 00229 { return false; } 00230 00231 void RemoteOutputPort::keepLastWrittenValue(bool new_flag) 00232 { throw std::runtime_error("OutputPort::keepLastWrittenValue() is not supported in CORBA port proxies"); } 00233 00234 DataSourceBase::shared_ptr RemoteOutputPort::getDataSource() const 00235 { 00236 return DataSourceBase::shared_ptr(); 00237 } 00238 00239 bool RemoteOutputPort::createConnection( RTT::base::InputPortInterface& sink, RTT::ConnPolicy const& policy ) 00240 { 00241 try { 00242 CConnPolicy cpolicy = toCORBA(policy); 00243 // first check if we're connecting to another remote: 00244 RemoteInputPort* rip = dynamic_cast<RemoteInputPort*>(&sink); 00245 if ( rip ){ 00246 CDataFlowInterface_var cdfi = rip->getDataFlowInterface(); 00247 if ( dataflow->createConnection( this->getName().c_str(), cdfi.in() , sink.getName().c_str(), cpolicy ) ) { 00248 policy.name_id = cpolicy.name_id; 00249 return true; 00250 } else 00251 return false; 00252 } 00253 // !!! only if sink is local: 00254 // this dynamic CDataFlowInterface lookup is tricky, we re/ab-use the DataFlowInterface pointer of sink ! 00255 CDataFlowInterface_ptr cdfi = CDataFlowInterface_i::getRemoteInterface( sink.getInterface(), mpoa.in() ); 00256 if ( dataflow->createConnection( this->getName().c_str(), cdfi , sink.getName().c_str(), cpolicy ) ) { 00257 policy.name_id = cpolicy.name_id; 00258 return true; 00259 } 00260 } 00261 catch(CORBA::Exception& e) 00262 { 00263 log(Error) <<"Remote call to "<< getName() <<".createConnection() failed with a CORBA exception: aborting connection."<<endlog(); 00264 log(Error) << CORBA_EXCEPTION_INFO( e ) <<endlog(); 00265 return false; 00266 } 00267 return false; 00268 } 00269 00270 RTT::base::PortInterface* RemoteOutputPort::clone() const 00271 { return type_info->outputPort(getName()); } 00272 00273 RTT::base::PortInterface* RemoteOutputPort::antiClone() const 00274 { return type_info->inputPort(getName()); } 00275