20 from omniORB
import any
21 from omniORB
import CORBA
74 OpenRTM_aist.CorbaConsumer.__init__(self)
75 self.
_rtcout = OpenRTM_aist.Manager.instance().getLogbuf(
"InPortCorbaCdrConsumer")
92 def __del__(self, CorbaConsumer=OpenRTM_aist.CorbaConsumer):
93 self.
_rtcout.RTC_PARANOID(
"~InPortCorbaCdrConsumer()")
94 CorbaConsumer.__del__(self)
113 self.
_rtcout.RTC_TRACE(
"init()")
151 self.
_rtcout.RTC_PARANOID(
"put()")
156 inportcdr = ref_._narrow(OpenRTM.InPortCdr)
160 self.
_rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
216 self.
_rtcout.RTC_TRACE(
"subscribeInterface()")
248 self.
_rtcout.RTC_TRACE(
"unsubscribeInterface()")
272 self.
_rtcout.RTC_TRACE(
"subscribeFromIor()")
275 "dataport.corba_cdr.inport_ior")
277 self.
_rtcout.RTC_ERROR(
"inport_ior not found")
282 ior = any.from_any(properties[index].value, keep_structs=
True)
284 self.
_rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
287 self.
_rtcout.RTC_ERROR(
"inport_ior has no string")
290 orb = OpenRTM_aist.Manager.instance().getORB()
291 obj = orb.string_to_object(ior)
293 if CORBA.is_nil(obj):
294 self.
_rtcout.RTC_ERROR(
"invalid IOR string has been passed")
298 self.
_rtcout.RTC_WARN(
"Setting object to consumer failed.")
318 self.
_rtcout.RTC_TRACE(
"subscribeFromRef()")
320 "dataport.corba_cdr.inport_ref")
322 self.
_rtcout.RTC_ERROR(
"inport_ref not found")
327 obj = any.from_any(properties[index].value, keep_structs=
True)
329 self.
_rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
332 self.
_rtcout.RTC_ERROR(
"prop[inport_ref] is not objref")
335 if CORBA.is_nil(obj):
336 self.
_rtcout.RTC_ERROR(
"prop[inport_ref] is not objref")
340 self.
_rtcout.RTC_ERROR(
"Setting object to consumer failed.")
360 self.
_rtcout.RTC_TRACE(
"unsubscribeFromIor()")
362 "dataport.corba_cdr.inport_ior")
364 self.
_rtcout.RTC_ERROR(
"inport_ior not found")
369 ior = any.from_any(properties[index].value, keep_structs=
True)
371 self.
_rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
374 self.
_rtcout.RTC_ERROR(
"prop[inport_ior] is not string")
377 orb = OpenRTM_aist.Manager.instance().getORB()
378 var = orb.string_to_object(ior)
379 if not self.
_ptr()._is_equivalent(var):
380 self.
_rtcout.RTC_ERROR(
"connector property inconsistency")
401 self.
_rtcout.RTC_TRACE(
"unsubscribeFromRef()")
403 "dataport.corba_cdr.inport_ref")
410 obj = any.from_any(properties[index].value, keep_structs=
True)
412 self.
_rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
417 if not self.
_ptr()._is_equivalent(obj):
432 if ret == OpenRTM.PORT_OK:
435 elif ret == OpenRTM.PORT_ERROR:
438 elif ret == OpenRTM.BUFFER_FULL:
441 elif ret == OpenRTM.BUFFER_TIMEOUT:
444 elif ret == OpenRTM.UNKNOWN_ERROR:
452 factory = OpenRTM_aist.InPortConsumerFactory.instance()
453 factory.addFactory(
"corba_cdr",
def convertReturnCode(self, ret)
Return codes conversion. ReturnCode convertReturnCode(OpenRTM::PortStatus ret)
def getObject(self)
Get CORBA Object.
def subscribeFromRef(self, properties)
Getting object reference from Any directly.
def init(self, prop)
Initializing configuration.
def publishInterfaceProfile(self, properties)
Publish InterfaceProfile information.
def __init__(self)
Constructor.
def subscribeInterface(self, properties)
Subscribe to the data sending notification.
def unsubscribeFromRef(self, properties)
Unsubscribing (Object reference version)
def InPortCorbaCdrConsumerInit()
def setObject(self, obj)
Set CORBA Object.
def put(self, data)
Send data to the destination port.
int PORT_OK
DataPortStatus return codes.
InPortCorbaCdrConsumer class.
def __del__(self, CorbaConsumer=OpenRTM_aist.CorbaConsumer)
Destructor.
def unsubscribeFromIor(self, properties)
Unsubscribing (IOR version)
def subscribeFromIor(self, properties)
Getting object reference from IOR string.
def _ptr(self)
Get Object reference narrowed as ObjectType.
def unsubscribeInterface(self, properties)
Unsubscribe the data send notification.