RemotePorts.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: Peter Soetens Thu Oct 22 11:59:07 CEST 2009 RemotePorts.cpp
3 
4  RemotePorts.cpp - description
5  -------------------
6  begin : Thu October 22 2009
7  copyright : (C) 2009 Peter Soetens
8  email : peter@thesourcworks.com
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 
39 #include "RemotePorts.hpp"
40 #include "CorbaTypeTransporter.hpp"
41 #include "DataFlowI.h"
42 #include "../../DataFlowInterface.hpp"
43 #include <cassert>
44 #include "CorbaConnPolicy.hpp"
45 #include "CorbaLib.hpp"
46 #include "RemoteConnID.hpp"
47 #include "../../internal/ConnID.hpp"
48 #include "../../rtt-detail-fwd.hpp"
49 
50 using namespace std;
51 using namespace RTT::detail;
52 
53 template<typename BaseClass>
55  CDataFlowInterface_ptr dataflow,
56  std::string const& name,
57  PortableServer::POA_ptr poa)
58  : BaseClass(name)
59  , type_info(type_info)
60  , dataflow(CDataFlowInterface::_duplicate(dataflow))
61  , mpoa(PortableServer::POA::_duplicate(poa)) { }
62 
63 template<typename BaseClass>
64 CDataFlowInterface_ptr RemotePort<BaseClass>::getDataFlowInterface() const
65 { return CDataFlowInterface::_duplicate(dataflow); }
66 template<typename BaseClass>
68 template<typename BaseClass>
70 template<typename BaseClass>
72 {
73  return dataflow->isConnected(this->getName().c_str());
74 }
75 template<typename BaseClass>
77 {
78  dataflow->disconnectPort(this->getName().c_str());
79 }
80 
81 
82 template<typename BaseClass>
83 PortableServer::POA_ptr RemotePort<BaseClass>::_default_POA()
84 { return PortableServer::POA::_duplicate(mpoa); }
85 
86 template<typename BaseClass>
88 { return new RemoteConnID(dataflow, this->getName()); }
89 
90 template<typename BaseClass>
92 {
93  log(Error) << "Can't create a data stream on a remote port !" <<endlog();
94  return false;
95 }
96 
97 template<typename BaseClass>
99 {
100  assert(false && "Can/Should not add connection to remote port object !");
101  return false;
102 }
103 
104 template<typename BaseClass>
106 {
107  return 0;
108 }
109 
111  CDataFlowInterface_ptr dataflow, std::string const& reader_port,
112  PortableServer::POA_ptr poa)
113  : RemotePort< RTT::base::InputPortInterface >(type_info, dataflow, reader_port, poa)
114 {}
115 
117 {}
118 
120 { throw std::runtime_error("InputPort::getDataSource() is not supported in CORBA port proxies"); }
121 
123  RTT::base::OutputPortInterface& output_port,
124  RTT::types::TypeInfo const* type,
126  RTT::ConnPolicy const& policy)
127 {
128  // This is called by the createConnection()->createRemoteConnection() code of the ConnFactory.
129  Logger::In in("RemoteInputPort::buildRemoteChannelOutput");
130 
131  // First we delegate this call to the remote side, which will create a corba channel element,
132  // buffers and channel output and attach this to the real input port.
133  CRemoteChannelElement_var remote;
135  try {
136  CConnPolicy cpolicy = toCORBA(policy);
137  CChannelElement_var ret = dataflow->buildChannelOutput(getName().c_str(), cpolicy);
138  if ( CORBA::is_nil(ret) ) {
139  return 0;
140  }
141  remote = CRemoteChannelElement::_narrow( ret.in() );
142  policy.name_id = cpolicy.name_id;
143  policy.data_size = cpolicy.data_size;
144  }
145  catch(CORBA::Exception& e)
146  {
147  log(Error) << "Caught CORBA exception while creating a remote channel output:" << endlog();
148  log(Error) << CORBA_EXCEPTION_INFO( e ) <<endlog();
149  return NULL;
150  }
151 
152  // Input side is now ok and waiting for us to complete. We build our corba channel element too
153  // and connect it to the remote side and vice versa.
154  CRemoteChannelElement_i* local =
156  ->createChannelElement_i(output_port.getInterface(), mpoa, policy);
157 
158  CRemoteChannelElement_var proxy = local->_this();
159  local->setRemoteSide(remote);
160  remote->setRemoteSide(proxy.in());
161  local->_remove_ref();
162 
164 
165  // Note: this probably needs to factored out, see also DataFlowI.cpp:buildChannelOutput() for the counterpart of this code.
166  // If the user specified OOB, we prepend the prefered transport.
167  // This inserts a channel element before our corba channel element.
168  // The remote input side will have done this too in the above step.
169  if ( policy.transport != 0 && policy.transport != ORO_CORBA_PROTOCOL_ID ) {
170  // create alternative path / out of band transport.
171  string name = policy.name_id ;
172  if ( type->getProtocol(policy.transport) == 0 ) {
173  log(Error) << "Could not create out-of-band transport for port "<< name << " with transport id " << policy.transport <<endlog();
174  log(Error) << "No such transport registered. Check your policy.transport settings or add the transport for type "<< type->getTypeName() <<endlog();
175  }
176  RTT::base::ChannelElementBase::shared_ptr ceb = type->getProtocol(policy.transport)->createStream(this, policy, /* is_sender = */ true);
177  if (ceb) {
178  // insertion before corba.
179  ceb->connectTo( corba_ceb, policy.mandatory );
180  corba_ceb = ceb;
181  log(Info) <<"Redirecting data for port "<<name << " to out-of-band protocol "<< policy.transport << endlog();
182  } else {
183  log(Error) << "The type transporter for type "<<type->getTypeName()<< " failed to create a dual channel for port " << name<<endlog();
184  }
185  } else {
186  // if no oob present, create a buffer at output port to guarantee RT delivery of data
187  // This is only needed for push connections. For pull, the buffer has already been created by ConnFactory<T>::buildChannelInput().
188  if (policy.pull == ConnPolicy::PUSH) {
189  buf = type->buildDataStorage(policy);
190  assert(buf);
191  buf->connectTo( corba_ceb, policy.mandatory );
192  corba_ceb = buf;
193  }
194  }
195 
196  // The ChannelElementBase object that represents reader_half on this side
197  return corba_ceb;
198 }
199 
201 {
// Asks the remote side, through the data flow interface, to attach the port named
// getName() to the shared connection identified by shared_connection->getName().
// Returns true when the remote createSharedConnection() call reports success, false when
// it reports failure or when a CORBA exception is caught.
202  Logger::In in("RemoteInputPort::createConnection");
203 
204  try {
205  CConnPolicy cpolicy = toCORBA(policy);
// The connection is identified on the remote side by the shared connection's name.
206  cpolicy.name_id = CORBA::string_dup( shared_connection->getName().c_str() );
207  if ( dataflow->createSharedConnection( this->getName().c_str(), cpolicy ) ) {
// Copy the name and data size from the CORBA policy back into the caller's policy.
208  policy.name_id = cpolicy.name_id;
209  policy.data_size = cpolicy.data_size;
210  return true;
211  }
212  }
213  catch(CORBA::Exception& e)
214  {
215  log(Error) << "Caught CORBA exception while trying to add an input port to an existing connection:" << endlog();
216  log(Error) << CORBA_EXCEPTION_INFO( e ) <<endlog();
217  return false;
218  }
219 
// Reached only when the remote call completed without an exception but returned false.
220  log(Error) << "Failed to connect remote InputPort '" << getName() << "' to shared connection '" << shared_connection->getName() << "', "
221  << "most likely because you tried to connect input ports in different processes." << endlog();
222  return false;
223 }
224 
226 {
227  //just calling the implementation from the other side. (implemented already for RemoteOutputPort).
228  return port->disconnect(this);
229 }
230 
232 { return type_info->inputPort(getName()); }
233 
235 { return type_info->outputPort(getName()); }
236 
237 
239  CDataFlowInterface_ptr dataflow, std::string const& reader_port,
240  PortableServer::POA_ptr poa)
241  : RemotePort< RTT::base::OutputPortInterface >(type_info, dataflow, reader_port, poa)
242 {}
243 
245 { return false; }
246 
248 { throw std::runtime_error("OutputPort::keepLastWrittenValue() is not supported in CORBA port proxies"); }
249 
251 {
253 }
254 
255 
257 {
258  RemoteInputPort *portI = dynamic_cast<RemoteInputPort *>(port);
259 
260  //if not a remote port, we can not handle at the moment!
261  if(portI == NULL){
262  Logger::In in("RemoteOutputPort::disconnect(PortInterface& port)");
263  log(Error) << "Port: " << port->getName() << " could not be disconnected from: " << this->getName()
264  << " because it could not be casted to a RemoteInputPort type!" << nlog()
265  << "Only disconnect of two remote ports supported by corba layer, yet!" << endlog();
266  return false;
267 
268  }
269 
270  //if Remote Input Port:
271  return dataflow->removeConnection(this->getName().c_str(), portI->getDataFlowInterface(),
272  portI->getName().c_str());
273 }
274 
276 {
// Asks the remote side to connect the remote output port named getName() to 'sink'.
// Two cases are handled: 'sink' is itself a RemoteInputPort, or 'sink' is a local port that
// has been added to a DataFlowInterface. Returns true when the remote createConnection()
// call reports success; returns false when it reports failure, when a local 'sink' has no
// DataFlowInterface, or when a CORBA exception is caught.
277  try {
278  CConnPolicy cpolicy = toCORBA(policy);
279  // first check if we're connecting to another remote:
280  RemoteInputPort* rip = dynamic_cast<RemoteInputPort*>(&sink);
281  if ( rip ){
// The _var releases the duplicated reference returned by getDataFlowInterface().
282  CDataFlowInterface_var cdfi = rip->getDataFlowInterface();
283  if ( dataflow->createConnection( this->getName().c_str(), cdfi.in() , sink.getName().c_str(), cpolicy ) ) {
// Copy the name and data size from the CORBA policy back into the caller's policy.
284  policy.name_id = cpolicy.name_id;
285  policy.data_size = cpolicy.data_size;
286  return true;
287  } else
288  return false;
289  }
290  // !!! only if sink is local:
291  // this dynamic CDataFlowInterface lookup is tricky, we re/ab-use the DataFlowInterface pointer of sink !
292  if(sink.getInterface() == 0){
293  log(Error)<<"RemotePort connection is only possible if the local port '"<<sink.getName()<<"' is added to a DataFlowInterface. Use addPort for this."<<endlog();
294  return false;
295  }
// NOTE(review): the result is held in a raw _ptr and never released here; presumably
// getRemoteInterface() keeps ownership of the reference - confirm in DataFlowI.cpp.
296  CDataFlowInterface_ptr cdfi = CDataFlowInterface_i::getRemoteInterface( sink.getInterface(), mpoa.in() );
297  if ( dataflow->createConnection( this->getName().c_str(), cdfi , sink.getName().c_str(), cpolicy ) ) {
298  policy.name_id = cpolicy.name_id;
299  policy.data_size = cpolicy.data_size;
300  return true;
301  }
302  }
303  catch(CORBA::Exception& e)
304  {
305  log(Error) <<"Remote call to "<< getName() <<".createConnection() failed with a CORBA exception: aborting connection."<<endlog();
306  log(Error) << CORBA_EXCEPTION_INFO( e ) <<endlog();
307  return false;
308  }
// Reached when the connection to a local sink was attempted and the remote call returned false.
309  return false;
310 }
311 
313 { return type_info->outputPort(getName()); }
314 
316 { return type_info->inputPort(getName()); }
317 
base::ChannelElementBase::shared_ptr buildRemoteChannelOutput(base::OutputPortInterface &output_port, types::TypeInfo const *type, base::InputPortInterface &reader_, ConnPolicy const &policy)
static CDataFlowInterface_ptr getRemoteInterface(DataFlowInterface *dfi, PortableServer::POA_ptr poa)
Definition: DataFlowI.cpp:124
static Logger::LogFunction nlog()
Definition: Logger.hpp:375
bool createStream(const ConnPolicy &policy)
Definition: RemotePorts.cpp:91
boost::intrusive_ptr< SharedConnectionBase > shared_ptr
virtual base::DataSourceBase::shared_ptr getDataSource() const
const std::string & getTypeName() const
Definition: TypeInfo.hpp:83
The base class for all internal data representations.
CDataFlowInterface_var dataflow
Definition: RemotePorts.hpp:66
types::TypeInfo const * type_info
Definition: RemotePorts.hpp:65
bool connected() const
Definition: RemotePorts.cpp:71
base::PortInterface * antiClone() const
base::ChannelElementBase::shared_ptr buildDataStorage(ConnPolicy const &policy) const
Definition: TypeInfo.cpp:207
Definition: mystd.hpp:163
#define CORBA_EXCEPTION_INFO(x)
Definition: corba.h:70
const std::string & getName() const
bool keepsLastWrittenValue() const
virtual bool addConnection(internal::ConnID *port_id, base::ChannelElementBase::shared_ptr channel_input, ConnPolicy const &policy)
Definition: RemotePorts.cpp:98
RemoteInputPort(types::TypeInfo const *type_info, CDataFlowInterface_ptr dataflow, std::string const &name, PortableServer::POA_ptr poa)
static const bool PUSH
Definition: ConnPolicy.hpp:119
types::TypeInfo const * getTypeInfo() const
Definition: RemotePorts.cpp:67
DataFlowInterface * getInterface() const
virtual void disconnect()
Definition: RemotePorts.cpp:76
void keepLastWrittenValue(bool new_flag)
virtual void disconnect()=0
bool createConnection(internal::SharedConnectionBase::shared_ptr shared_connection, ConnPolicy const &policy=ConnPolicy())
PortableServer::POA_ptr _default_POA()
Definition: RemotePorts.cpp:83
bool createConnection(base::InputPortInterface &sink, ConnPolicy const &policy)
base::DataSourceBase * getDataSource()
base::PortInterface * antiClone() const
base::ChannelElementBase * getEndpoint() const
virtual base::ChannelElementBase::shared_ptr createStream(base::PortInterface *port, const ConnPolicy &policy, bool is_sender) const =0
internal::ConnID * getPortID() const
Definition: RemotePorts.cpp:87
PortableServer::POA_var mpoa
Definition: RemotePorts.hpp:67
RemoteOutputPort(types::TypeInfo const *type_info, CDataFlowInterface_ptr dataflow, std::string const &name, PortableServer::POA_ptr poa)
CDataFlowInterface_ptr getDataFlowInterface() const
Definition: RemotePorts.cpp:64
boost::intrusive_ptr< ChannelElementBase > shared_ptr
RTT::corba::CConnPolicy toCORBA(RTT::ConnPolicy const &policy)
TypeTransporter * getProtocol(int protocol_id) const
Definition: TypeInfo.cpp:150
int serverProtocol() const
Definition: RemotePorts.cpp:69
#define ORO_CORBA_PROTOCOL_ID
Definition: CorbaLib.hpp:45
base::OutputPortInterface * outputPort(std::string const &name) const
Definition: TypeInfo.cpp:202
base::PortInterface * clone() const
base::PortInterface * clone() const
boost::intrusive_ptr< DataSourceBase > shared_ptr
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:53
std::string name_id
Definition: ConnPolicy.hpp:256
static Logger & log()
Definition: Logger.hpp:350
base::InputPortInterface * inputPort(std::string const &name) const
Definition: TypeInfo.cpp:197
static Logger::LogFunction endlog()
Definition: Logger.hpp:362


rtt
Author(s): RTT Developers
autogenerated on Tue Jun 25 2019 19:33:27