00001
00020 #include <assert.h>
00021 #include <memory>
00022 #include <coil/UUID.h>
00023 #include <rtm/PortBase.h>
00024 #include <rtm/PortCallback.h>
00025
00026 namespace RTC
00027 {
00028
00029
00030
00038 PortBase::PortBase(const char* name)
00039 : rtclog(name),
00040 m_ownerInstanceName("unknown"),
00041 m_connectionLimit(-1),
00042 m_onPublishInterfaces(0),
00043 m_onSubscribeInterfaces(0),
00044 m_onConnected(0),
00045 m_onUnsubscribeInterfaces(0),
00046 m_onDisconnected(0),
00047 m_onConnectionLost(0),
00048 m_portconnListeners(NULL)
00049 {
00050 m_objref = this->_this();
00051
00052 std::string portname(m_ownerInstanceName);
00053 portname += ".";
00054 portname += name;
00055
00056 m_profile.name = CORBA::string_dup(portname.c_str());
00057 m_profile.interfaces.length(0);
00058 m_profile.port_ref = m_objref;
00059 m_profile.connector_profiles.length(0);
00060 m_profile.owner = RTC::RTObject::_nil();
00061 m_profile.properties.length(0);
00062 }
00063
00071 PortBase::~PortBase()
00072 {
00073 RTC_TRACE(("~PortBase()"));
00074 try
00075 {
00076 PortableServer::ObjectId_var oid = _default_POA()->servant_to_id(this);
00077 _default_POA()->deactivate_object(oid);
00078 }
00079 catch (PortableServer::POA::ServantNotActive &e)
00080 {
00081 RTC_ERROR(("%s", e._name()));
00082 }
00083 catch (PortableServer::POA::WrongPolicy &e)
00084 {
00085 RTC_ERROR(("%s", e._name()));
00086 }
00087 catch (...)
00088 {
00089 RTC_ERROR(("Unknown exception caught."));
00090 }
00091 }
00092
00100 PortProfile* PortBase::get_port_profile()
00101 throw (CORBA::SystemException)
00102 {
00103 RTC_TRACE(("get_port_profile()"));
00104
00105 updateConnectors();
00106 Guard gaurd(m_profile_mutex);
00107 PortProfile_var prof;
00108 prof = new PortProfile(m_profile);
00109 return prof._retn();
00110 }
00111
00119 const PortProfile& PortBase::getPortProfile() const
00120 {
00121 RTC_TRACE(("getPortProfile()"));
00122
00123 return m_profile;
00124 }
00125
00133 ConnectorProfileList* PortBase::get_connector_profiles()
00134 throw (CORBA::SystemException)
00135 {
00136 RTC_TRACE(("get_connector_profiles()"));
00137
00138 updateConnectors();
00139
00140 Guard gaurd(m_profile_mutex);
00141 ConnectorProfileList_var conn_prof;
00142 conn_prof = new ConnectorProfileList(m_profile.connector_profiles);
00143 return conn_prof._retn();
00144 }
00145
00153 ConnectorProfile* PortBase::get_connector_profile(const char* connector_id)
00154 throw (CORBA::SystemException)
00155 {
00156 RTC_TRACE(("get_connector_profile(%s)", connector_id));
00157
00158 updateConnectors();
00159
00160 Guard gaurd(m_profile_mutex);
00161 CORBA::Long index(findConnProfileIndex(connector_id));
00162
00163 if (index < 0)
00164 {
00165 ConnectorProfile_var conn_prof;
00166 conn_prof = new ConnectorProfile();
00167 return conn_prof._retn();
00168 }
00169 ConnectorProfile_var conn_prof;
00170 conn_prof = new ConnectorProfile(m_profile.connector_profiles[index]);
00171 return conn_prof._retn();
00172 }
00173
00181 ReturnCode_t PortBase::connect(ConnectorProfile& connector_profile)
00182 throw (CORBA::SystemException)
00183 {
00184 RTC_TRACE(("connect()"));
00185 if (isEmptyId(connector_profile))
00186 {
00187 Guard gurad(m_profile_mutex);
00188
00189
00190 setUUID(connector_profile);
00191 assert(!isExistingConnId(connector_profile.connector_id));
00192 }
00193 else
00194 {
00195 Guard gurad(m_profile_mutex);
00196 if (isExistingConnId(connector_profile.connector_id))
00197 {
00198 RTC_ERROR(("Connection already exists."));
00199 return RTC::PRECONDITION_NOT_MET;
00200 }
00201 }
00202
00203 try
00204 {
00205 RTC::PortService_ptr p;
00206 p = connector_profile.ports[(CORBA::ULong)0];
00207 ReturnCode_t ret = p->notify_connect(connector_profile);
00208 if (ret != RTC::RTC_OK)
00209 {
00210 RTC_ERROR(("Connection failed. cleanup."));
00211 disconnect(connector_profile.connector_id);
00212 }
00213 return ret;
00214 }
00215 catch (...)
00216 {
00217 return RTC::BAD_PARAMETER;
00218 }
00219 return RTC::RTC_ERROR;
00220 }
00221
00229 ReturnCode_t PortBase::notify_connect(ConnectorProfile& connector_profile)
00230 throw (CORBA::SystemException)
00231 {
00232 RTC_TRACE(("notify_connect()"));
00233 Guard guard(m_connectorsMutex);
00234 ReturnCode_t retval[] = {RTC::RTC_OK, RTC::RTC_OK, RTC::RTC_OK};
00235
00236 onNotifyConnect(getName(), connector_profile);
00237
00238
00239 retval[0] = publishInterfaces(connector_profile);
00240 if (retval[0] != RTC::RTC_OK)
00241 {
00242 RTC_ERROR(("publishInterfaces() in notify_connect() failed."));
00243 }
00244 onPublishInterfaces(getName(), connector_profile, retval[0]);
00245 if (m_onPublishInterfaces != 0)
00246 {
00247 (*m_onPublishInterfaces)(connector_profile);
00248 }
00249
00250
00251
00252 retval[1] = connectNext(connector_profile);
00253 if (retval[1] != RTC::RTC_OK)
00254 {
00255 RTC_ERROR(("connectNext() in notify_connect() failed."));
00256 }
00257 onConnectNextport(getName(), connector_profile, retval[1]);
00258
00259
00260
00261 if (m_onSubscribeInterfaces != 0)
00262 {
00263 (*m_onSubscribeInterfaces)(connector_profile);
00264 }
00265
00266 retval[2] = subscribeInterfaces(connector_profile);
00267 if (retval[2] != RTC::RTC_OK)
00268 {
00269 RTC_ERROR(("subscribeInterfaces() in notify_connect() failed."));
00270 }
00271 onSubscribeInterfaces(getName(), connector_profile, retval[2]);
00272
00273 RTC_PARANOID(("%d connectors are existing",
00274 m_profile.connector_profiles.length()));
00275
00276 Guard gurad(m_profile_mutex);
00277 CORBA::Long index(findConnProfileIndex(connector_profile.connector_id));
00278 if (index < 0)
00279 {
00280 CORBA_SeqUtil::push_back(m_profile.connector_profiles,
00281 connector_profile);
00282 RTC_PARANOID(("New connector_id. Push backed."));
00283 }
00284 else
00285 {
00286 m_profile.connector_profiles[index] = connector_profile;
00287 RTC_PARANOID(("Existing connector_id. Updated."));
00288 }
00289
00290 for (int i(0), len(sizeof(retval)/sizeof(ReturnCode_t)); i < len; ++i)
00291 {
00292 if (retval[i] != RTC::RTC_OK)
00293 {
00294 onConnected(getName(), connector_profile, retval[i]);
00295 return retval[i];
00296 }
00297 }
00298
00299
00300 if (m_onConnected != 0)
00301 {
00302 (*m_onConnected)(connector_profile);
00303 }
00304 onConnected(getName(), connector_profile, RTC::RTC_OK);
00305 return RTC::RTC_OK;
00306 }
00307
00315 ReturnCode_t PortBase::_publishInterfaces(void)
00316 {
00317 if(!(m_connectionLimit < 0))
00318 {
00319 if((::CORBA::ULong)m_connectionLimit<=m_profile.connector_profiles.length())
00320 {
00321 RTC_PARANOID(("Connected number has reached the limitation."));
00322 RTC_PARANOID(("Can connect the port up to %d ports.",
00323 m_connectionLimit));
00324 RTC_PARANOID(("%d connectors are existing",
00325 m_profile.connector_profiles.length()));
00326 return RTC::RTC_ERROR;
00327 }
00328 }
00329 return RTC::RTC_OK;
00330 }
00331
00339 ReturnCode_t PortBase::disconnect(const char* connector_id)
00340 throw (CORBA::SystemException)
00341 {
00342 RTC_TRACE(("disconnect(%s)", connector_id));
00343
00344 CORBA::Long index(findConnProfileIndex(connector_id));
00345 if (index < 0)
00346 {
00347 RTC_ERROR(("Invalid connector id: %s", connector_id));
00348 return RTC::BAD_PARAMETER;
00349 }
00350
00351 ConnectorProfile prof;
00352 {
00353 Guard guard(m_profile_mutex);
00354 prof = m_profile.connector_profiles[index];
00355 }
00356
00357 if (prof.ports.length() < 1)
00358 {
00359 RTC_FATAL(("ConnectorProfile has empty port list."));
00360 return RTC::PRECONDITION_NOT_MET;
00361 }
00362
00363 for (CORBA::ULong i(0), len(prof.ports.length()); i < len; ++i)
00364 {
00365 RTC::PortService_var p(prof.ports[i]);
00366 try
00367 {
00368 return p->notify_disconnect(connector_id);
00369 }
00370 catch (CORBA::SystemException &e)
00371 {
00372 #ifndef ORB_IS_RTORB
00373 RTC_WARN(("Exception caught: minor code(%d).", e.minor()));;
00374 #else // ORB_IS_RTORB
00375 RTC_WARN(("Exception caught"));
00376 #endif // ORB_IS_RTORB
00377 continue;
00378 }
00379 catch (...)
00380 {
00381 RTC_WARN(("Unknown exception caught."));;
00382 continue;
00383 }
00384 }
00385 RTC_ERROR(("notify_disconnect() for all ports failed."));
00386 return RTC::RTC_ERROR;
00387 }
00388
00396 ReturnCode_t PortBase::notify_disconnect(const char* connector_id)
00397 throw (CORBA::SystemException)
00398 {
00399 RTC_TRACE(("notify_disconnect(%s)", connector_id));
00400 Guard guard(m_connectorsMutex);
00401 Guard gaurd(m_profile_mutex);
00402
00403
00404 CORBA::Long index(findConnProfileIndex(connector_id));
00405 if (index < 0)
00406 {
00407 RTC_ERROR(("Invalid connector id: %s", connector_id));
00408 return RTC::BAD_PARAMETER;
00409 }
00410
00411 ConnectorProfile& prof(m_profile.connector_profiles[(CORBA::ULong)index]);
00412 onNotifyDisconnect(getName(), prof);
00413
00414 ReturnCode_t retval(disconnectNext(prof));
00415 onDisconnectNextport(getName(), prof, retval);
00416
00417 if (m_onUnsubscribeInterfaces != 0)
00418 {
00419 (*m_onUnsubscribeInterfaces)(prof);
00420 }
00421 onUnsubscribeInterfaces(getName(), prof);
00422 unsubscribeInterfaces(prof);
00423
00424 if (m_onDisconnected != 0)
00425 {
00426 (*m_onDisconnected)(prof);
00427 }
00428
00429 #ifndef ORB_IS_RTORB
00430 CORBA_SeqUtil::erase(m_profile.connector_profiles, index);
00431 #else // ORB_IS_RTORB
00432 CORBA::ULong len(m_profile.connector_profiles.length());
00433 if (index < (CORBA::Long)len)
00434 {
00435 for (CORBA::ULong i(index); i < len - 1; ++i)
00436 {
00437 m_profile.connector_profiles[i] =
00438 m_profile.connector_profiles[i + 1] ;
00439 }
00440 m_profile.connector_profiles._length=len-1;
00441 }
00442 #endif // ORB_IS_RTORB
00443 onDisconnected(getName(), prof, retval);
00444 return retval;
00445 }
00446
00454 ReturnCode_t PortBase::disconnect_all()
00455 throw (CORBA::SystemException)
00456 {
00457 RTC_TRACE(("disconnect_all()"));
00458
00459 ::RTC::ConnectorProfileList plist;
00460 {
00461 Guard gaurd(m_profile_mutex);
00462 plist = m_profile.connector_profiles;
00463 }
00464
00465 RTC::ReturnCode_t retcode(::RTC::RTC_OK);
00466 CORBA::ULong len(plist.length());
00467 RTC_DEBUG(("disconnecting %d connections.", len));
00468 for (CORBA::ULong i(0); i < len; ++i)
00469 {
00470 ReturnCode_t tmpret;
00471 tmpret =this->disconnect(plist[i].connector_id);
00472 if (tmpret != RTC::RTC_OK) retcode = tmpret;
00473 }
00474
00475 return retcode;
00476 }
00477
00478
00479
00480
00488 void PortBase::setName(const char* name)
00489 {
00490 RTC_TRACE(("setName(%s)", name));
00491 Guard guard(m_profile_mutex);
00492 m_profile.name = CORBA::string_dup(name);
00493 rtclog.setName(name);
00494 }
00495
00504 const char* PortBase::getName() const
00505 {
00506 RTC_TRACE(("getName() = %s", (const char*)m_profile.name));
00507 return m_profile.name;
00508 }
00509
00517 const PortProfile& PortBase::getProfile() const
00518 {
00519 RTC_TRACE(("getProfile()"));
00520 Guard guard(m_profile_mutex);
00521 return m_profile;
00522 }
00523
00531 void PortBase::setPortRef(PortService_ptr port_ref)
00532 {
00533 RTC_TRACE(("setPortRef()"));
00534 Guard gurad(m_profile_mutex);
00535 m_profile.port_ref = port_ref;
00536 }
00537
00545 PortService_ptr PortBase::getPortRef()
00546 {
00547 RTC_TRACE(("getPortRef()"));
00548 Guard gurad(m_profile_mutex);
00549 return m_profile.port_ref;
00550 }
00551
00559 void PortBase::setOwner(RTObject_ptr owner)
00560 {
00561 RTC::ComponentProfile_var prof = owner->get_component_profile();
00562
00563 m_ownerInstanceName = prof->instance_name;
00564 RTC_TRACE(("setOwner(%s)", m_ownerInstanceName.c_str()));
00565
00566 {
00567 Guard gurad(m_profile_mutex);
00568 std::string portname((const char*)m_profile.name);
00569 coil::vstring p(coil::split(portname, "."));
00570
00571 portname = m_ownerInstanceName +"."+ p.back();
00572
00573 m_profile.owner = RTC::RTObject::_duplicate(owner);
00574 m_profile.name = CORBA::string_dup(portname.c_str());
00575 }
00576 }
00577
00578
00579 void PortBase::setOnPublishInterfaces(ConnectionCallback* on_publish)
00580 {
00581 m_onPublishInterfaces = on_publish;
00582 }
00583
00584 void PortBase::setOnSubscribeInterfaces(ConnectionCallback* on_subscribe)
00585 {
00586 m_onSubscribeInterfaces = on_subscribe;
00587 }
00588
00589 void PortBase::setOnConnected(ConnectionCallback* on_connected)
00590 {
00591 m_onConnected = on_connected;
00592 }
00593
00594 void PortBase::setOnUnsubscribeInterfaces(ConnectionCallback* on_unsubscribe)
00595 {
00596 m_onUnsubscribeInterfaces = on_unsubscribe;
00597 }
00598
00599 void PortBase::setOnDisconnected(ConnectionCallback* on_disconnected)
00600 {
00601 m_onDisconnected = on_disconnected;
00602 }
00603
00604 void PortBase::setOnConnectionLost(ConnectionCallback* on_connection_lost)
00605 {
00606 m_onConnectionLost = on_connection_lost;
00607 }
00608
00609 void PortBase::
00610 setPortConnectListenerHolder(PortConnectListeners* portconnListeners)
00611 {
00612 m_portconnListeners = portconnListeners;
00613 }
00614
00615
00616
00617
00618
00626 ReturnCode_t PortBase::connectNext(ConnectorProfile& connector_profile)
00627 {
00628 CORBA::Long index;
00629 index = CORBA_SeqUtil::find(connector_profile.ports,
00630 find_port_ref(m_profile.port_ref));
00631
00632 if (index < 0) return RTC::BAD_PARAMETER;
00633
00634 if (++index < static_cast<CORBA::Long>(connector_profile.ports.length()))
00635 {
00636 RTC::PortService_ptr p;
00637 p = connector_profile.ports[index];
00638 return p->notify_connect(connector_profile);
00639 }
00640 return RTC::RTC_OK;
00641 }
00642
00650 ReturnCode_t PortBase::disconnectNext(ConnectorProfile& cprof)
00651 {
00652 CORBA::ULong index;
00653 index = CORBA_SeqUtil::find(cprof.ports,
00654 find_port_ref(m_profile.port_ref));
00655 if (index < 0)
00656 {
00657 return RTC::BAD_PARAMETER;
00658 }
00659 if (index == cprof.ports.length() - 1)
00660 {
00661 return RTC::RTC_OK;
00662 }
00663
00664 CORBA::ULong len = cprof.ports.length();
00665
00666 ++index;
00667 for (CORBA::ULong i(index); i < len; ++i)
00668 {
00669 RTC::PortService_var p;
00670 p = cprof.ports[i];
00671 try
00672 {
00673 return p->notify_disconnect(cprof.connector_id);
00674 }
00675 catch (CORBA::SystemException& e)
00676 {
00677 #ifndef ORB_IS_RTORB
00678 RTC_WARN(("Exception caught: minor code.", e.minor()));
00679 #else // ORB_IS_RTORB
00680 RTC_WARN(("Exception caught"));
00681 #endif // ORB_IS_RTORB
00682 continue;
00683 }
00684 catch (...)
00685 {
00686 RTC_WARN(("Unknown exception caught."));
00687 continue;
00688 }
00689 }
00690
00691 return RTC::RTC_ERROR;
00692 }
00700 void PortBase::setConnectionLimit(int limit_value)
00701 {
00702 m_connectionLimit = limit_value;
00703 }
00704
00705
00706
00707
00715 bool PortBase::isEmptyId(const ConnectorProfile& connector_profile) const
00716 {
00717 return connector_profile.connector_id[(CORBA::ULong)0] == 0;
00718 }
00719
00720
00728 const std::string PortBase::getUUID() const
00729 {
00730 coil::UUID_Generator uugen;
00731 uugen.init();
00732 std::auto_ptr<coil::UUID> uuid(uugen.generateUUID(2,0x01));
00733
00734 return std::string((const char*)uuid->to_string());
00735 }
00736
00744 void PortBase::setUUID(ConnectorProfile& connector_profile) const
00745 {
00746 connector_profile.connector_id = CORBA::string_dup(getUUID().c_str());
00747 assert(connector_profile.connector_id[(CORBA::ULong)0] != 0);
00748 }
00749
00757 bool PortBase::isExistingConnId(const char* id)
00758 {
00759 return CORBA_SeqUtil::find(m_profile.connector_profiles,
00760 find_conn_id(id)) >= 0;
00761 }
00762
00770 ConnectorProfile PortBase::findConnProfile(const char* id)
00771 {
00772 CORBA::Long index;
00773 index = CORBA_SeqUtil::find(m_profile.connector_profiles,
00774 find_conn_id(id));
00775 return m_profile.connector_profiles[index];
00776 }
00777
00785 CORBA::Long PortBase::findConnProfileIndex(const char* id)
00786 {
00787 return CORBA_SeqUtil::find(m_profile.connector_profiles,
00788 find_conn_id(id));
00789 }
00790
00798 void
00799 PortBase::updateConnectorProfile(const ConnectorProfile& connector_profile)
00800 {
00801 CORBA::Long index;
00802 index = CORBA_SeqUtil::find(m_profile.connector_profiles,
00803 find_conn_id(connector_profile.connector_id));
00804
00805 if (index < 0)
00806 {
00807 CORBA_SeqUtil::push_back(m_profile.connector_profiles,
00808 connector_profile);
00809 }
00810 else
00811 {
00812 m_profile.connector_profiles[index] = connector_profile;
00813 }
00814 }
00815
00823 bool PortBase::eraseConnectorProfile(const char* id)
00824 {
00825 CORBA::Long index;
00826 index = CORBA_SeqUtil::find(m_profile.connector_profiles,
00827 find_conn_id(id));
00828 if (index < 0) return false;
00829
00830 CORBA_SeqUtil::erase(m_profile.connector_profiles, index);
00831 return true;
00832 }
00833
00841 bool PortBase::appendInterface(const char* instance_name,
00842 const char* type_name,
00843 PortInterfacePolarity pol)
00844 {
00845 CORBA::Long index;
00846 index = CORBA_SeqUtil::find(m_profile.interfaces,
00847 find_interface(instance_name, pol));
00848
00849 if (index >= 0) return false;
00850
00851 PortInterfaceProfile prof;
00852 prof.instance_name = CORBA::string_dup(instance_name);
00853 prof.type_name = CORBA::string_dup(type_name);
00854 prof.polarity = pol;
00855 CORBA_SeqUtil::push_back(m_profile.interfaces, prof);
00856
00857 return true;
00858 }
00859
00867 bool PortBase::deleteInterface(const char* name, PortInterfacePolarity pol)
00868 {
00869 CORBA::Long index;
00870 index = CORBA_SeqUtil::find(m_profile.interfaces,
00871 find_interface(name, pol));
00872
00873 if (index < 0) return false;
00874
00875 CORBA_SeqUtil::erase(m_profile.interfaces, index);
00876 return true;
00877 }
00878
00886 void PortBase::updateConnectors()
00887 {
00888 std::vector<std::string> connector_ids;
00889 {
00890
00891 #ifndef ORB_IS_RTORB
00892 Guard guard(m_profile_mutex);
00893 ConnectorProfileList& clist(m_profile.connector_profiles);
00894
00895 for (CORBA::ULong i(0); i < clist.length(); ++i)
00896 {
00897 if (!checkPorts(clist[i].ports))
00898 {
00899 const char* id(clist[i].connector_id);
00900 connector_ids.push_back(id);
00901 RTC_WARN(("Dead connection: %s", id));
00902 }
00903 }
00904 #else // ORB_IS_RTORB
00905 ConnectorProfileList* clist;
00906 clist = new ConnectorProfileList(m_profile.connector_profiles);
00907
00908 for (CORBA::ULong i(0); i < clist->length(); ++i)
00909 {
00910 if (!checkPorts((*clist)[i].ports))
00911 {
00912 const char* id((*clist)[i].connector_id);
00913 connector_ids.push_back(id);
00914 RTC_WARN(("Dead connection: %s", id));
00915 }
00916 }
00917 delete clist;
00918 #endif // ORB_IS_RTORB
00919 }
00920 std::vector<std::string>::iterator it, it_end;
00921
00922 for (std::vector<std::string>::iterator it(connector_ids.begin());
00923 it != connector_ids.end(); ++it)
00924 {
00925 this->disconnect((*it).c_str());
00926 }
00927 }
00928
00936 #ifndef ORB_IS_RTORB
00937 bool PortBase::checkPorts(::RTC::PortServiceList& ports)
00938 #else // ORB_IS_RTORB
00939 bool PortBase::checkPorts(RTC_PortServiceList& ports)
00940 #endif // ORB_IS_RTORB
00941 {
00942 for (CORBA::ULong i(0), len(ports.length()); i < len; ++i)
00943 {
00944 try
00945 {
00946 if (ports[i]->_non_existent())
00947 {
00948 RTC_WARN(("Dead Port reference detected."));
00949 return false;
00950 }
00951 }
00952 catch (...)
00953 {
00954 return false;
00955 }
00956 }
00957 return true;
00958 }
00959
00960 };