00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038 #include "DataFlowI.h"
00039 #include "corba.h"
00040 #ifdef CORBA_IS_TAO
00041 #include "DataFlowS.h"
00042 #else
00043 #include "DataFlowC.h"
00044 #endif
00045 #include "../../base/PortInterface.hpp"
00046 #include "../../Logger.hpp"
00047 #include "TaskContextProxy.hpp"
00048 #include "CorbaTypeTransporter.hpp"
00049 #include "../../InputPort.hpp"
00050 #include "../../OutputPort.hpp"
00051 #include "CorbaConnPolicy.hpp"
00052 #include "CorbaLib.hpp"
00053
00054 #include "RemotePorts.hpp"
00055 #include "RemoteConnID.hpp"
00056 #include <rtt/os/MutexLock.hpp>
00057
00058 #include <iostream>
00059
00060 using namespace std;
00061 using namespace RTT::corba;
00062 using namespace RTT::base;
00063 using namespace RTT::types;
00064 using namespace RTT::internal;
00065
00066 CDataFlowInterface_i::ServantMap CDataFlowInterface_i::s_servant_map;
00067
00068 CDataFlowInterface_i::CDataFlowInterface_i (RTT::DataFlowInterface* interface, PortableServer::POA_ptr poa)
00069 : mdf(interface), mpoa(PortableServer::POA::_duplicate(poa))
00070 {
00071 }
00072
00073 CDataFlowInterface_i::~CDataFlowInterface_i ()
00074 {
00075 channel_list.clear();
00076 }
00077
00078 RTT::DataFlowInterface* CDataFlowInterface_i::getDataFlowInterface() const
00079 {
00080 return mdf;
00081 }
00082
00083 void CDataFlowInterface_i::registerServant(CDataFlowInterface_ptr objref, CDataFlowInterface_i* servant)
00084 {
00085 s_servant_map.push_back(ServantInfo(objref, servant));
00086 }
00087 void CDataFlowInterface_i::deregisterServant(RTT::DataFlowInterface* obj)
00088 {
00089 for (ServantMap::iterator it = s_servant_map.begin();
00090 it != s_servant_map.end(); ++it)
00091 {
00092 if (it->getDataFlowInterface() == obj)
00093 {
00094 log(Debug) << "deregistered servant for data flow interface" << endlog();
00095 CDataFlowInterface_i* servant = it->servant;
00096 PortableServer::ObjectId_var oid = servant->mpoa->servant_to_id(it->servant);
00097 servant->mpoa->deactivate_object(oid);
00098 s_servant_map.erase(it);
00099 return;
00100 }
00101 }
00102 }
00103
00104 void CDataFlowInterface_i::clearServants()
00105 {
00106 while (!s_servant_map.empty())
00107 {
00108 ServantMap::iterator it = s_servant_map.begin();
00109 deregisterServant(it->getDataFlowInterface());
00110 }
00111 }
00112
00113 RTT::DataFlowInterface* CDataFlowInterface_i::getLocalInterface(CDataFlowInterface_ptr objref)
00114 {
00115 for (ServantMap::const_iterator it = s_servant_map.begin();
00116 it != s_servant_map.end(); ++it)
00117 {
00118 if (it->objref->_is_equivalent(objref))
00119 return it->getDataFlowInterface();
00120 }
00121 return NULL;
00122 }
00123
00124 CDataFlowInterface_ptr CDataFlowInterface_i::getRemoteInterface(RTT::DataFlowInterface* dfi, PortableServer::POA_ptr poa)
00125 {
00126 for (ServantMap::const_iterator it = s_servant_map.begin();
00127 it != s_servant_map.end(); ++it)
00128 {
00129 if (it->getDataFlowInterface() == dfi)
00130 return it->objref;
00131 }
00132 CDataFlowInterface_i* servant = new CDataFlowInterface_i(dfi, poa );
00133 CDataFlowInterface_ptr server = servant->_this();
00134 servant->_remove_ref();
00135 registerServant( server, servant);
00136 return server;
00137 }
00138
00139 PortableServer::POA_ptr CDataFlowInterface_i::_default_POA()
00140 {
00141 return PortableServer::POA::_duplicate(mpoa);
00142 }
00143
00144 CDataFlowInterface::CPortNames * CDataFlowInterface_i::getPorts() ACE_THROW_SPEC ((
00145 CORBA::SystemException
00146 ))
00147 {
00148 ::RTT::DataFlowInterface::PortNames ports = mdf->getPortNames();
00149
00150 RTT::corba::CDataFlowInterface::CPortNames_var pn = new RTT::corba::CDataFlowInterface::CPortNames();
00151 pn->length( ports.size() );
00152
00153 for (unsigned int i=0; i != ports.size(); ++i )
00154 pn[i] = CORBA::string_dup( ports[i].c_str() );
00155
00156 return pn._retn();
00157 }
00158
00159 CDataFlowInterface::CPortDescriptions* CDataFlowInterface_i::getPortDescriptions() ACE_THROW_SPEC ((
00160 CORBA::SystemException
00161 ))
00162 {
00163 RTT::DataFlowInterface::PortNames ports = mdf->getPortNames();
00164 RTT::corba::CDataFlowInterface::CPortDescriptions_var result = new RTT::corba::CDataFlowInterface::CPortDescriptions();
00165 result->length( ports.size() );
00166
00167 unsigned int j = 0;
00168 for (unsigned int i = 0; i < ports.size(); ++i)
00169 {
00170 CPortDescription port_desc;
00171
00172 PortInterface* port = mdf->getPort(ports[i]);
00173 port_desc.name = CORBA::string_dup(ports[i].c_str());
00174
00175 TypeInfo const* type_info = port->getTypeInfo();
00176 if (!type_info || !type_info->getProtocol(ORO_CORBA_PROTOCOL_ID))
00177 {
00178 log(Warning) << "the type of port " << ports[i] << " is not registered into the Orocos type system. It is ignored by the CORBA layer." << endlog();
00179 continue;
00180 }
00181
00182 port_desc.type_name = CORBA::string_dup(type_info->getTypeName().c_str());
00183 if (dynamic_cast<InputPortInterface*>(port))
00184 port_desc.type = corba::CInput;
00185 else
00186 port_desc.type = corba::COutput;
00187
00188 result[j++] = port_desc;
00189 }
00190 result->length( j );
00191 return result._retn();
00192 }
00193
00194 CPortType CDataFlowInterface_i::getPortType(const char * port_name) ACE_THROW_SPEC ((
00195 CORBA::SystemException
00196 ,::RTT::corba::CNoSuchPortException
00197 ))
00198 {
00199 PortInterface* p = mdf->getPort(port_name);
00200 if (p == 0)
00201 throw CNoSuchPortException();
00202
00203 if (dynamic_cast<InputPortInterface*>(p))
00204 return RTT::corba::CInput;
00205 else return RTT::corba::COutput;
00206 }
00207
00208 char* CDataFlowInterface_i::getDataType(const char * port_name) ACE_THROW_SPEC ((
00209 CORBA::SystemException
00210 ,::RTT::corba::CNoSuchPortException
00211 ))
00212 {
00213 PortInterface* p = mdf->getPort(port_name);
00214 if ( p == 0)
00215 throw CNoSuchPortException();
00216 return CORBA::string_dup( p->getTypeInfo()->getTypeName().c_str() );
00217 }
00218
00219 CORBA::Boolean CDataFlowInterface_i::isConnected(const char * port_name) ACE_THROW_SPEC ((
00220 CORBA::SystemException
00221 ,::RTT::corba::CNoSuchPortException
00222 ))
00223 {
00224 PortInterface* p = mdf->getPort(port_name);
00225 if (p == 0)
00226 throw corba::CNoSuchPortException();
00227
00228 return p->connected();
00229 }
00230
00231 void CDataFlowInterface_i::deregisterChannel(CChannelElement_ptr channel)
00232 { RTT::os::MutexLock lock(channel_list_mtx);
00233 ChannelList::iterator it=channel_list.begin();
00234 for (; it != channel_list.end(); ++it) {
00235 if (it->first->_is_equivalent (channel) ) {
00236 channel_list.erase(it);
00237 return;
00238 }
00239 }
00240 }
00241
00242 CORBA::Boolean CDataFlowInterface_i::channelReady(const char * reader_port_name, CChannelElement_ptr channel, CConnPolicy const& policy ) ACE_THROW_SPEC ((
00243 CORBA::SystemException
00244 ,::RTT::corba::CNoSuchPortException
00245 ))
00246 {
00247 PortInterface* p = mdf->getPort(reader_port_name);
00248 if (p == 0)
00249 throw corba::CNoSuchPortException();
00250
00251 InputPortInterface* ip = dynamic_cast<InputPortInterface*>(p);
00252 if (ip == 0)
00253 throw corba::CNoSuchPortException();
00254
00255 CORBA_CHECK_THREAD();
00256
00257
00258 { RTT::os::MutexLock lock(channel_list_mtx);
00259 ChannelList::iterator it=channel_list.begin();
00260 for (; it != channel_list.end(); ++it) {
00261 if (it->first->_is_equivalent (channel) ) {
00262 try {
00263 ConnPolicy cp;
00264 cp=toRTT(policy);
00265 return ip->channelReady( it->second, cp );
00266 }
00267 catch(std::exception const& e)
00268 {
00269 log(Error) << "call to channelReady threw " << e.what() << endlog();
00270 throw;
00271 }
00272 }
00273 }
00274 }
00275 log(Error) << "Invalid CORBA channel given for port " << reader_port_name << ": could not match it to a local C++ channel." <<endlog();
00276 return false;
00277 }
00278
00279 void CDataFlowInterface_i::disconnectPort(const char * port_name) ACE_THROW_SPEC ((
00280 CORBA::SystemException
00281 ,::RTT::corba::CNoSuchPortException
00282 ))
00283 {
00284 PortInterface* p = mdf->getPort(port_name);
00285 if (p == 0) {
00286 log(Error) << "disconnectPort: No such port: "<< port_name <<endlog();
00287 throw corba::CNoSuchPortException();
00288 }
00289 CORBA_CHECK_THREAD();
00290 p->disconnect();
00291 }
00292
00293 bool CDataFlowInterface_i::removeConnection(
00294 const char* local_port,
00295 CDataFlowInterface_ptr remote_interface, const char* remote_port) ACE_THROW_SPEC ((
00296 CORBA::SystemException
00297 ,::RTT::corba::CNoSuchPortException
00298 ))
00299 {
00300 PortInterface* port = mdf->getPort(local_port);
00301
00302 if (port == 0) {
00303 log(Error) << "disconnectPort: No such port: "<< local_port <<endlog();
00304 throw corba::CNoSuchPortException();
00305 }
00306 if (dynamic_cast<OutputPortInterface*>(port) == 0) {
00307 log(Error) << "disconnectPort: "<< local_port << " is an input port" << endlog();
00308 throw corba::CNoSuchPortException();
00309 }
00310
00311 RTT::DataFlowInterface* local_interface = CDataFlowInterface_i::getLocalInterface(remote_interface);
00312 if (local_interface)
00313 {
00314 PortInterface* other_port = local_interface->getPort(remote_port);
00315 if (!other_port)
00316 return false;
00317
00318
00319
00320
00321 if (port->disconnect(other_port))
00322 return true;
00323
00324 }
00325
00326 CORBA_CHECK_THREAD();
00327 RemoteConnID rcid(remote_interface, remote_port);
00328 return port->removeConnection( &rcid );
00329 }
00330
00331 ::CORBA::Boolean CDataFlowInterface_i::createStream( const char* port,
00332 RTT::corba::CConnPolicy & policy) ACE_THROW_SPEC ((
00333 CORBA::SystemException
00334 ,::RTT::corba::CNoSuchPortException
00335 ))
00336 {
00337 PortInterface* p = mdf->getPort(port);
00338 if (p == 0) {
00339 log(Error) << "createStream: No such port: "<< p->getName() <<endlog();
00340 throw corba::CNoSuchPortException();
00341 }
00342
00343 CORBA_CHECK_THREAD();
00344 RTT::ConnPolicy p2 = toRTT(policy);
00345 if ( p->createStream( p2 ) ) {
00346 policy = toCORBA(p2);
00347 return true;
00348 }
00349 return false;
00350 }
00351
00352 void CDataFlowInterface_i::removeStream( const char* port, const char* stream_name) ACE_THROW_SPEC ((
00353 CORBA::SystemException
00354 ,::RTT::corba::CNoSuchPortException
00355 ))
00356 {
00357 PortInterface* p = mdf->getPort(port);
00358 if (p == 0) {
00359 log(Error) << "createStream: No such port: "<< p->getName() <<endlog();
00360 throw corba::CNoSuchPortException();
00361 }
00362 CORBA_CHECK_THREAD();
00363 StreamConnID rcid(stream_name);
00364 p->removeConnection( &rcid );
00365 }
00366
00367
00368 CChannelElement_ptr CDataFlowInterface_i::buildChannelOutput(
00369 const char* port_name, CConnPolicy & corba_policy) ACE_THROW_SPEC ((
00370 CORBA::SystemException
00371 ,::RTT::corba::CNoCorbaTransport
00372 ,::RTT::corba::CNoSuchPortException
00373 ))
00374 {
00375 Logger::In in("CDataFlowInterface_i::buildChannelOutput");
00376 InputPortInterface* port = dynamic_cast<InputPortInterface*>(mdf->getPort(port_name));
00377 if (port == 0)
00378 throw CNoSuchPortException();
00379
00380 TypeInfo const* type_info = port->getTypeInfo();
00381 if (!type_info)
00382 throw CNoCorbaTransport();
00383
00384 CorbaTypeTransporter* transporter =
00385 dynamic_cast<CorbaTypeTransporter*>(type_info->getProtocol(ORO_CORBA_PROTOCOL_ID));
00386 if (!transporter)
00387 throw CNoCorbaTransport();
00388
00389 CORBA_CHECK_THREAD();
00390
00391 ConnPolicy policy2 = toRTT(corba_policy);
00392
00393 ChannelElementBase::shared_ptr end = type_info->buildChannelOutput(*port);
00394 CRemoteChannelElement_i* this_element =
00395 transporter->createChannelElement_i(mdf, mpoa, corba_policy.pull);
00396 this_element->setCDataFlowInterface(this);
00397
00398
00399
00400
00401 if ( corba_policy.transport !=0 && corba_policy.transport != ORO_CORBA_PROTOCOL_ID) {
00402
00403
00404 if ( type_info->getProtocol(corba_policy.transport) == 0 ) {
00405 log(Error) << "Could not create out-of-band transport for port "<< port_name << " with transport id " << corba_policy.transport <<endlog();
00406 log(Error) << "No such transport registered. Check your corba_policy.transport settings or add the transport for type "<< type_info->getTypeName() <<endlog();
00407 return RTT::corba::CChannelElement::_nil();
00408 }
00409 RTT::base::ChannelElementBase::shared_ptr ceb = type_info->getProtocol(corba_policy.transport)->createStream(port, policy2, false);
00410
00411 if ( strlen( corba_policy.name_id.in()) == 0 )
00412 corba_policy.name_id = CORBA::string_dup( policy2.name_id.c_str() );
00413
00414 if (ceb) {
00415
00416 dynamic_cast<ChannelElementBase*>(this_element)->setOutput(ceb);
00417 ChannelElementBase::shared_ptr buf = type_info->buildDataStorage(toRTT(corba_policy));
00418 ceb->setOutput( buf );
00419 buf->setOutput(end);
00420 log(Info) <<"Receiving data for port "<< policy2.name_id << " from out-of-band protocol "<< corba_policy.transport <<endlog();
00421 } else {
00422 log(Error) << "The type transporter for type "<<type_info->getTypeName()<< " failed to create an out-of-band endpoint for port " << port_name<<endlog();
00423 return RTT::corba::CChannelElement::_nil();
00424 }
00425
00426 } else {
00427
00428 if ( !corba_policy.pull ) {
00429 ChannelElementBase::shared_ptr buf = type_info->buildDataStorage(toRTT(corba_policy));
00430 dynamic_cast<ChannelElementBase*>(this_element)->setOutput(buf);
00431 buf->setOutput(end);
00432 } else {
00433 dynamic_cast<ChannelElementBase*>(this_element)->setOutput(end);
00434 }
00435 }
00436
00437 this_element->_remove_ref();
00438
00439
00440 { RTT::os::MutexLock lock(channel_list_mtx);
00441 channel_list.push_back( ChannelList::value_type(this_element->_this(), end->getOutputEndPoint()));
00442 }
00443
00444 CRemoteChannelElement_var proxy = this_element->_this();
00445 return proxy._retn();
00446 }
00447
00451 CChannelElement_ptr CDataFlowInterface_i::buildChannelInput(
00452 const char* port_name, CConnPolicy & corba_policy) ACE_THROW_SPEC ((
00453 CORBA::SystemException
00454 ,::RTT::corba::CNoCorbaTransport
00455 ,::RTT::corba::CNoSuchPortException
00456 ))
00457 {
00458 Logger::In in("CDataFlowInterface_i::buildChannelInput");
00459
00460 OutputPortInterface* port = dynamic_cast<OutputPortInterface*>(mdf->getPort(port_name));
00461 if (port == 0)
00462 throw CNoSuchPortException();
00463
00464 TypeInfo const* type_info = port->getTypeInfo();
00465 if (!type_info)
00466 throw CNoCorbaTransport();
00467
00468 CorbaTypeTransporter* transporter =
00469 dynamic_cast<CorbaTypeTransporter*>(type_info->getProtocol(ORO_CORBA_PROTOCOL_ID));
00470 if (!transporter)
00471 throw CNoCorbaTransport();
00472
00473 CORBA_CHECK_THREAD();
00474
00475 ConnPolicy policy2 = toRTT(corba_policy);
00476
00477
00478 ChannelElementBase::shared_ptr start = type_info->buildChannelInput(*port);
00479
00480
00481 CRemoteChannelElement_i* this_element;
00482 PortableServer::ServantBase_var servant = this_element = transporter->createChannelElement_i(mdf, mpoa, corba_policy.pull);
00483 this_element->setCDataFlowInterface(this);
00484
00485
00486 assert( dynamic_cast<ChannelElementBase*>(this_element) );
00487 start->getOutputEndPoint()->setOutput( dynamic_cast<ChannelElementBase*>(this_element));
00488
00489
00490
00491
00492 if ( corba_policy.transport !=0 && corba_policy.transport != ORO_CORBA_PROTOCOL_ID) {
00493
00494
00495 if ( type_info->getProtocol(corba_policy.transport) == 0 ) {
00496 log(Error) << "Could not create out-of-band transport for port "<< port_name << " with transport id " << corba_policy.transport <<endlog();
00497 log(Error) << "No such transport registered. Check your corba_policy.transport settings or add the transport for type "<< type_info->getTypeName() <<endlog();
00498 throw CNoCorbaTransport();
00499 }
00500 RTT::base::ChannelElementBase::shared_ptr ceb = type_info->getProtocol(corba_policy.transport)->createStream(port, policy2, true);
00501
00502 if ( strlen( corba_policy.name_id.in()) == 0 )
00503 corba_policy.name_id = CORBA::string_dup( policy2.name_id.c_str() );
00504
00505 if (ceb) {
00506
00507 start->getOutputEndPoint()->setOutput( ceb );
00508 log(Info) <<"Sending data from port "<< policy2.name_id << " to out-of-band protocol "<< corba_policy.transport <<endlog();
00509 } else {
00510 log(Error) << "The type transporter for type "<<type_info->getTypeName()<< " failed to create an out-of-band endpoint for port " << port_name<<endlog();
00511 throw CNoCorbaTransport();
00512 }
00513 } else {
00514
00515 ChannelElementBase::shared_ptr buf = type_info->buildDataStorage(toRTT(corba_policy));
00516 start->setOutput(buf);
00517 buf->setOutput( dynamic_cast<ChannelElementBase*>(this_element) );
00518 }
00519
00520
00521
00522 port->addConnection( new SimpleConnID(), start->getInputEndPoint(), policy2);
00523
00524
00525
00526
00527
00528 { RTT::os::MutexLock lock(channel_list_mtx);
00529 channel_list.push_back( ChannelList::value_type(this_element->_this(), start->getInputEndPoint()));
00530 }
00531
00532 return this_element->_this();
00533 }
00534
00535
00536 ::CORBA::Boolean CDataFlowInterface_i::createConnection(
00537 const char* writer_port, CDataFlowInterface_ptr reader_interface,
00538 const char* reader_port, CConnPolicy & policy) ACE_THROW_SPEC ((
00539 CORBA::SystemException
00540 ,::RTT::corba::CNoSuchPortException
00541 ))
00542 {
00543 Logger::In in("CDataFlowInterface_i::createConnection");
00544 OutputPortInterface* writer = dynamic_cast<OutputPortInterface*>(mdf->getPort(writer_port));
00545 if (writer == 0)
00546 throw CNoSuchPortException();
00547
00548 CORBA_CHECK_THREAD();
00549
00550
00551 RTT::DataFlowInterface* local_interface = CDataFlowInterface_i::getLocalInterface(reader_interface);
00552 if (local_interface && policy.transport == 0)
00553 {
00554 InputPortInterface* reader =
00555 dynamic_cast<InputPortInterface*>(local_interface->getPort(reader_port));
00556 if (!reader)
00557 {
00558 log(Warning) << "CORBA: createConnection() target is not an input port" << endlog();
00559 throw CNoSuchPortException();
00560 return false;
00561 }
00562
00563 log(Debug) << "CORBA: createConnection() is creating a LOCAL connection between " <<
00564 writer_port << " and " << reader_port << endlog();
00565 return writer->createConnection(*reader, toRTT(policy));
00566 }
00567 else
00568 log(Debug) << "CORBA: createConnection() is creating a REMOTE connection between " <<
00569 writer_port << " and " << reader_port << endlog();
00570
00571 try {
00572 if (reader_interface->getPortType(reader_port) != corba::CInput) {
00573 log(Error) << "Could not create connection: " << reader_port <<" is not an input port."<<endlog();
00574 throw CNoSuchPortException();
00575 return false;
00576 }
00577
00578
00579 RemoteInputPort port(writer->getTypeInfo(), reader_interface, reader_port, mpoa);
00580 port.setInterface( mdf );
00581
00582 return writer->createConnection(port, toRTT(policy));
00583 }
00584 catch(CORBA::COMM_FAILURE&) { throw; }
00585 catch(CORBA::TRANSIENT&) { throw; }
00586 catch(...) { throw; }
00587 return false;
00588 }
00589
00590
00591 CRemoteChannelElement_i::CRemoteChannelElement_i(RTT::corba::CorbaTypeTransporter const& transport,
00592 PortableServer::POA_ptr poa)
00593 : transport(transport)
00594 , mpoa(PortableServer::POA::_duplicate(poa))
00595 , mdataflow(0)
00596 { }
00597 CRemoteChannelElement_i::~CRemoteChannelElement_i() {}
00598 PortableServer::POA_ptr CRemoteChannelElement_i::_default_POA()
00599 { return PortableServer::POA::_duplicate(mpoa); }
00600 void CRemoteChannelElement_i::setRemoteSide(CRemoteChannelElement_ptr remote) ACE_THROW_SPEC ((
00601 CORBA::SystemException
00602 ))
00603 { this->remote_side = RTT::corba::CRemoteChannelElement::_duplicate(remote); }
00604