42 #include "../../DataFlowInterface.hpp" 47 #include "../../internal/ConnID.hpp" 48 #include "../../rtt-detail-fwd.hpp" 53 template<
typename BaseClass>
55 CDataFlowInterface_ptr dataflow,
56 std::string
const& name,
57 PortableServer::POA_ptr poa)
59 , type_info(type_info)
60 , dataflow(CDataFlowInterface::_duplicate(dataflow))
61 , mpoa(PortableServer::POA::_duplicate(poa)) { }
63 template<
typename BaseClass>
65 {
return CDataFlowInterface::_duplicate(
dataflow); }
66 template<
typename BaseClass>
68 template<
typename BaseClass>
70 template<
typename BaseClass>
73 return dataflow->isConnected(this->getName().c_str());
75 template<
typename BaseClass>
78 dataflow->disconnectPort(this->getName().c_str());
82 template<
typename BaseClass>
84 {
return PortableServer::POA::_duplicate(
mpoa); }
86 template<
typename BaseClass>
90 template<
typename BaseClass>
93 log(
Error) <<
"Can't create a data stream on a remote port !" <<
endlog();
97 template<
typename BaseClass>
100 assert(
false &&
"Can/Should not add connection to remote port object !");
104 template<
typename BaseClass>
111 CDataFlowInterface_ptr
dataflow, std::string
const& reader_port,
112 PortableServer::POA_ptr poa)
120 {
throw std::runtime_error(
"InputPort::getDataSource() is not supported in CORBA port proxies"); }
129 Logger::In in(
"RemoteInputPort::buildRemoteChannelOutput");
133 CRemoteChannelElement_var remote;
136 CConnPolicy cpolicy =
toCORBA(policy);
137 CChannelElement_var ret =
dataflow->buildChannelOutput(
getName().c_str(), cpolicy);
138 if ( CORBA::is_nil(ret) ) {
141 remote = CRemoteChannelElement::_narrow( ret.in() );
142 policy.
name_id = cpolicy.name_id;
145 catch(CORBA::Exception& e)
147 log(
Error) <<
"Caught CORBA exception while creating a remote channel output:" <<
endlog();
158 CRemoteChannelElement_var proxy = local->_this();
159 local->setRemoteSide(remote);
160 remote->setRemoteSide(proxy.in());
161 local->_remove_ref();
173 log(
Error) <<
"Could not create out-of-band transport for port "<< name <<
" with transport id " << policy.
transport <<
endlog();
174 log(
Error) <<
"No such transport registered. Check your policy.transport settings or add the transport for type "<< type->
getTypeName() <<
endlog();
179 ceb->connectTo( corba_ceb, policy.
mandatory );
181 log(
Info) <<
"Redirecting data for port "<<name <<
" to out-of-band protocol "<< policy.
transport <<
endlog();
183 log(
Error) <<
"The type transporter for type "<<type->
getTypeName()<<
" failed to create a dual channel for port " << name<<
endlog();
191 buf->connectTo( corba_ceb, policy.
mandatory );
202 Logger::In in(
"RemoteInputPort::createConnection");
205 CConnPolicy cpolicy =
toCORBA(policy);
206 cpolicy.name_id = CORBA::string_dup( shared_connection->getName().c_str() );
207 if (
dataflow->createSharedConnection( this->getName().c_str(), cpolicy ) ) {
208 policy.
name_id = cpolicy.name_id;
213 catch(CORBA::Exception& e)
215 log(
Error) <<
"Caught CORBA exception while trying to add an input port to an existing connection:" <<
endlog();
220 log(
Error) <<
"Failed to connect remote InputPort '" <<
getName() <<
"' to shared connection '" << shared_connection->getName() <<
"', " 221 <<
"most likely because you tried to connect input ports in different processes." <<
endlog();
239 CDataFlowInterface_ptr
dataflow, std::string
const& reader_port,
240 PortableServer::POA_ptr poa)
248 {
throw std::runtime_error(
"OutputPort::keepLastWrittenValue() is not supported in CORBA port proxies"); }
262 Logger::In in(
"RemoteOutputPort::disconnect(PortInterface& port)");
264 <<
" because it could not be casted to a RemoteInputPort type!" <<
nlog()
265 <<
"Only disconnect of two remote ports supported by corba layer, yet!" <<
endlog();
278 CConnPolicy cpolicy =
toCORBA(policy);
283 if (
dataflow->createConnection( this->getName().c_str(), cdfi.in() , sink.
getName().c_str(), cpolicy ) ) {
284 policy.
name_id = cpolicy.name_id;
293 log(
Error)<<
"RemotePort connection is only possible if the local port '"<<sink.
getName()<<
"' is added to a DataFlowInterface. Use addPort for this."<<
endlog();
297 if (
dataflow->createConnection( this->getName().c_str(), cdfi , sink.
getName().c_str(), cpolicy ) ) {
298 policy.
name_id = cpolicy.name_id;
303 catch(CORBA::Exception& e)
305 log(
Error) <<
"Remote call to "<<
getName() <<
".createConnection() failed with a CORBA exception: aborting connection."<<
endlog();
static CDataFlowInterface_ptr getRemoteInterface(DataFlowInterface *dfi, PortableServer::POA_ptr poa)
static Logger::LogFunction nlog()
bool createStream(const ConnPolicy &policy)
boost::intrusive_ptr< SharedConnectionBase > shared_ptr
virtual base::DataSourceBase::shared_ptr getDataSource() const
const std::string & getTypeName() const
The base class for all internal data representations.
CDataFlowInterface_var dataflow
types::TypeInfo const * type_info
base::PortInterface * antiClone() const
base::ChannelElementBase::shared_ptr buildDataStorage(ConnPolicy const &policy) const
#define CORBA_EXCEPTION_INFO(x)
const std::string & getName() const
bool keepsLastWrittenValue() const
virtual bool addConnection(internal::ConnID *port_id, base::ChannelElementBase::shared_ptr channel_input, ConnPolicy const &policy)
types::TypeInfo const * getTypeInfo() const
DataFlowInterface * getInterface() const
virtual void disconnect()
void keepLastWrittenValue(bool new_flag)
virtual void disconnect()=0
PortableServer::POA_ptr _default_POA()
bool createConnection(base::InputPortInterface &sink, ConnPolicy const &policy)
base::ChannelElementBase * getEndpoint() const
virtual base::ChannelElementBase::shared_ptr createStream(base::PortInterface *port, const ConnPolicy &policy, bool is_sender) const =0
internal::ConnID * getPortID() const
PortableServer::POA_var mpoa
RemoteOutputPort(types::TypeInfo const *type_info, CDataFlowInterface_ptr dataflow, std::string const &name, PortableServer::POA_ptr poa)
CDataFlowInterface_ptr getDataFlowInterface() const
boost::intrusive_ptr< ChannelElementBase > shared_ptr
RTT::corba::CConnPolicy toCORBA(RTT::ConnPolicy const &policy)
TypeTransporter * getProtocol(int protocol_id) const
int serverProtocol() const
#define ORO_CORBA_PROTOCOL_ID
base::OutputPortInterface * outputPort(std::string const &name) const
base::PortInterface * clone() const
boost::intrusive_ptr< DataSourceBase > shared_ptr
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
base::InputPortInterface * inputPort(std::string const &name) const
static Logger::LogFunction endlog()