00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038 #include "DataFlowI.h"
00039 #include "corba.h"
00040 #ifdef CORBA_IS_TAO
00041 #include "DataFlowS.h"
00042 #else
00043 #include "DataFlowC.h"
00044 #endif
00045 #include "../../base/PortInterface.hpp"
00046 #include "../../Logger.hpp"
00047 #include "TaskContextProxy.hpp"
00048 #include "CorbaTypeTransporter.hpp"
00049 #include "../../InputPort.hpp"
00050 #include "../../OutputPort.hpp"
00051 #include "CorbaConnPolicy.hpp"
00052 #include "CorbaLib.hpp"
00053
00054 #include "RemotePorts.hpp"
00055 #include "RemoteConnID.hpp"
00056 #include <rtt/os/MutexLock.hpp>
00057
00058 #include <iostream>
00059
00060 using namespace std;
00061 using namespace RTT::corba;
00062 using namespace RTT::base;
00063 using namespace RTT::types;
00064 using namespace RTT::internal;
00065
00066 CDataFlowInterface_i::ServantMap CDataFlowInterface_i::s_servant_map;
00067
00068 CDataFlowInterface_i::CDataFlowInterface_i (RTT::DataFlowInterface* interface, PortableServer::POA_ptr poa)
00069 : mdf(interface), mpoa(PortableServer::POA::_duplicate(poa))
00070 {
00071 }
00072
00073 CDataFlowInterface_i::~CDataFlowInterface_i ()
00074 {
00075 channel_list.clear();
00076 }
00077
00078 RTT::DataFlowInterface* CDataFlowInterface_i::getDataFlowInterface() const
00079 {
00080 return mdf;
00081 }
00082
00083 void CDataFlowInterface_i::registerServant(CDataFlowInterface_ptr objref, CDataFlowInterface_i* servant)
00084 {
00085 s_servant_map.push_back(ServantInfo(objref, servant));
00086 }
00087 void CDataFlowInterface_i::deregisterServant(RTT::DataFlowInterface* obj)
00088 {
00089 for (ServantMap::iterator it = s_servant_map.begin();
00090 it != s_servant_map.end(); ++it)
00091 {
00092 if (it->getDataFlowInterface() == obj)
00093 {
00094 log(Debug) << "deregistered servant for data flow interface" << endlog();
00095 CDataFlowInterface_i* servant = it->servant;
00096 PortableServer::ObjectId_var oid = servant->mpoa->servant_to_id(it->servant);
00097 servant->mpoa->deactivate_object(oid);
00098 s_servant_map.erase(it);
00099 return;
00100 }
00101 }
00102 }
00103
00104 void CDataFlowInterface_i::clearServants()
00105 {
00106 while (!s_servant_map.empty())
00107 {
00108 ServantMap::iterator it = s_servant_map.begin();
00109 deregisterServant(it->getDataFlowInterface());
00110 }
00111 }
00112
00113 RTT::DataFlowInterface* CDataFlowInterface_i::getLocalInterface(CDataFlowInterface_ptr objref)
00114 {
00115 for (ServantMap::const_iterator it = s_servant_map.begin();
00116 it != s_servant_map.end(); ++it)
00117 {
00118 if (it->objref->_is_equivalent(objref))
00119 return it->getDataFlowInterface();
00120 }
00121 return NULL;
00122 }
00123
00124 CDataFlowInterface_ptr CDataFlowInterface_i::getRemoteInterface(RTT::DataFlowInterface* dfi, PortableServer::POA_ptr poa)
00125 {
00126 for (ServantMap::const_iterator it = s_servant_map.begin();
00127 it != s_servant_map.end(); ++it)
00128 {
00129 if (it->getDataFlowInterface() == dfi)
00130 return it->objref;
00131 }
00132 CDataFlowInterface_i* servant = new CDataFlowInterface_i(dfi, poa );
00133 CDataFlowInterface_ptr server = servant->_this();
00134 servant->_remove_ref();
00135 registerServant( server, servant);
00136 return server;
00137 }
00138
00139 PortableServer::POA_ptr CDataFlowInterface_i::_default_POA()
00140 {
00141 return PortableServer::POA::_duplicate(mpoa);
00142 }
00143
00144 CDataFlowInterface::CPortNames * CDataFlowInterface_i::getPorts() ACE_THROW_SPEC ((
00145 CORBA::SystemException
00146 ))
00147 {
00148 ::RTT::DataFlowInterface::PortNames ports = mdf->getPortNames();
00149
00150 RTT::corba::CDataFlowInterface::CPortNames_var pn = new RTT::corba::CDataFlowInterface::CPortNames();
00151 pn->length( ports.size() );
00152
00153 for (unsigned int i=0; i != ports.size(); ++i )
00154 pn[i] = CORBA::string_dup( ports[i].c_str() );
00155
00156 return pn._retn();
00157 }
00158
00159 CDataFlowInterface::CPortDescriptions* CDataFlowInterface_i::getPortDescriptions() ACE_THROW_SPEC ((
00160 CORBA::SystemException
00161 ))
00162 {
00163 RTT::DataFlowInterface::PortNames ports = mdf->getPortNames();
00164 RTT::corba::CDataFlowInterface::CPortDescriptions_var result = new RTT::corba::CDataFlowInterface::CPortDescriptions();
00165 result->length( ports.size() );
00166
00167 unsigned int j = 0;
00168 for (unsigned int i = 0; i < ports.size(); ++i)
00169 {
00170 CPortDescription port_desc;
00171
00172 PortInterface* port = mdf->getPort(ports[i]);
00173 port_desc.name = CORBA::string_dup(ports[i].c_str());
00174
00175 TypeInfo const* type_info = port->getTypeInfo();
00176 if (!type_info || !type_info->getProtocol(ORO_CORBA_PROTOCOL_ID))
00177 {
00178 log(Warning) << "the type of port " << ports[i] << " is not registered into the Orocos type system. It is ignored by the CORBA layer." << endlog();
00179 continue;
00180 }
00181
00182 port_desc.type_name = CORBA::string_dup(type_info->getTypeName().c_str());
00183 if (dynamic_cast<InputPortInterface*>(port))
00184 port_desc.type = corba::CInput;
00185 else
00186 port_desc.type = corba::COutput;
00187
00188 result[j++] = port_desc;
00189 }
00190 result->length( j );
00191 return result._retn();
00192 }
00193
00194 CPortType CDataFlowInterface_i::getPortType(const char * port_name) ACE_THROW_SPEC ((
00195 CORBA::SystemException
00196 ,::RTT::corba::CNoSuchPortException
00197 ))
00198 {
00199 PortInterface* p = mdf->getPort(port_name);
00200 if (p == 0)
00201 throw CNoSuchPortException();
00202
00203 if (dynamic_cast<InputPortInterface*>(p))
00204 return RTT::corba::CInput;
00205 else return RTT::corba::COutput;
00206 }
00207
00208 char* CDataFlowInterface_i::getDataType(const char * port_name) ACE_THROW_SPEC ((
00209 CORBA::SystemException
00210 ,::RTT::corba::CNoSuchPortException
00211 ))
00212 {
00213 PortInterface* p = mdf->getPort(port_name);
00214 if ( p == 0)
00215 throw CNoSuchPortException();
00216 return CORBA::string_dup( p->getTypeInfo()->getTypeName().c_str() );
00217 }
00218
00219 CORBA::Boolean CDataFlowInterface_i::isConnected(const char * port_name) ACE_THROW_SPEC ((
00220 CORBA::SystemException
00221 ,::RTT::corba::CNoSuchPortException
00222 ))
00223 {
00224 PortInterface* p = mdf->getPort(port_name);
00225 if (p == 0)
00226 throw corba::CNoSuchPortException();
00227
00228 return p->connected();
00229 }
00230
00231 void CDataFlowInterface_i::deregisterChannel(CChannelElement_ptr channel)
00232 { RTT::os::MutexLock lock(channel_list_mtx);
00233 ChannelList::iterator it=channel_list.begin();
00234 for (; it != channel_list.end(); ++it) {
00235 if (it->first->_is_equivalent (channel) ) {
00236 channel_list.erase(it);
00237 return;
00238 }
00239 }
00240 }
00241
00242 CORBA::Boolean CDataFlowInterface_i::channelReady(const char * reader_port_name, CChannelElement_ptr channel) ACE_THROW_SPEC ((
00243 CORBA::SystemException
00244 ,::RTT::corba::CNoSuchPortException
00245 ))
00246 {
00247 PortInterface* p = mdf->getPort(reader_port_name);
00248 if (p == 0)
00249 throw corba::CNoSuchPortException();
00250
00251 InputPortInterface* ip = dynamic_cast<InputPortInterface*>(p);
00252 if (ip == 0)
00253 throw corba::CNoSuchPortException();
00254
00255 CORBA_CHECK_THREAD();
00256
00257
00258 { RTT::os::MutexLock lock(channel_list_mtx);
00259 ChannelList::iterator it=channel_list.begin();
00260 for (; it != channel_list.end(); ++it) {
00261 if (it->first->_is_equivalent (channel) ) {
00262 try {
00263 return ip->channelReady( it->second );
00264 }
00265 catch(std::exception const& e)
00266 {
00267 log(Error) << "call to channelReady threw " << e.what() << endlog();
00268 throw;
00269 }
00270 }
00271 }
00272 }
00273 log(Error) << "Invalid CORBA channel given for port " << reader_port_name << ": could not match it to a local C++ channel." <<endlog();
00274 return false;
00275 }
00276
00277 void CDataFlowInterface_i::disconnectPort(const char * port_name) ACE_THROW_SPEC ((
00278 CORBA::SystemException
00279 ,::RTT::corba::CNoSuchPortException
00280 ))
00281 {
00282 PortInterface* p = mdf->getPort(port_name);
00283 if (p == 0) {
00284 log(Error) << "disconnectPort: No such port: "<< port_name <<endlog();
00285 throw corba::CNoSuchPortException();
00286 }
00287 CORBA_CHECK_THREAD();
00288 p->disconnect();
00289 }
00290
00291 bool CDataFlowInterface_i::removeConnection(
00292 const char* local_port,
00293 CDataFlowInterface_ptr remote_interface, const char* remote_port) ACE_THROW_SPEC ((
00294 CORBA::SystemException
00295 ,::RTT::corba::CNoSuchPortException
00296 ))
00297 {
00298 PortInterface* port = mdf->getPort(local_port);
00299
00300 if (port == 0) {
00301 log(Error) << "disconnectPort: No such port: "<< local_port <<endlog();
00302 throw corba::CNoSuchPortException();
00303 }
00304 if (dynamic_cast<OutputPortInterface*>(port) == 0) {
00305 log(Error) << "disconnectPort: "<< local_port << " is an input port" << endlog();
00306 throw corba::CNoSuchPortException();
00307 }
00308
00309 RTT::DataFlowInterface* local_interface = CDataFlowInterface_i::getLocalInterface(remote_interface);
00310 if (local_interface)
00311 {
00312 PortInterface* other_port = local_interface->getPort(remote_port);
00313 if (!other_port)
00314 return false;
00315
00316
00317
00318
00319 if (port->disconnect(other_port))
00320 return true;
00321
00322 }
00323
00324 CORBA_CHECK_THREAD();
00325 RemoteConnID rcid(remote_interface, remote_port);
00326 return port->removeConnection( &rcid );
00327 }
00328
00329 ::CORBA::Boolean CDataFlowInterface_i::createStream( const char* port,
00330 RTT::corba::CConnPolicy & policy) ACE_THROW_SPEC ((
00331 CORBA::SystemException
00332 ,::RTT::corba::CNoSuchPortException
00333 ))
00334 {
00335 PortInterface* p = mdf->getPort(port);
00336 if (p == 0) {
00337 log(Error) << "createStream: No such port: "<< p->getName() <<endlog();
00338 throw corba::CNoSuchPortException();
00339 }
00340
00341 CORBA_CHECK_THREAD();
00342 RTT::ConnPolicy p2 = toRTT(policy);
00343 if ( p->createStream( p2 ) ) {
00344 policy = toCORBA(p2);
00345 return true;
00346 }
00347 return false;
00348 }
00349
00350 void CDataFlowInterface_i::removeStream( const char* port, const char* stream_name) ACE_THROW_SPEC ((
00351 CORBA::SystemException
00352 ,::RTT::corba::CNoSuchPortException
00353 ))
00354 {
00355 PortInterface* p = mdf->getPort(port);
00356 if (p == 0) {
00357 log(Error) << "createStream: No such port: "<< p->getName() <<endlog();
00358 throw corba::CNoSuchPortException();
00359 }
00360 CORBA_CHECK_THREAD();
00361 StreamConnID rcid(stream_name);
00362 p->removeConnection( &rcid );
00363 }
00364
00365
00366 CChannelElement_ptr CDataFlowInterface_i::buildChannelOutput(
00367 const char* port_name, CConnPolicy & corba_policy) ACE_THROW_SPEC ((
00368 CORBA::SystemException
00369 ,::RTT::corba::CNoCorbaTransport
00370 ,::RTT::corba::CNoSuchPortException
00371 ))
00372 {
00373 Logger::In in("CDataFlowInterface_i::buildChannelOutput");
00374 InputPortInterface* port = dynamic_cast<InputPortInterface*>(mdf->getPort(port_name));
00375 if (port == 0)
00376 throw CNoSuchPortException();
00377
00378 TypeInfo const* type_info = port->getTypeInfo();
00379 if (!type_info)
00380 throw CNoCorbaTransport();
00381
00382 CorbaTypeTransporter* transporter =
00383 dynamic_cast<CorbaTypeTransporter*>(type_info->getProtocol(ORO_CORBA_PROTOCOL_ID));
00384 if (!transporter)
00385 throw CNoCorbaTransport();
00386
00387 CORBA_CHECK_THREAD();
00388
00389 ConnPolicy policy2 = toRTT(corba_policy);
00390
00391 ChannelElementBase::shared_ptr end = type_info->buildChannelOutput(*port);
00392 CRemoteChannelElement_i* this_element =
00393 transporter->createChannelElement_i(mdf, mpoa, corba_policy.pull);
00394 this_element->setCDataFlowInterface(this);
00395
00396
00397
00398
00399 if ( corba_policy.transport !=0 && corba_policy.transport != ORO_CORBA_PROTOCOL_ID) {
00400
00401
00402 if ( type_info->getProtocol(corba_policy.transport) == 0 ) {
00403 log(Error) << "Could not create out-of-band transport for port "<< port_name << " with transport id " << corba_policy.transport <<endlog();
00404 log(Error) << "No such transport registered. Check your corba_policy.transport settings or add the transport for type "<< type_info->getTypeName() <<endlog();
00405 return RTT::corba::CChannelElement::_nil();
00406 }
00407 RTT::base::ChannelElementBase::shared_ptr ceb = type_info->getProtocol(corba_policy.transport)->createStream(port, policy2, false);
00408
00409 if ( strlen( corba_policy.name_id.in()) == 0 )
00410 corba_policy.name_id = CORBA::string_dup( policy2.name_id.c_str() );
00411
00412 if (ceb) {
00413
00414 dynamic_cast<ChannelElementBase*>(this_element)->setOutput(ceb);
00415 ChannelElementBase::shared_ptr buf = type_info->buildDataStorage(toRTT(corba_policy));
00416 ceb->setOutput( buf );
00417 buf->setOutput(end);
00418 log(Info) <<"Receiving data for port "<< policy2.name_id << " from out-of-band protocol "<< corba_policy.transport <<endlog();
00419 } else {
00420 log(Error) << "The type transporter for type "<<type_info->getTypeName()<< " failed to create an out-of-band endpoint for port " << port_name<<endlog();
00421 return RTT::corba::CChannelElement::_nil();
00422 }
00423
00424 } else {
00425
00426 if ( !corba_policy.pull ) {
00427 ChannelElementBase::shared_ptr buf = type_info->buildDataStorage(toRTT(corba_policy));
00428 dynamic_cast<ChannelElementBase*>(this_element)->setOutput(buf);
00429 buf->setOutput(end);
00430 } else {
00431 dynamic_cast<ChannelElementBase*>(this_element)->setOutput(end);
00432 }
00433 }
00434
00435 this_element->_remove_ref();
00436
00437
00438 { RTT::os::MutexLock lock(channel_list_mtx);
00439 channel_list.push_back( ChannelList::value_type(this_element->_this(), end->getOutputEndPoint()));
00440 }
00441
00442 CRemoteChannelElement_var proxy = this_element->_this();
00443 return proxy._retn();
00444 }
00445
00449 CChannelElement_ptr CDataFlowInterface_i::buildChannelInput(
00450 const char* port_name, CConnPolicy & corba_policy) ACE_THROW_SPEC ((
00451 CORBA::SystemException
00452 ,::RTT::corba::CNoCorbaTransport
00453 ,::RTT::corba::CNoSuchPortException
00454 ))
00455 {
00456 Logger::In in("CDataFlowInterface_i::buildChannelInput");
00457
00458 OutputPortInterface* port = dynamic_cast<OutputPortInterface*>(mdf->getPort(port_name));
00459 if (port == 0)
00460 throw CNoSuchPortException();
00461
00462 TypeInfo const* type_info = port->getTypeInfo();
00463 if (!type_info)
00464 throw CNoCorbaTransport();
00465
00466 CorbaTypeTransporter* transporter =
00467 dynamic_cast<CorbaTypeTransporter*>(type_info->getProtocol(ORO_CORBA_PROTOCOL_ID));
00468 if (!transporter)
00469 throw CNoCorbaTransport();
00470
00471 CORBA_CHECK_THREAD();
00472
00473 ConnPolicy policy2 = toRTT(corba_policy);
00474
00475
00476 ChannelElementBase::shared_ptr start = type_info->buildChannelInput(*port);
00477
00478
00479 CRemoteChannelElement_i* this_element;
00480 PortableServer::ServantBase_var servant = this_element = transporter->createChannelElement_i(mdf, mpoa, corba_policy.pull);
00481 this_element->setCDataFlowInterface(this);
00482
00483
00484 assert( dynamic_cast<ChannelElementBase*>(this_element) );
00485 start->getOutputEndPoint()->setOutput( dynamic_cast<ChannelElementBase*>(this_element));
00486
00487
00488
00489
00490 if ( corba_policy.transport !=0 && corba_policy.transport != ORO_CORBA_PROTOCOL_ID) {
00491
00492
00493 if ( type_info->getProtocol(corba_policy.transport) == 0 ) {
00494 log(Error) << "Could not create out-of-band transport for port "<< port_name << " with transport id " << corba_policy.transport <<endlog();
00495 log(Error) << "No such transport registered. Check your corba_policy.transport settings or add the transport for type "<< type_info->getTypeName() <<endlog();
00496 throw CNoCorbaTransport();
00497 }
00498 RTT::base::ChannelElementBase::shared_ptr ceb = type_info->getProtocol(corba_policy.transport)->createStream(port, policy2, true);
00499
00500 if ( strlen( corba_policy.name_id.in()) == 0 )
00501 corba_policy.name_id = CORBA::string_dup( policy2.name_id.c_str() );
00502
00503 if (ceb) {
00504
00505 start->getOutputEndPoint()->setOutput( ceb );
00506 log(Info) <<"Sending data from port "<< policy2.name_id << " to out-of-band protocol "<< corba_policy.transport <<endlog();
00507 } else {
00508 log(Error) << "The type transporter for type "<<type_info->getTypeName()<< " failed to create an out-of-band endpoint for port " << port_name<<endlog();
00509 throw CNoCorbaTransport();
00510 }
00511 } else {
00512
00513 ChannelElementBase::shared_ptr buf = type_info->buildDataStorage(toRTT(corba_policy));
00514 start->setOutput(buf);
00515 buf->setOutput( dynamic_cast<ChannelElementBase*>(this_element) );
00516 }
00517
00518
00519
00520 port->addConnection( new SimpleConnID(), start->getInputEndPoint(), policy2);
00521
00522
00523
00524
00525
00526 { RTT::os::MutexLock lock(channel_list_mtx);
00527 channel_list.push_back( ChannelList::value_type(this_element->_this(), start->getInputEndPoint()));
00528 }
00529
00530 return this_element->_this();
00531 }
00532
00533
00534 ::CORBA::Boolean CDataFlowInterface_i::createConnection(
00535 const char* writer_port, CDataFlowInterface_ptr reader_interface,
00536 const char* reader_port, CConnPolicy & policy) ACE_THROW_SPEC ((
00537 CORBA::SystemException
00538 ,::RTT::corba::CNoSuchPortException
00539 ))
00540 {
00541 Logger::In in("CDataFlowInterface_i::createConnection");
00542 OutputPortInterface* writer = dynamic_cast<OutputPortInterface*>(mdf->getPort(writer_port));
00543 if (writer == 0)
00544 throw CNoSuchPortException();
00545
00546 CORBA_CHECK_THREAD();
00547
00548
00549 RTT::DataFlowInterface* local_interface = CDataFlowInterface_i::getLocalInterface(reader_interface);
00550 if (local_interface && policy.transport == 0)
00551 {
00552 InputPortInterface* reader =
00553 dynamic_cast<InputPortInterface*>(local_interface->getPort(reader_port));
00554 if (!reader)
00555 {
00556 log(Warning) << "CORBA: createConnection() target is not an input port" << endlog();
00557 throw CNoSuchPortException();
00558 return false;
00559 }
00560
00561 log(Debug) << "CORBA: createConnection() is creating a LOCAL connection between " <<
00562 writer_port << " and " << reader_port << endlog();
00563 return writer->createConnection(*reader, toRTT(policy));
00564 }
00565 else
00566 log(Debug) << "CORBA: createConnection() is creating a REMOTE connection between " <<
00567 writer_port << " and " << reader_port << endlog();
00568
00569 try {
00570 if (reader_interface->getPortType(reader_port) != corba::CInput) {
00571 log(Error) << "Could not create connection: " << reader_port <<" is not an input port."<<endlog();
00572 throw CNoSuchPortException();
00573 return false;
00574 }
00575
00576
00577 RemoteInputPort port(writer->getTypeInfo(), reader_interface, reader_port, mpoa);
00578 port.setInterface( mdf );
00579
00580 return writer->createConnection(port, toRTT(policy));
00581 }
00582 catch(CORBA::COMM_FAILURE&) { throw; }
00583 catch(CORBA::TRANSIENT&) { throw; }
00584 catch(...) { throw; }
00585 return false;
00586 }
00587
00588
00589 CRemoteChannelElement_i::CRemoteChannelElement_i(RTT::corba::CorbaTypeTransporter const& transport,
00590 PortableServer::POA_ptr poa)
00591 : transport(transport)
00592 , mpoa(PortableServer::POA::_duplicate(poa))
00593 , mdataflow(0)
00594 { }
00595 CRemoteChannelElement_i::~CRemoteChannelElement_i() {}
00596 PortableServer::POA_ptr CRemoteChannelElement_i::_default_POA()
00597 { return PortableServer::POA::_duplicate(mpoa); }
00598 void CRemoteChannelElement_i::setRemoteSide(CRemoteChannelElement_ptr remote) ACE_THROW_SPEC ((
00599 CORBA::SystemException
00600 ))
00601 { this->remote_side = RTT::corba::CRemoteChannelElement::_duplicate(remote); }
00602