00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038 #include "DataFlowI.h"
00039 #include "corba.h"
00040 #ifdef CORBA_IS_TAO
00041 #include "DataFlowS.h"
00042 #else
00043 #include "DataFlowC.h"
00044 #endif
00045 #include "../../base/PortInterface.hpp"
00046 #include "../../Logger.hpp"
00047 #include "TaskContextProxy.hpp"
00048 #include "CorbaTypeTransporter.hpp"
00049 #include "../../InputPort.hpp"
00050 #include "../../OutputPort.hpp"
00051 #include "CorbaConnPolicy.hpp"
00052 #include "CorbaLib.hpp"
00053
00054 #include "RemotePorts.hpp"
00055 #include "RemoteConnID.hpp"
00056 #include <rtt/os/MutexLock.hpp>
00057
00058 #include <iostream>
00059
00060 using namespace std;
00061 using namespace RTT::corba;
00062 using namespace RTT::base;
00063 using namespace RTT::types;
00064 using namespace RTT::internal;
00065
00066 CDataFlowInterface_i::ServantMap CDataFlowInterface_i::s_servant_map;
00067
00068 CDataFlowInterface_i::CDataFlowInterface_i (RTT::DataFlowInterface* interface, PortableServer::POA_ptr poa)
00069 : mdf(interface), mpoa(PortableServer::POA::_duplicate(poa))
00070 {
00071 }
00072
00073 CDataFlowInterface_i::~CDataFlowInterface_i ()
00074 {
00075 channel_list.clear();
00076 }
00077
00078 void CDataFlowInterface_i::registerServant(CDataFlowInterface_ptr objref, RTT::DataFlowInterface* obj)
00079 {
00080 s_servant_map.push_back(
00081 std::make_pair(
00082 CDataFlowInterface_var(objref),
00083 obj)
00084 );
00085 }
00086 void CDataFlowInterface_i::deregisterServant(RTT::DataFlowInterface* obj)
00087 {
00088 for (ServantMap::iterator it = s_servant_map.begin();
00089 it != s_servant_map.end(); ++it)
00090 {
00091 if (it->second == obj)
00092 {
00093 s_servant_map.erase(it);
00094 return;
00095 }
00096 }
00097 }
00098
00099 void CDataFlowInterface_i::clearServants()
00100 {
00101 s_servant_map.clear();
00102 }
00103
00104 RTT::DataFlowInterface* CDataFlowInterface_i::getLocalInterface(CDataFlowInterface_ptr objref)
00105 {
00106 for (ServantMap::const_iterator it = s_servant_map.begin();
00107 it != s_servant_map.end(); ++it)
00108 {
00109 if (it->first->_is_equivalent(objref))
00110 return it->second;
00111 }
00112 return NULL;
00113 }
00114
00115 CDataFlowInterface_ptr CDataFlowInterface_i::getRemoteInterface(RTT::DataFlowInterface* dfi, PortableServer::POA_ptr poa)
00116 {
00117 for (ServantMap::const_iterator it = s_servant_map.begin();
00118 it != s_servant_map.end(); ++it)
00119 {
00120 if (it->second == dfi)
00121 return it->first;
00122 }
00123 CDataFlowInterface_i* servant = new CDataFlowInterface_i(dfi, poa );
00124 CDataFlowInterface_ptr server = servant->_this();
00125 servant->_remove_ref();
00126 registerServant( server, dfi);
00127 return server;
00128 }
00129
00130 PortableServer::POA_ptr CDataFlowInterface_i::_default_POA()
00131 {
00132 return PortableServer::POA::_duplicate(mpoa);
00133 }
00134
00135 CDataFlowInterface::CPortNames * CDataFlowInterface_i::getPorts() ACE_THROW_SPEC ((
00136 CORBA::SystemException
00137 ))
00138 {
00139 ::RTT::DataFlowInterface::PortNames ports = mdf->getPortNames();
00140
00141 RTT::corba::CDataFlowInterface::CPortNames_var pn = new RTT::corba::CDataFlowInterface::CPortNames();
00142 pn->length( ports.size() );
00143
00144 for (unsigned int i=0; i != ports.size(); ++i )
00145 pn[i] = CORBA::string_dup( ports[i].c_str() );
00146
00147 return pn._retn();
00148 }
00149
00150 CDataFlowInterface::CPortDescriptions* CDataFlowInterface_i::getPortDescriptions() ACE_THROW_SPEC ((
00151 CORBA::SystemException
00152 ))
00153 {
00154 RTT::DataFlowInterface::PortNames ports = mdf->getPortNames();
00155 RTT::corba::CDataFlowInterface::CPortDescriptions_var result = new RTT::corba::CDataFlowInterface::CPortDescriptions();
00156 result->length( ports.size() );
00157
00158 unsigned int j = 0;
00159 for (unsigned int i = 0; i < ports.size(); ++i)
00160 {
00161 CPortDescription port_desc;
00162
00163 PortInterface* port = mdf->getPort(ports[i]);
00164 port_desc.name = CORBA::string_dup(ports[i].c_str());
00165
00166 TypeInfo const* type_info = port->getTypeInfo();
00167 if (!type_info || !type_info->getProtocol(ORO_CORBA_PROTOCOL_ID))
00168 {
00169 log(Warning) << "the type of port " << ports[i] << " is not registered into the Orocos type system. It is ignored by the CORBA layer." << endlog();
00170 continue;
00171 }
00172
00173 port_desc.type_name = CORBA::string_dup(type_info->getTypeName().c_str());
00174 if (dynamic_cast<InputPortInterface*>(port))
00175 port_desc.type = corba::CInput;
00176 else
00177 port_desc.type = corba::COutput;
00178
00179 result[j++] = port_desc;
00180 }
00181 result->length( j );
00182 return result._retn();
00183 }
00184
00185 CPortType CDataFlowInterface_i::getPortType(const char * port_name) ACE_THROW_SPEC ((
00186 CORBA::SystemException
00187 ,::RTT::corba::CNoSuchPortException
00188 ))
00189 {
00190 PortInterface* p = mdf->getPort(port_name);
00191 if (p == 0)
00192 throw CNoSuchPortException();
00193
00194 if (dynamic_cast<InputPortInterface*>(p))
00195 return RTT::corba::CInput;
00196 else return RTT::corba::COutput;
00197 }
00198
00199 char* CDataFlowInterface_i::getDataType(const char * port_name) ACE_THROW_SPEC ((
00200 CORBA::SystemException
00201 ,::RTT::corba::CNoSuchPortException
00202 ))
00203 {
00204 PortInterface* p = mdf->getPort(port_name);
00205 if ( p == 0)
00206 throw CNoSuchPortException();
00207 return CORBA::string_dup( p->getTypeInfo()->getTypeName().c_str() );
00208 }
00209
00210 CORBA::Boolean CDataFlowInterface_i::isConnected(const char * port_name) ACE_THROW_SPEC ((
00211 CORBA::SystemException
00212 ,::RTT::corba::CNoSuchPortException
00213 ))
00214 {
00215 PortInterface* p = mdf->getPort(port_name);
00216 if (p == 0)
00217 throw corba::CNoSuchPortException();
00218
00219 return p->connected();
00220 }
00221
00222 void CDataFlowInterface_i::deregisterChannel(CChannelElement_ptr channel)
00223 { RTT::os::MutexLock lock(channel_list_mtx);
00224 ChannelList::iterator it=channel_list.begin();
00225 for (; it != channel_list.end(); ++it) {
00226 if (it->first->_is_equivalent (channel) ) {
00227 channel_list.erase(it);
00228 return;
00229 }
00230 }
00231 }
00232
00233 CORBA::Boolean CDataFlowInterface_i::channelReady(const char * reader_port_name, CChannelElement_ptr channel) ACE_THROW_SPEC ((
00234 CORBA::SystemException
00235 ,::RTT::corba::CNoSuchPortException
00236 ))
00237 {
00238 PortInterface* p = mdf->getPort(reader_port_name);
00239 if (p == 0)
00240 throw corba::CNoSuchPortException();
00241
00242 InputPortInterface* ip = dynamic_cast<InputPortInterface*>(p);
00243 if (ip == 0)
00244 throw corba::CNoSuchPortException();
00245
00246 CORBA_CHECK_THREAD();
00247
00248
00249 { RTT::os::MutexLock lock(channel_list_mtx);
00250 ChannelList::iterator it=channel_list.begin();
00251 for (; it != channel_list.end(); ++it) {
00252 if (it->first->_is_equivalent (channel) ) {
00253 try {
00254 return ip->channelReady( it->second );
00255 }
00256 catch(std::exception const& e)
00257 {
00258 log(Error) << "call to channelReady threw " << e.what() << endlog();
00259 throw;
00260 }
00261 }
00262 }
00263 }
00264 log(Error) << "Invalid CORBA channel given for port " << reader_port_name << ": could not match it to a local C++ channel." <<endlog();
00265 return false;
00266 }
00267
00268 void CDataFlowInterface_i::disconnectPort(const char * port_name) ACE_THROW_SPEC ((
00269 CORBA::SystemException
00270 ,::RTT::corba::CNoSuchPortException
00271 ))
00272 {
00273 PortInterface* p = mdf->getPort(port_name);
00274 if (p == 0) {
00275 log(Error) << "disconnectPort: No such port: "<< port_name <<endlog();
00276 throw corba::CNoSuchPortException();
00277 }
00278 CORBA_CHECK_THREAD();
00279 p->disconnect();
00280 }
00281
00282 bool CDataFlowInterface_i::removeConnection(
00283 const char* local_port,
00284 CDataFlowInterface_ptr remote_interface, const char* remote_port) ACE_THROW_SPEC ((
00285 CORBA::SystemException
00286 ,::RTT::corba::CNoSuchPortException
00287 ))
00288 {
00289 PortInterface* port = mdf->getPort(local_port);
00290
00291 if (port == 0) {
00292 log(Error) << "disconnectPort: No such port: "<< local_port <<endlog();
00293 throw corba::CNoSuchPortException();
00294 }
00295 if (dynamic_cast<OutputPortInterface*>(port) == 0) {
00296 log(Error) << "disconnectPort: "<< local_port << " is an input port" << endlog();
00297 throw corba::CNoSuchPortException();
00298 }
00299
00300 RTT::DataFlowInterface* local_interface = CDataFlowInterface_i::getLocalInterface(remote_interface);
00301 if (local_interface)
00302 {
00303 PortInterface* other_port = local_interface->getPort(remote_port);
00304 if (!other_port)
00305 return false;
00306
00307
00308
00309
00310 if (port->disconnect(other_port))
00311 return true;
00312
00313 }
00314
00315 CORBA_CHECK_THREAD();
00316 RemoteConnID rcid(remote_interface, remote_port);
00317 return port->removeConnection( &rcid );
00318 }
00319
00320 ::CORBA::Boolean CDataFlowInterface_i::createStream( const char* port,
00321 RTT::corba::CConnPolicy & policy) ACE_THROW_SPEC ((
00322 CORBA::SystemException
00323 ,::RTT::corba::CNoSuchPortException
00324 ))
00325 {
00326 PortInterface* p = mdf->getPort(port);
00327 if (p == 0) {
00328 log(Error) << "createStream: No such port: "<< p->getName() <<endlog();
00329 throw corba::CNoSuchPortException();
00330 }
00331
00332 CORBA_CHECK_THREAD();
00333 RTT::ConnPolicy p2 = toRTT(policy);
00334 if ( p->createStream( p2 ) ) {
00335 policy = toCORBA(p2);
00336 return true;
00337 }
00338 return false;
00339 }
00340
00341 void CDataFlowInterface_i::removeStream( const char* port, const char* stream_name) ACE_THROW_SPEC ((
00342 CORBA::SystemException
00343 ,::RTT::corba::CNoSuchPortException
00344 ))
00345 {
00346 PortInterface* p = mdf->getPort(port);
00347 if (p == 0) {
00348 log(Error) << "createStream: No such port: "<< p->getName() <<endlog();
00349 throw corba::CNoSuchPortException();
00350 }
00351 CORBA_CHECK_THREAD();
00352 StreamConnID rcid(stream_name);
00353 p->removeConnection( &rcid );
00354 }
00355
00356
00357 CChannelElement_ptr CDataFlowInterface_i::buildChannelOutput(
00358 const char* port_name, CConnPolicy & corba_policy) ACE_THROW_SPEC ((
00359 CORBA::SystemException
00360 ,::RTT::corba::CNoCorbaTransport
00361 ,::RTT::corba::CNoSuchPortException
00362 ))
00363 {
00364 Logger::In in("CDataFlowInterface_i::buildChannelOutput");
00365 InputPortInterface* port = dynamic_cast<InputPortInterface*>(mdf->getPort(port_name));
00366 if (port == 0)
00367 throw CNoSuchPortException();
00368
00369 TypeInfo const* type_info = port->getTypeInfo();
00370 if (!type_info)
00371 throw CNoCorbaTransport();
00372
00373 CorbaTypeTransporter* transporter =
00374 dynamic_cast<CorbaTypeTransporter*>(type_info->getProtocol(ORO_CORBA_PROTOCOL_ID));
00375 if (!transporter)
00376 throw CNoCorbaTransport();
00377
00378 CORBA_CHECK_THREAD();
00379
00380 ConnPolicy policy2 = toRTT(corba_policy);
00381
00382 ChannelElementBase::shared_ptr end = type_info->buildChannelOutput(*port);
00383 CRemoteChannelElement_i* this_element =
00384 transporter->createChannelElement_i(mdf, mpoa, corba_policy.pull);
00385 this_element->setCDataFlowInterface(this);
00386
00387
00388
00389
00390 if ( corba_policy.transport !=0 && corba_policy.transport != ORO_CORBA_PROTOCOL_ID) {
00391
00392
00393 if ( type_info->getProtocol(corba_policy.transport) == 0 ) {
00394 log(Error) << "Could not create out-of-band transport for port "<< port_name << " with transport id " << corba_policy.transport <<endlog();
00395 log(Error) << "No such transport registered. Check your corba_policy.transport settings or add the transport for type "<< type_info->getTypeName() <<endlog();
00396 return RTT::corba::CChannelElement::_nil();
00397 }
00398 RTT::base::ChannelElementBase::shared_ptr ceb = type_info->getProtocol(corba_policy.transport)->createStream(port, policy2, false);
00399
00400 if ( strlen( corba_policy.name_id.in()) == 0 )
00401 corba_policy.name_id = CORBA::string_dup( policy2.name_id.c_str() );
00402
00403 if (ceb) {
00404
00405 dynamic_cast<ChannelElementBase*>(this_element)->setOutput(ceb);
00406 ChannelElementBase::shared_ptr buf = type_info->buildDataStorage(toRTT(corba_policy));
00407 ceb->setOutput( buf );
00408 buf->setOutput(end);
00409 log(Info) <<"Receiving data for port "<< policy2.name_id << " from out-of-band protocol "<< corba_policy.transport <<endlog();
00410 } else {
00411 log(Error) << "The type transporter for type "<<type_info->getTypeName()<< " failed to create an out-of-band endpoint for port " << port_name<<endlog();
00412 return RTT::corba::CChannelElement::_nil();
00413 }
00414
00415 } else {
00416
00417 if ( !corba_policy.pull ) {
00418 ChannelElementBase::shared_ptr buf = type_info->buildDataStorage(toRTT(corba_policy));
00419 dynamic_cast<ChannelElementBase*>(this_element)->setOutput(buf);
00420 buf->setOutput(end);
00421 } else {
00422 dynamic_cast<ChannelElementBase*>(this_element)->setOutput(end);
00423 }
00424 }
00425
00426 this_element->_remove_ref();
00427
00428
00429 { RTT::os::MutexLock lock(channel_list_mtx);
00430 channel_list.push_back( ChannelList::value_type(this_element->_this(), end->getOutputEndPoint()));
00431 }
00432
00433 CRemoteChannelElement_var proxy = this_element->_this();
00434 return proxy._retn();
00435 }
00436
00440 CChannelElement_ptr CDataFlowInterface_i::buildChannelInput(
00441 const char* port_name, CConnPolicy & corba_policy) ACE_THROW_SPEC ((
00442 CORBA::SystemException
00443 ,::RTT::corba::CNoCorbaTransport
00444 ,::RTT::corba::CNoSuchPortException
00445 ))
00446 {
00447 Logger::In in("CDataFlowInterface_i::buildChannelInput");
00448
00449 OutputPortInterface* port = dynamic_cast<OutputPortInterface*>(mdf->getPort(port_name));
00450 if (port == 0)
00451 throw CNoSuchPortException();
00452
00453 TypeInfo const* type_info = port->getTypeInfo();
00454 if (!type_info)
00455 throw CNoCorbaTransport();
00456
00457 CorbaTypeTransporter* transporter =
00458 dynamic_cast<CorbaTypeTransporter*>(type_info->getProtocol(ORO_CORBA_PROTOCOL_ID));
00459 if (!transporter)
00460 throw CNoCorbaTransport();
00461
00462 CORBA_CHECK_THREAD();
00463
00464 ConnPolicy policy2 = toRTT(corba_policy);
00465
00466
00467 ChannelElementBase::shared_ptr start = type_info->buildChannelInput(*port);
00468
00469
00470 CRemoteChannelElement_i* this_element;
00471 PortableServer::ServantBase_var servant = this_element = transporter->createChannelElement_i(mdf, mpoa, corba_policy.pull);
00472 this_element->setCDataFlowInterface(this);
00473
00474
00475 assert( dynamic_cast<ChannelElementBase*>(this_element) );
00476 start->getOutputEndPoint()->setOutput( dynamic_cast<ChannelElementBase*>(this_element));
00477
00478
00479
00480
00481 if ( corba_policy.transport !=0 && corba_policy.transport != ORO_CORBA_PROTOCOL_ID) {
00482
00483
00484 if ( type_info->getProtocol(corba_policy.transport) == 0 ) {
00485 log(Error) << "Could not create out-of-band transport for port "<< port_name << " with transport id " << corba_policy.transport <<endlog();
00486 log(Error) << "No such transport registered. Check your corba_policy.transport settings or add the transport for type "<< type_info->getTypeName() <<endlog();
00487 throw CNoCorbaTransport();
00488 }
00489 RTT::base::ChannelElementBase::shared_ptr ceb = type_info->getProtocol(corba_policy.transport)->createStream(port, policy2, true);
00490
00491 if ( strlen( corba_policy.name_id.in()) == 0 )
00492 corba_policy.name_id = CORBA::string_dup( policy2.name_id.c_str() );
00493
00494 if (ceb) {
00495
00496 start->getOutputEndPoint()->setOutput( ceb );
00497 log(Info) <<"Sending data from port "<< policy2.name_id << " to out-of-band protocol "<< corba_policy.transport <<endlog();
00498 } else {
00499 log(Error) << "The type transporter for type "<<type_info->getTypeName()<< " failed to create an out-of-band endpoint for port " << port_name<<endlog();
00500 throw CNoCorbaTransport();
00501 }
00502 } else {
00503
00504 ChannelElementBase::shared_ptr buf = type_info->buildDataStorage(toRTT(corba_policy));
00505 start->setOutput(buf);
00506 buf->setOutput( dynamic_cast<ChannelElementBase*>(this_element) );
00507 }
00508
00509
00510
00511 port->addConnection( new SimpleConnID(), start->getInputEndPoint(), policy2);
00512
00513
00514
00515
00516
00517 { RTT::os::MutexLock lock(channel_list_mtx);
00518 channel_list.push_back( ChannelList::value_type(this_element->_this(), start->getInputEndPoint()));
00519 }
00520
00521 return this_element->_this();
00522 }
00523
00524
00525 ::CORBA::Boolean CDataFlowInterface_i::createConnection(
00526 const char* writer_port, CDataFlowInterface_ptr reader_interface,
00527 const char* reader_port, CConnPolicy & policy) ACE_THROW_SPEC ((
00528 CORBA::SystemException
00529 ,::RTT::corba::CNoSuchPortException
00530 ))
00531 {
00532 Logger::In in("CDataFlowInterface_i::createConnection");
00533 OutputPortInterface* writer = dynamic_cast<OutputPortInterface*>(mdf->getPort(writer_port));
00534 if (writer == 0)
00535 throw CNoSuchPortException();
00536
00537 CORBA_CHECK_THREAD();
00538
00539
00540 RTT::DataFlowInterface* local_interface = CDataFlowInterface_i::getLocalInterface(reader_interface);
00541 if (local_interface && policy.transport == 0)
00542 {
00543 InputPortInterface* reader =
00544 dynamic_cast<InputPortInterface*>(local_interface->getPort(reader_port));
00545 if (!reader)
00546 {
00547 log(Warning) << "CORBA: createConnection() target is not an input port" << endlog();
00548 throw CNoSuchPortException();
00549 return false;
00550 }
00551
00552 log(Debug) << "CORBA: createConnection() is creating a LOCAL connection between " <<
00553 writer_port << " and " << reader_port << endlog();
00554 return writer->createConnection(*reader, toRTT(policy));
00555 }
00556 else
00557 log(Debug) << "CORBA: createConnection() is creating a REMOTE connection between " <<
00558 writer_port << " and " << reader_port << endlog();
00559
00560 try {
00561 if (reader_interface->getPortType(reader_port) != corba::CInput) {
00562 log(Error) << "Could not create connection: " << reader_port <<" is not an input port."<<endlog();
00563 throw CNoSuchPortException();
00564 return false;
00565 }
00566
00567
00568 RemoteInputPort port(writer->getTypeInfo(), reader_interface, reader_port, mpoa);
00569 port.setInterface( mdf );
00570
00571 return writer->createConnection(port, toRTT(policy));
00572 }
00573 catch(CORBA::COMM_FAILURE&) { throw; }
00574 catch(CORBA::TRANSIENT&) { throw; }
00575 catch(...) { throw; }
00576 return false;
00577 }
00578
00579
00580 CRemoteChannelElement_i::CRemoteChannelElement_i(RTT::corba::CorbaTypeTransporter const& transport,
00581 PortableServer::POA_ptr poa)
00582 : transport(transport)
00583 , mpoa(PortableServer::POA::_duplicate(poa))
00584 , mdataflow(0)
00585 { }
00586 CRemoteChannelElement_i::~CRemoteChannelElement_i() {}
00587 PortableServer::POA_ptr CRemoteChannelElement_i::_default_POA()
00588 { return PortableServer::POA::_duplicate(mpoa); }
00589 void CRemoteChannelElement_i::setRemoteSide(CRemoteChannelElement_ptr remote) ACE_THROW_SPEC ((
00590 CORBA::SystemException
00591 ))
00592 { this->remote_side = RTT::corba::CRemoteChannelElement::_duplicate(remote); }
00593