18 from omniORB
import CORBA
19 from omniORB
import any
623 OpenRTM_aist.PortBase.__init__(self, name)
631 def __del__(self, PortBase=OpenRTM_aist.PortBase):
632 PortBase.__del__(self)
664 self.
_rtcout.RTC_TRACE(
"init()")
669 if not OpenRTM_aist.stringTo([num],
670 self.
_properties.getProperty(
"connection_limit",
"-1")):
671 self.
_rtcout.RTC_ERROR(
"invalid connection_limit value: %s",
718 self.
_rtcout.RTC_TRACE(
"registerProvider(instance=%s, type_name=%s)",
719 (instance_name, type_name))
726 self.
_rtcout.RTC_ERROR(
"appending provider interface failed")
727 self.
_rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
781 self.
_rtcout.RTC_TRACE(
"registerConsumer()")
836 provider.deactivate()
926 self.
_rtcout.RTC_TRACE(
"publishInterfaces()")
930 if returnvalue != RTC.RTC_OK:
940 newdesc +=
".provided." + provider.descriptor()
947 olddesc =
"port." + provider.descriptor()
1090 self.
_rtcout.RTC_TRACE(
"subscribeInterfaces()")
1091 nv = connector_profile.properties
1096 strictness = str(any.from_any(nv[index].value, keep_structs=
True))
1097 if "best_effort" == strictness:
1099 elif "strict" == strictness:
1102 self.
_rtcout.RTC_DEBUG(
"Connetion strictness is: %s",strictness)
1106 if self.
findProvider(nv, consumer, ior)
and len(ior) > 0:
1118 self.
_rtcout.RTC_ERROR(
"subscribeInterfaces() failed.")
1119 return RTC.RTC_ERROR
1121 self.
_rtcout.RTC_TRACE(
"subscribeInterfaces() successfully finished.")
1150 self.
_rtcout.RTC_TRACE(
"unsubscribeInterfaces()")
1151 nv = connector_profile.properties
1155 if self.
findProvider(nv, consumer, ior)
and len(ior) > 0:
1156 self.
_rtcout.RTC_DEBUG(
"Correspoinding consumer found.")
1162 self.
_rtcout.RTC_DEBUG(
"Correspoinding consumer found.")
1206 newdesc +=
".required." + cons.descriptor()
1213 provider = str(any.from_any(nv[cons_index].value, keep_structs=
True))
1215 self.
_rtcout.RTC_WARN(
"Cannot extract Provider interface descriptor")
1223 ior_ = str(any.from_any(nv[prov_index].value, keep_structs=
True))
1225 self.
_rtcout.RTC_WARN(
"Cannot extract Provider IOR string")
1228 if isinstance(iorstr, list):
1231 self.
_rtcout.RTC_ERROR(
"interface matched with new descriptor: %s", newdesc)
1275 olddesc =
"port." + cons.descriptor()
1282 ior_ = str(any.from_any(nv[index].value, keep_structs=
True))
1284 self.
_rtcout.RTC_WARN(
"Cannot extract Provider IOR string")
1287 if isinstance(iorstr, list):
1290 self.
_rtcout.RTC_ERROR(
"interface matched with old descriptor: %s", olddesc)
1332 if "IOR:" != ior[:4]:
1336 if not cons.setObject(ior):
1337 self.
_rtcout.RTC_ERROR(
"Cannot narrow reference")
1340 self.
_rtcout.RTC_TRACE(
"setObject() done")
1371 if ior == cons.getIor():
1372 cons.releaseObject()
1373 self.
_rtcout.RTC_DEBUG(
"Consumer %s released.", cons.descriptor())
1376 self.
_rtcout.RTC_WARN(
"IORs between Consumer and Connector are different.")
1401 _mgr = OpenRTM_aist.Manager.instance()
1404 obj = _mgr.getPOA().id_to_reference(self.
_oid)
1405 self.
_ior = _mgr.getORB().object_to_string(obj)
1431 OpenRTM_aist.Manager.instance().getPOA().activate_object_with_id(self.
_oid, self.
_servant)
1433 print(OpenRTM_aist.Logger.print_exception())
1439 OpenRTM_aist.Manager.instance().getPOA().deactivate_object(self.
_oid)
1441 print(OpenRTM_aist.Logger.print_exception())
1457 def __init__(self, type_name, instance_name, consumer, owner):
1487 orb = OpenRTM_aist.Manager.instance().getORB()
1488 obj = orb.string_to_object(ior)
1489 if CORBA.is_nil(obj):
1524 for i
in range(self.
_len):
1526 if self.
_cons[i].descriptor() == name_:
1528 obj = any.from_any(nv.value, keep_structs=
True)
1531 print(OpenRTM_aist.Logger.print_exception())
1547 for i
in range(self.
_len):
1549 if self.
_cons[i].descriptor() == name_:
1554 if "port."+self.
_cons[i].descriptor() == name_: