DataFlowI.cpp
Go to the documentation of this file.
00001 /***************************************************************************
00002   tag: FMTC  Tue Mar 11 21:49:24 CET 2008  DataFlowI.cpp
00003 
00004                         DataFlowI.cpp -  description
00005                            -------------------
00006     begin                : Tue March 11 2008
00007     copyright            : (C) 2008 FMTC
00008     email                : peter.soetens@fmtc.be
00009 
00010  ***************************************************************************
00011  *   This library is free software; you can redistribute it and/or         *
00012  *   modify it under the terms of the GNU General Public                   *
00013  *   License as published by the Free Software Foundation;                 *
00014  *   version 2 of the License.                                             *
00015  *                                                                         *
00016  *   As a special exception, you may use this file as part of a free       *
00017  *   software library without restriction.  Specifically, if other files   *
00018  *   instantiate templates or use macros or inline functions from this     *
00019  *   file, or you compile this file and link it with other files to        *
00020  *   produce an executable, this file does not by itself cause the         *
00021  *   resulting executable to be covered by the GNU General Public          *
00022  *   License.  This exception does not however invalidate any other        *
00023  *   reasons why the executable file might be covered by the GNU General   *
00024  *   Public License.                                                       *
00025  *                                                                         *
00026  *   This library is distributed in the hope that it will be useful,       *
00027  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00028  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU     *
00029  *   Lesser General Public License for more details.                       *
00030  *                                                                         *
00031  *   You should have received a copy of the GNU General Public             *
00032  *   License along with this library; if not, write to the Free Software   *
00033  *   Foundation, Inc., 59 Temple Place,                                    *
00034  *   Suite 330, Boston, MA  02111-1307  USA                                *
00035  *                                                                         *
00036  ***************************************************************************/
00037 
00038 #include "DataFlowI.h"
00039 #include "corba.h"
00040 #ifdef CORBA_IS_TAO
00041 #include "DataFlowS.h"
00042 #else
00043 #include "DataFlowC.h"
00044 #endif
00045 #include "../../base/PortInterface.hpp"
00046 #include "../../Logger.hpp"
00047 #include "TaskContextProxy.hpp"
00048 #include "CorbaTypeTransporter.hpp"
00049 #include "../../InputPort.hpp"
00050 #include "../../OutputPort.hpp"
00051 #include "CorbaConnPolicy.hpp"
00052 #include "CorbaLib.hpp"
00053 
00054 #include "RemotePorts.hpp"
00055 #include "RemoteConnID.hpp"
00056 #include <rtt/os/MutexLock.hpp>
00057 
00058 #include <iostream>
00059 
00060 using namespace std;
00061 using namespace RTT::corba;
00062 using namespace RTT::base;
00063 using namespace RTT::types;
00064 using namespace RTT::internal;
00065 
00066 CDataFlowInterface_i::ServantMap CDataFlowInterface_i::s_servant_map;
00067 
00068 CDataFlowInterface_i::CDataFlowInterface_i (RTT::DataFlowInterface* interface, PortableServer::POA_ptr poa)
00069     : mdf(interface), mpoa(PortableServer::POA::_duplicate(poa))
00070 {
00071 }
00072 
00073 CDataFlowInterface_i::~CDataFlowInterface_i ()
00074 {
00075         channel_list.clear();
00076 }
00077 
00078 RTT::DataFlowInterface* CDataFlowInterface_i::getDataFlowInterface() const
00079 {
00080     return mdf;
00081 }
00082 
00083 void CDataFlowInterface_i::registerServant(CDataFlowInterface_ptr objref, CDataFlowInterface_i* servant)
00084 {
00085     s_servant_map.push_back(ServantInfo(objref, servant));
00086 }
00087 void CDataFlowInterface_i::deregisterServant(RTT::DataFlowInterface* obj)
00088 {
00089     for (ServantMap::iterator it = s_servant_map.begin();
00090             it != s_servant_map.end(); ++it)
00091     {
00092         if (it->getDataFlowInterface() == obj)
00093         {
00094             log(Debug) << "deregistered servant for data flow interface" << endlog();
00095             CDataFlowInterface_i* servant = it->servant;
00096             PortableServer::ObjectId_var oid = servant->mpoa->servant_to_id(it->servant);
00097             servant->mpoa->deactivate_object(oid);
00098             s_servant_map.erase(it);
00099             return;
00100         }
00101     }
00102 }
00103 
00104 void CDataFlowInterface_i::clearServants()
00105 {
00106     while (!s_servant_map.empty())
00107     {
00108         ServantMap::iterator it = s_servant_map.begin();
00109         deregisterServant(it->getDataFlowInterface());
00110     }
00111 }
00112 
00113 RTT::DataFlowInterface* CDataFlowInterface_i::getLocalInterface(CDataFlowInterface_ptr objref)
00114 {
00115     for (ServantMap::const_iterator it = s_servant_map.begin();
00116             it != s_servant_map.end(); ++it)
00117     {
00118         if (it->objref->_is_equivalent(objref))
00119             return it->getDataFlowInterface();
00120     }
00121     return NULL;
00122 }
00123 
00124 CDataFlowInterface_ptr CDataFlowInterface_i::getRemoteInterface(RTT::DataFlowInterface* dfi, PortableServer::POA_ptr poa)
00125 {
00126     for (ServantMap::const_iterator it = s_servant_map.begin();
00127             it != s_servant_map.end(); ++it)
00128     {
00129         if (it->getDataFlowInterface() == dfi)
00130             return it->objref;
00131     }
00132     CDataFlowInterface_i* servant = new CDataFlowInterface_i(dfi, poa );
00133     CDataFlowInterface_ptr server = servant->_this();
00134     servant->_remove_ref();
00135     registerServant( server, servant);
00136     return server;
00137 }
00138 
00139 PortableServer::POA_ptr CDataFlowInterface_i::_default_POA()
00140 {
00141     return PortableServer::POA::_duplicate(mpoa);
00142 }
00143 
00144 CDataFlowInterface::CPortNames * CDataFlowInterface_i::getPorts() ACE_THROW_SPEC ((
00145               CORBA::SystemException
00146             ))
00147 {
00148     ::RTT::DataFlowInterface::PortNames ports = mdf->getPortNames();
00149 
00150     RTT::corba::CDataFlowInterface::CPortNames_var pn = new RTT::corba::CDataFlowInterface::CPortNames();
00151     pn->length( ports.size() );
00152 
00153     for (unsigned int i=0; i != ports.size(); ++i )
00154         pn[i] = CORBA::string_dup( ports[i].c_str() );
00155 
00156     return pn._retn();
00157 }
00158 
00159 CDataFlowInterface::CPortDescriptions* CDataFlowInterface_i::getPortDescriptions() ACE_THROW_SPEC ((
00160               CORBA::SystemException
00161             ))
00162 {
00163     RTT::DataFlowInterface::PortNames ports = mdf->getPortNames();
00164     RTT::corba::CDataFlowInterface::CPortDescriptions_var result = new RTT::corba::CDataFlowInterface::CPortDescriptions();
00165     result->length( ports.size() );
00166 
00167     unsigned int j = 0;
00168     for (unsigned int i = 0; i < ports.size(); ++i)
00169     {
00170         CPortDescription port_desc;
00171 
00172         PortInterface* port = mdf->getPort(ports[i]);
00173         port_desc.name = CORBA::string_dup(ports[i].c_str());
00174 
00175         TypeInfo const* type_info = port->getTypeInfo();
00176         if (!type_info || !type_info->getProtocol(ORO_CORBA_PROTOCOL_ID))
00177         {
00178             log(Warning) << "the type of port " << ports[i] << " is not registered into the Orocos type system. It is ignored by the CORBA layer." << endlog();
00179             continue;
00180         }
00181 
00182         port_desc.type_name = CORBA::string_dup(type_info->getTypeName().c_str());
00183         if (dynamic_cast<InputPortInterface*>(port))
00184             port_desc.type = corba::CInput;
00185         else
00186             port_desc.type = corba::COutput;
00187 
00188         result[j++] = port_desc;
00189     }
00190     result->length( j );
00191     return result._retn();
00192 }
00193 
00194 CPortType CDataFlowInterface_i::getPortType(const char * port_name) ACE_THROW_SPEC ((
00195               CORBA::SystemException
00196               ,::RTT::corba::CNoSuchPortException
00197             ))
00198 {
00199     PortInterface* p = mdf->getPort(port_name);
00200     if (p == 0)
00201         throw CNoSuchPortException();
00202 
00203     if (dynamic_cast<InputPortInterface*>(p))
00204         return RTT::corba::CInput;
00205     else return RTT::corba::COutput;
00206 }
00207 
00208 char* CDataFlowInterface_i::getDataType(const char * port_name) ACE_THROW_SPEC ((
00209               CORBA::SystemException
00210               ,::RTT::corba::CNoSuchPortException
00211             ))
00212 {
00213     PortInterface* p = mdf->getPort(port_name);
00214     if ( p == 0)
00215         throw CNoSuchPortException();
00216     return CORBA::string_dup( p->getTypeInfo()->getTypeName().c_str() );
00217 }
00218 
00219 CORBA::Boolean CDataFlowInterface_i::isConnected(const char * port_name) ACE_THROW_SPEC ((
00220               CORBA::SystemException
00221               ,::RTT::corba::CNoSuchPortException
00222             ))
00223 {
00224     PortInterface* p = mdf->getPort(port_name);
00225     if (p == 0)
00226         throw corba::CNoSuchPortException();
00227 
00228     return p->connected();
00229 }
00230 
00231 void CDataFlowInterface_i::deregisterChannel(CChannelElement_ptr channel)
00232 { RTT::os::MutexLock lock(channel_list_mtx);
00233     ChannelList::iterator it=channel_list.begin();
00234     for (; it != channel_list.end(); ++it) {
00235         if (it->first->_is_equivalent (channel) ) {
00236             channel_list.erase(it); // it->first is a _var so releases automatically.
00237             return;
00238         }
00239     }
00240 }
00241 
00242 CORBA::Boolean CDataFlowInterface_i::channelReady(const char * reader_port_name, CChannelElement_ptr channel) ACE_THROW_SPEC ((
00243               CORBA::SystemException
00244               ,::RTT::corba::CNoSuchPortException
00245             ))
00246 {
00247     PortInterface* p = mdf->getPort(reader_port_name);
00248     if (p == 0)
00249         throw corba::CNoSuchPortException();
00250 
00251     InputPortInterface* ip = dynamic_cast<InputPortInterface*>(p);
00252     if (ip == 0)
00253         throw corba::CNoSuchPortException();
00254 
00255     CORBA_CHECK_THREAD();
00256     // lookup the C++ channel that matches the corba channel and
00257     // inform our local port that that C++ channel is ready.
00258     { RTT::os::MutexLock lock(channel_list_mtx);
00259         ChannelList::iterator it=channel_list.begin();
00260         for (; it != channel_list.end(); ++it) {
00261             if (it->first->_is_equivalent (channel) ) {
00262                 try {
00263                     return ip->channelReady( it->second );
00264                 }
00265                 catch(std::exception const& e)
00266                 {
00267                     log(Error) << "call to channelReady threw " << e.what() << endlog();
00268                     throw;
00269                 }
00270             }
00271         }
00272     }
00273     log(Error) << "Invalid CORBA channel given for port " << reader_port_name << ": could not match it to a local C++ channel." <<endlog();
00274     return false;
00275 }
00276 
00277 void CDataFlowInterface_i::disconnectPort(const char * port_name) ACE_THROW_SPEC ((
00278               CORBA::SystemException
00279               ,::RTT::corba::CNoSuchPortException
00280             ))
00281 {
00282     PortInterface* p = mdf->getPort(port_name);
00283     if (p == 0) {
00284         log(Error) << "disconnectPort: No such port: "<< port_name <<endlog();
00285         throw corba::CNoSuchPortException();
00286     }
00287     CORBA_CHECK_THREAD();
00288     p->disconnect();
00289 }
00290 
00291 bool CDataFlowInterface_i::removeConnection(
00292         const char* local_port,
00293         CDataFlowInterface_ptr remote_interface, const char* remote_port) ACE_THROW_SPEC ((
00294                       CORBA::SystemException
00295                       ,::RTT::corba::CNoSuchPortException
00296                     ))
00297 {
00298     PortInterface* port = mdf->getPort(local_port);
00299     // CORBA does not support disconnecting from the input port
00300     if (port == 0) {
00301         log(Error) << "disconnectPort: No such port: "<< local_port <<endlog();
00302         throw corba::CNoSuchPortException();
00303     }
00304     if (dynamic_cast<OutputPortInterface*>(port) == 0) {
00305         log(Error) << "disconnectPort: "<< local_port << " is an input port" << endlog();
00306         throw corba::CNoSuchPortException();
00307     }
00308 
00309     RTT::DataFlowInterface* local_interface = CDataFlowInterface_i::getLocalInterface(remote_interface);
00310     if (local_interface)
00311     {
00312         PortInterface* other_port = local_interface->getPort(remote_port);
00313         if (!other_port)
00314             return false;
00315 
00316         // Try to disconnect the local port. However, one might have forced
00317         // having a CORBA connection between local ports, so if it fails go on
00318         // with normal CORBA disconnection
00319         if (port->disconnect(other_port))
00320             return true;
00321 
00322     }
00323 
00324     CORBA_CHECK_THREAD();
00325     RemoteConnID rcid(remote_interface, remote_port);
00326     return port->removeConnection( &rcid );
00327 }
00328 
00329 ::CORBA::Boolean CDataFlowInterface_i::createStream( const char* port,
00330                                RTT::corba::CConnPolicy & policy) ACE_THROW_SPEC ((
00331                                       CORBA::SystemException
00332                                       ,::RTT::corba::CNoSuchPortException
00333                                     ))
00334 {
00335     PortInterface* p = mdf->getPort(port);
00336     if (p == 0) {
00337         log(Error) << "createStream: No such port: "<< p->getName() <<endlog();
00338         throw corba::CNoSuchPortException();
00339     }
00340 
00341     CORBA_CHECK_THREAD();
00342     RTT::ConnPolicy p2 = toRTT(policy);
00343     if ( p->createStream( p2 ) ) {
00344         policy = toCORBA(p2);
00345         return true;
00346     }
00347     return false;
00348 }
00349 
00350 void CDataFlowInterface_i::removeStream( const char* port, const char* stream_name) ACE_THROW_SPEC ((
00351               CORBA::SystemException
00352               ,::RTT::corba::CNoSuchPortException
00353             ))
00354 {
00355     PortInterface* p = mdf->getPort(port);
00356     if (p == 0) {
00357         log(Error) << "createStream: No such port: "<< p->getName() <<endlog();
00358         throw corba::CNoSuchPortException();
00359     }
00360     CORBA_CHECK_THREAD();
00361     StreamConnID rcid(stream_name);
00362     p->removeConnection( &rcid );
00363 }
00364 
00365 
00366 CChannelElement_ptr CDataFlowInterface_i::buildChannelOutput(
00367         const char* port_name, CConnPolicy & corba_policy) ACE_THROW_SPEC ((
00368                       CORBA::SystemException
00369                       ,::RTT::corba::CNoCorbaTransport
00370                   ,::RTT::corba::CNoSuchPortException
00371                     ))
00372 {
00373     Logger::In in("CDataFlowInterface_i::buildChannelOutput");
00374     InputPortInterface* port = dynamic_cast<InputPortInterface*>(mdf->getPort(port_name));
00375     if (port == 0)
00376         throw CNoSuchPortException();
00377 
00378     TypeInfo const* type_info = port->getTypeInfo();
00379     if (!type_info)
00380         throw CNoCorbaTransport();
00381 
00382     CorbaTypeTransporter* transporter =
00383         dynamic_cast<CorbaTypeTransporter*>(type_info->getProtocol(ORO_CORBA_PROTOCOL_ID));
00384     if (!transporter)
00385         throw CNoCorbaTransport();
00386 
00387     CORBA_CHECK_THREAD();
00388     // Convert to RTT policy.
00389     ConnPolicy policy2 = toRTT(corba_policy);
00390 
00391     ChannelElementBase::shared_ptr end = type_info->buildChannelOutput(*port);
00392     CRemoteChannelElement_i* this_element =
00393         transporter->createChannelElement_i(mdf, mpoa, corba_policy.pull);
00394     this_element->setCDataFlowInterface(this);
00395 
00396     /*
00397      * This part is for out-of band (needs to be factored out).
00398      */
00399     if ( corba_policy.transport !=0 && corba_policy.transport != ORO_CORBA_PROTOCOL_ID) {
00400         // prepare out-of-band transport for this port.
00401         // if user supplied name, use that one.
00402         if ( type_info->getProtocol(corba_policy.transport) == 0 ) {
00403             log(Error) << "Could not create out-of-band transport for port "<< port_name << " with transport id " << corba_policy.transport <<endlog();
00404             log(Error) << "No such transport registered. Check your corba_policy.transport settings or add the transport for type "<< type_info->getTypeName() <<endlog();
00405             return RTT::corba::CChannelElement::_nil();
00406         }
00407         RTT::base::ChannelElementBase::shared_ptr ceb = type_info->getProtocol(corba_policy.transport)->createStream(port, policy2, false);
00408         // if no user supplied name, pass on the new name.
00409         if ( strlen( corba_policy.name_id.in()) == 0 )
00410             corba_policy.name_id = CORBA::string_dup( policy2.name_id.c_str() );
00411 
00412         if (ceb) {
00413             // override, insert oob element between corba and endpoint and add a buffer between oob and endpoint.
00414             dynamic_cast<ChannelElementBase*>(this_element)->setOutput(ceb);
00415             ChannelElementBase::shared_ptr buf = type_info->buildDataStorage(toRTT(corba_policy));
00416             ceb->setOutput( buf );
00417             buf->setOutput(end);
00418             log(Info) <<"Receiving data for port "<< policy2.name_id << " from out-of-band protocol "<< corba_policy.transport <<endlog();
00419         } else {
00420             log(Error) << "The type transporter for type "<<type_info->getTypeName()<< " failed to create an out-of-band endpoint for port " << port_name<<endlog();
00421             return RTT::corba::CChannelElement::_nil();
00422         }
00423         //
00424     } else {
00425         // No OOB. omit buffer if in pull
00426         if ( !corba_policy.pull ) {
00427             ChannelElementBase::shared_ptr buf = type_info->buildDataStorage(toRTT(corba_policy));
00428             dynamic_cast<ChannelElementBase*>(this_element)->setOutput(buf);
00429             buf->setOutput(end);
00430         } else {
00431             dynamic_cast<ChannelElementBase*>(this_element)->setOutput(end);
00432         }
00433     }
00434 
00435     this_element->_remove_ref();
00436 
00437     // store our mapping of corba channel elements to C++ channel elements. We need this for channelReady() and removing a channel again.
00438     { RTT::os::MutexLock lock(channel_list_mtx);
00439         channel_list.push_back( ChannelList::value_type(this_element->_this(), end->getOutputEndPoint()));
00440     }
00441 
00442     CRemoteChannelElement_var proxy = this_element->_this();
00443     return proxy._retn();
00444 }
00445 
00449 CChannelElement_ptr CDataFlowInterface_i::buildChannelInput(
00450         const char* port_name, CConnPolicy & corba_policy) ACE_THROW_SPEC ((
00451                       CORBA::SystemException
00452                       ,::RTT::corba::CNoCorbaTransport
00453                       ,::RTT::corba::CNoSuchPortException
00454                     ))
00455 {
00456     Logger::In in("CDataFlowInterface_i::buildChannelInput");
00457     // First check validity of user input...
00458     OutputPortInterface* port = dynamic_cast<OutputPortInterface*>(mdf->getPort(port_name));
00459     if (port == 0)
00460         throw CNoSuchPortException();
00461 
00462     TypeInfo const* type_info = port->getTypeInfo();
00463     if (!type_info)
00464         throw CNoCorbaTransport();
00465 
00466     CorbaTypeTransporter* transporter =
00467         dynamic_cast<CorbaTypeTransporter*>(type_info->getProtocol(ORO_CORBA_PROTOCOL_ID));
00468     if (!transporter)
00469         throw CNoCorbaTransport();
00470 
00471     CORBA_CHECK_THREAD();
00472     // Convert to RTT policy.
00473     ConnPolicy policy2 = toRTT(corba_policy);
00474 
00475     // Now create the output-side channel elements.
00476     ChannelElementBase::shared_ptr start = type_info->buildChannelInput(*port);
00477 
00478     // The channel element that exposes our channel in CORBA
00479     CRemoteChannelElement_i* this_element;
00480     PortableServer::ServantBase_var servant = this_element = transporter->createChannelElement_i(mdf, mpoa, corba_policy.pull);
00481     this_element->setCDataFlowInterface(this);
00482 
00483     // Attach the corba channel element first (so OOB is after corba).
00484     assert( dynamic_cast<ChannelElementBase*>(this_element) );
00485     start->getOutputEndPoint()->setOutput( dynamic_cast<ChannelElementBase*>(this_element));
00486 
00487     /*
00488      * This part if for out-of band. (needs to be factored out).
00489      */
00490     if ( corba_policy.transport !=0 && corba_policy.transport != ORO_CORBA_PROTOCOL_ID) {
00491         // prepare out-of-band transport for this port.
00492         // if user supplied name, use that one.
00493         if ( type_info->getProtocol(corba_policy.transport) == 0 ) {
00494             log(Error) << "Could not create out-of-band transport for port "<< port_name << " with transport id " << corba_policy.transport <<endlog();
00495             log(Error) << "No such transport registered. Check your corba_policy.transport settings or add the transport for type "<< type_info->getTypeName() <<endlog();
00496             throw CNoCorbaTransport();
00497         }
00498         RTT::base::ChannelElementBase::shared_ptr ceb = type_info->getProtocol(corba_policy.transport)->createStream(port, policy2, true);
00499         // if no user supplied name, pass on the new name.
00500         if ( strlen( corba_policy.name_id.in()) == 0 )
00501             corba_policy.name_id = CORBA::string_dup( policy2.name_id.c_str() );
00502 
00503         if (ceb) {
00504             // OOB is added to end of chain.
00505             start->getOutputEndPoint()->setOutput( ceb );
00506             log(Info) <<"Sending data from port "<< policy2.name_id << " to out-of-band protocol "<< corba_policy.transport <<endlog();
00507         } else {
00508             log(Error) << "The type transporter for type "<<type_info->getTypeName()<< " failed to create an out-of-band endpoint for port " << port_name<<endlog();
00509             throw CNoCorbaTransport();
00510         }
00511     } else {
00512         // No OOB. Always add output buffer.
00513         ChannelElementBase::shared_ptr buf = type_info->buildDataStorage(toRTT(corba_policy));
00514         start->setOutput(buf);
00515         buf->setOutput( dynamic_cast<ChannelElementBase*>(this_element) );
00516     }
00517 
00518 
00519     // Attach to our output port:
00520     port->addConnection( new SimpleConnID(), start->getInputEndPoint(), policy2);
00521 
00522     // DO NOT DO THIS: _remove_ref() is tied to the refcount of the ChannelElementBase !
00523     //this_element->_remove_ref();
00524 
00525     // Finally, store our mapping of corba channel elements to C++ channel elements. We need this for channelReady() and removing a channel again.
00526     { RTT::os::MutexLock lock(channel_list_mtx);
00527         channel_list.push_back( ChannelList::value_type(this_element->_this(), start->getInputEndPoint()));
00528     }
00529 
00530     return this_element->_this();
00531 }
00532 
00533 
00534 ::CORBA::Boolean CDataFlowInterface_i::createConnection(
00535         const char* writer_port, CDataFlowInterface_ptr reader_interface,
00536         const char* reader_port, CConnPolicy & policy) ACE_THROW_SPEC ((
00537                       CORBA::SystemException
00538                       ,::RTT::corba::CNoSuchPortException
00539                     ))
00540 {
00541     Logger::In in("CDataFlowInterface_i::createConnection");
00542     OutputPortInterface* writer = dynamic_cast<OutputPortInterface*>(mdf->getPort(writer_port));
00543     if (writer == 0)
00544         throw CNoSuchPortException();
00545 
00546     CORBA_CHECK_THREAD();
00547     // Check if +reader_interface+ is local. If it is, use the non-CORBA
00548     // connection.
00549     RTT::DataFlowInterface* local_interface = CDataFlowInterface_i::getLocalInterface(reader_interface);
00550     if (local_interface && policy.transport == 0)
00551     {
00552         InputPortInterface* reader =
00553             dynamic_cast<InputPortInterface*>(local_interface->getPort(reader_port));
00554         if (!reader)
00555         {
00556             log(Warning) << "CORBA: createConnection() target is not an input port" << endlog();
00557             throw CNoSuchPortException();
00558             return false;
00559         }
00560 
00561         log(Debug) << "CORBA: createConnection() is creating a LOCAL connection between " <<
00562            writer_port << " and " << reader_port << endlog();
00563         return writer->createConnection(*reader, toRTT(policy));
00564     }
00565     else
00566         log(Debug) << "CORBA: createConnection() is creating a REMOTE connection between " <<
00567            writer_port << " and " << reader_port << endlog();
00568 
00569     try {
00570         if (reader_interface->getPortType(reader_port) != corba::CInput) {
00571             log(Error) << "Could not create connection: " << reader_port <<" is not an input port."<<endlog();
00572             throw CNoSuchPortException();
00573             return false;
00574         }
00575 
00576         // Creates a proxy to the remote input port
00577         RemoteInputPort port(writer->getTypeInfo(), reader_interface, reader_port, mpoa);
00578         port.setInterface( mdf ); // cheating !
00579         // Connect to proxy.
00580         return writer->createConnection(port, toRTT(policy));
00581     }
00582     catch(CORBA::COMM_FAILURE&) { throw; }
00583     catch(CORBA::TRANSIENT&) { throw; }
00584     catch(...) { throw; }
00585     return false;
00586 }
00587 
00588 // standard constructor
00589 CRemoteChannelElement_i::CRemoteChannelElement_i(RTT::corba::CorbaTypeTransporter const& transport,
00590           PortableServer::POA_ptr poa)
00591     : transport(transport)
00592     , mpoa(PortableServer::POA::_duplicate(poa))
00593     , mdataflow(0)
00594     { }
00595 CRemoteChannelElement_i::~CRemoteChannelElement_i() {}
00596 PortableServer::POA_ptr CRemoteChannelElement_i::_default_POA()
00597 { return PortableServer::POA::_duplicate(mpoa); }
00598 void CRemoteChannelElement_i::setRemoteSide(CRemoteChannelElement_ptr remote) ACE_THROW_SPEC ((
00599               CORBA::SystemException
00600             ))
00601 { this->remote_side = RTT::corba::CRemoteChannelElement::_duplicate(remote); }
00602 


rtt
Author(s): RTT Developers
autogenerated on Thu Jan 2 2014 11:35:20