41 #include "DataFlowS.h" 43 #include "DataFlowC.h" 45 #include "../../base/PortInterface.hpp" 46 #include "../../Logger.hpp" 49 #include "../../InputPort.hpp" 50 #include "../../OutputPort.hpp" 69 : mdf(interface), mpoa(PortableServer::POA::_duplicate(poa))
92 if (it->getDataFlowInterface() == obj)
94 log(
Debug) <<
"deregistered servant for data flow interface" <<
endlog();
96 PortableServer::ObjectId_var oid = servant->
mpoa->servant_to_id(it->servant);
97 servant->
mpoa->deactivate_object(oid);
118 if (it->objref->_is_equivalent(objref))
119 return it->getDataFlowInterface();
129 if (it->getDataFlowInterface() == dfi)
133 CDataFlowInterface_ptr server = servant->_this();
134 servant->_remove_ref();
141 return PortableServer::POA::_duplicate(
mpoa);
144 CDataFlowInterface::CPortNames * CDataFlowInterface_i::getPorts()
ACE_THROW_SPEC ((
145 CORBA::SystemException
150 RTT::corba::CDataFlowInterface::CPortNames_var pn =
new RTT::corba::CDataFlowInterface::CPortNames();
151 pn->length( ports.size() );
153 for (
unsigned int i=0; i != ports.size(); ++i )
154 pn[i] = CORBA::string_dup( ports[i].c_str() );
159 CDataFlowInterface::CPortDescriptions* CDataFlowInterface_i::getPortDescriptions()
ACE_THROW_SPEC ((
160 CORBA::SystemException
164 RTT::corba::CDataFlowInterface::CPortDescriptions_var result =
new RTT::corba::CDataFlowInterface::CPortDescriptions();
165 result->length( ports.size() );
168 for (
unsigned int i = 0; i < ports.size(); ++i)
170 CPortDescription port_desc;
173 port_desc.
name = CORBA::string_dup(ports[i].c_str());
174 port_desc.description = CORBA::string_dup(port->
getDescription().c_str());
179 log(
Warning) <<
"the type of port " << ports[i] <<
" is not registered into the Orocos type system. It is ignored by the CORBA layer." <<
endlog();
183 port_desc.type_name = CORBA::string_dup(type_info->getTypeName().c_str());
184 if (dynamic_cast<InputPortInterface*>(port))
185 port_desc.type = corba::CInput;
187 port_desc.type = corba::COutput;
189 result[j++] = port_desc;
192 return result._retn();
195 CPortType CDataFlowInterface_i::getPortType(
const char * port_name)
ACE_THROW_SPEC ((
196 CORBA::SystemException
197 ,::RTT::corba::CNoSuchPortException
202 throw CNoSuchPortException();
204 if (dynamic_cast<InputPortInterface*>(p))
205 return RTT::corba::CInput;
206 else return RTT::corba::COutput;
209 char* CDataFlowInterface_i::getDataType(
const char * port_name)
ACE_THROW_SPEC ((
210 CORBA::SystemException
211 ,::RTT::corba::CNoSuchPortException
216 throw CNoSuchPortException();
220 CORBA::Boolean CDataFlowInterface_i::isConnected(
const char * port_name)
ACE_THROW_SPEC ((
221 CORBA::SystemException
222 ,::RTT::corba::CNoSuchPortException
227 throw corba::CNoSuchPortException();
236 if (it->first->_is_equivalent (channel) ) {
243 void CDataFlowInterface_i::disconnectPort(
const char * port_name)
ACE_THROW_SPEC ((
244 CORBA::SystemException
245 ,::RTT::corba::CNoSuchPortException
250 log(
Error) <<
"disconnectPort: No such port: "<< port_name <<
endlog();
251 throw corba::CNoSuchPortException();
257 bool CDataFlowInterface_i::removeConnection(
258 const char* local_port,
259 CDataFlowInterface_ptr remote_interface,
const char* remote_port)
ACE_THROW_SPEC ((
260 CORBA::SystemException
261 ,::RTT::corba::CNoSuchPortException
267 log(
Error) <<
"disconnectPort: No such port: "<< local_port <<
endlog();
268 throw corba::CNoSuchPortException();
270 if (dynamic_cast<OutputPortInterface*>(port) == 0) {
271 log(
Error) <<
"disconnectPort: "<< local_port <<
" is an input port" <<
endlog();
272 throw corba::CNoSuchPortException();
295 ::CORBA::Boolean CDataFlowInterface_i::createStream(
const char* port,
297 CORBA::SystemException
298 ,::RTT::corba::CNoSuchPortException
304 throw corba::CNoSuchPortException();
316 void CDataFlowInterface_i::removeStream(
const char* port,
const char* stream_name)
ACE_THROW_SPEC ((
317 CORBA::SystemException
318 ,::RTT::corba::CNoSuchPortException
324 throw corba::CNoSuchPortException();
332 CChannelElement_ptr CDataFlowInterface_i::buildChannelOutput(
333 const char* port_name, CConnPolicy & corba_policy)
ACE_THROW_SPEC ((
334 CORBA::SystemException
335 ,::RTT::corba::CNoCorbaTransport
336 ,::RTT::corba::CNoSuchPortException
337 ,::RTT::corba::CInvalidArgument
340 Logger::In in(
"CDataFlowInterface_i::buildChannelOutput");
343 throw CNoSuchPortException();
347 throw CNoCorbaTransport();
352 throw CNoCorbaTransport();
362 if (!shared_connection) {
363 throw CInvalidArgument();
367 if ( strlen( corba_policy.name_id.in()) == 0 )
368 corba_policy.name_id = CORBA::string_dup( shared_connection->getName().c_str() );
372 return RTT::corba::CChannelElement::_nil();
375 end = shared_connection;
380 if (!end)
throw CInvalidArgument();
387 throw CNoCorbaTransport();
396 if ( type_info->
getProtocol(corba_policy.transport) == 0 ) {
397 log(
Error) <<
"Could not create out-of-band transport for port "<< port_name <<
" with transport id " << corba_policy.transport <<
endlog();
398 log(
Error) <<
"No such transport registered. Check your corba_policy.transport settings or add the transport for type "<< type_info->
getTypeName() <<
endlog();
399 return RTT::corba::CChannelElement::_nil();
403 if ( strlen( corba_policy.name_id.in()) == 0 )
404 corba_policy.name_id = CORBA::string_dup( policy2.
name_id.c_str() );
410 log(
Info) <<
"Receiving data for port "<< policy2.
name_id <<
" from out-of-band protocol "<< corba_policy.transport <<
endlog();
412 log(
Error) <<
"The type transporter for type "<<type_info->
getTypeName()<<
" failed to create an out-of-band endpoint for port " << port_name<<
endlog();
413 return RTT::corba::CChannelElement::_nil();
421 this_element->_remove_ref();
425 channel_list.push_back( ChannelList::value_type(this_element->_this(), end) );
428 CRemoteChannelElement_var proxy = this_element->_this();
429 return proxy._retn();
435 CChannelElement_ptr CDataFlowInterface_i::buildChannelInput(
436 const char* port_name, CConnPolicy & corba_policy)
ACE_THROW_SPEC ((
437 CORBA::SystemException
438 ,::RTT::corba::CNoCorbaTransport
439 ,::RTT::corba::CNoSuchPortException
440 ,::RTT::corba::CInvalidArgument
443 Logger::In in(
"CDataFlowInterface_i::buildChannelInput");
447 throw CNoSuchPortException();
451 throw CNoCorbaTransport();
456 throw CNoCorbaTransport();
465 throw CInvalidArgument();
473 throw CNoCorbaTransport();
475 assert( dynamic_cast<ChannelElementBase*>(this_element) );
483 if ( type_info->
getProtocol(corba_policy.transport) == 0 ) {
484 log(
Error) <<
"Could not create out-of-band transport for port "<< port_name <<
" with transport id " << corba_policy.transport <<
endlog();
485 log(
Error) <<
"No such transport registered. Check your corba_policy.transport settings or add the transport for type "<< type_info->
getTypeName() <<
endlog();
486 throw CNoCorbaTransport();
490 if ( strlen( corba_policy.name_id.in()) == 0 )
491 corba_policy.name_id = CORBA::string_dup( policy2.
name_id.c_str() );
495 start->connectTo( dynamic_cast<ChannelElementBase*>(this_element), policy2.
mandatory );
497 log(
Info) <<
"Sending data from port "<< policy2.
name_id <<
" to out-of-band protocol "<< corba_policy.transport <<
endlog();
499 log(
Error) <<
"The type transporter for type "<<type_info->
getTypeName()<<
" failed to create an out-of-band endpoint for port " << port_name<<
endlog();
500 throw CNoCorbaTransport();
506 start->connectTo(buf, policy2.
mandatory);
507 buf->connectTo( dynamic_cast<ChannelElementBase*>(this_element) );
519 channel_list.push_back( ChannelList::value_type(this_element->_this(), start->getInputEndPoint()));
522 return this_element->_this();
525 ::CORBA::Boolean CDataFlowInterface_i::createSharedConnection(
const char* port_name, ::RTT::corba::CConnPolicy& corba_policy)
ACE_THROW_SPEC ((
526 CORBA::SystemException
527 ,::RTT::corba::CNoSuchPortException
528 ,::RTT::corba::CInvalidArgument
531 Logger::In in(
"CDataFlowInterface_i::createSharedConnection");
534 throw CNoSuchPortException();
538 throw CNoCorbaTransport();
540 if (corba_policy.buffer_policy != CShared) {
541 throw CInvalidArgument();
554 if ( strlen( corba_policy.name_id.in()) == 0 )
555 corba_policy.name_id = CORBA::string_dup( shared_connection->getName().c_str() );
561 ::CORBA::Boolean CDataFlowInterface_i::createConnection(
562 const char* writer_port, CDataFlowInterface_ptr reader_interface,
564 CORBA::SystemException
565 ,::RTT::corba::CNoSuchPortException
568 Logger::In in(
"CDataFlowInterface_i::createConnection");
571 throw CNoSuchPortException();
577 if (local_interface && policy.transport == 0)
583 log(
Warning) <<
"CORBA: createConnection() target is not an input port" <<
endlog();
584 throw CNoSuchPortException();
588 log(
Debug) <<
"CORBA: createConnection() is creating a LOCAL connection between " <<
589 writer_port <<
" and " << reader_port <<
endlog();
593 log(
Debug) <<
"CORBA: createConnection() is creating a REMOTE connection between " <<
594 writer_port <<
" and " << reader_port <<
endlog();
597 if (reader_interface->getPortType(reader_port) != corba::CInput) {
598 log(
Error) <<
"Could not create connection: " << reader_port <<
" is not an input port."<<
endlog();
599 throw CNoSuchPortException();
612 catch(CORBA::COMM_FAILURE&) {
throw; }
613 catch(CORBA::TRANSIENT&) {
throw; }
614 catch(...) {
throw; }
620 PortableServer::POA_ptr poa)
621 : transport(transport)
622 ,
mpoa(PortableServer::POA::_duplicate(poa))
629 return PortableServer::POA::_duplicate(
mpoa);
632 void CRemoteChannelElement_i::setRemoteSide(CRemoteChannelElement_ptr remote)
ACE_THROW_SPEC ((
633 CORBA::SystemException
636 this->
remote_side = RTT::corba::CRemoteChannelElement::_duplicate(remote);
static CDataFlowInterface_ptr getRemoteInterface(DataFlowInterface *dfi, PortableServer::POA_ptr poa)
virtual const types::TypeInfo * getTypeInfo() const =0
virtual ~CRemoteChannelElement_i()
boost::intrusive_ptr< SharedConnectionBase > shared_ptr
std::list< ServantInfo > ServantMap
base::ChannelElementBase::shared_ptr buildChannelOutput(base::InputPortInterface &port, ConnPolicy const &policy) const
const std::string & getTypeName() const
virtual CRemoteChannelElement_i * createChannelElement_i(DataFlowInterface *sender,::PortableServer::POA *poa, const ConnPolicy &policy) const =0
PortNames getPortNames() const
base::ChannelElementBase::shared_ptr buildDataStorage(ConnPolicy const &policy) const
void deregisterChannel(CChannelElement_ptr channel)
const std::string & getName() const
CRemoteChannelElement_i(corba::CorbaTypeTransporter const &transport, PortableServer::POA_ptr poa)
base::ChannelElementBase::shared_ptr buildChannelInput(base::OutputPortInterface &port, ConnPolicy const &policy) const
PortableServer::POA_var mpoa
RTT::ConnPolicy toRTT(RTT::corba::CConnPolicy const &corba_policy)
static void clearServants()
virtual void disconnect()=0
static void registerServant(CDataFlowInterface_ptr objref, CDataFlowInterface_i *servant)
virtual bool removeConnection(internal::ConnID *cid)
base::PortInterface * getPort(const std::string &name) const
DataFlowInterface * getDataFlowInterface() const
#define ACE_THROW_SPEC(x)
PortableServer::POA_ptr _default_POA()
const std::string & getDescription() const
bool createConnection(InputPortInterface &sink)
virtual base::ChannelElementBase::shared_ptr createStream(base::PortInterface *port, const ConnPolicy &policy, bool is_sender) const =0
virtual bool addConnection(internal::ConnID *port_id, ChannelElementBase::shared_ptr channel_input, ConnPolicy const &policy)
boost::intrusive_ptr< ChannelElementBase > shared_ptr
static void deregisterServant(DataFlowInterface *obj)
RTT::corba::CConnPolicy toCORBA(RTT::ConnPolicy const &policy)
virtual ~CDataFlowInterface_i()
static ServantMap s_servant_map
std::vector< std::string > PortNames
TypeTransporter * getProtocol(int protocol_id) const
#define ORO_CORBA_PROTOCOL_ID
void setInterface(DataFlowInterface *iface)
CRemoteChannelElement_var remote_side
static DataFlowInterface * getLocalInterface(CDataFlowInterface_ptr objref)
CDataFlowInterface_i(DataFlowInterface *interface, PortableServer::POA_ptr poa)
virtual bool createStream(ConnPolicy const &policy)=0
#define CORBA_CHECK_THREAD()
virtual bool connected() const =0
virtual shared_ptr getOutputEndPoint()
internal::SharedConnectionBase::shared_ptr buildSharedConnection(base::OutputPortInterface *output_port, base::InputPortInterface *input_port, ConnPolicy const &policy) const
static bool findSharedConnection(base::OutputPortInterface *output_port, base::InputPortInterface *input_port, ConnPolicy const &policy, SharedConnectionBase::shared_ptr &shared_connection)
PortableServer::POA_ptr _default_POA()
static Logger::LogFunction endlog()
PortableServer::POA_var mpoa
RTT::os::Mutex channel_list_mtx
MutexLock is a scope based Monitor, protecting critical sections with a Mutex object through locking ...
void setCDataFlowInterface(CDataFlowInterface_i *dataflow)