InPortCorbaCdrConsumer.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: euc-jp -*-
3 
4 
18 
19 import sys
20 from omniORB import any
21 from omniORB import CORBA
22 import OpenRTM_aist
23 import OpenRTM
24 
25 
53  """
54  """
55 
56 
def __init__(self):
    """Initialise the CorbaConsumer base, the logger and the property slot."""
    OpenRTM_aist.CorbaConsumer.__init__(self)
    manager = OpenRTM_aist.Manager.instance()
    self._rtcout = manager.getLogbuf("InPortCorbaCdrConsumer")
    # No connector properties are held until init() stores them.
    self._properties = None
78 
79 
def __del__(self, CorbaConsumer=OpenRTM_aist.CorbaConsumer):
    """Log the teardown and invoke the CorbaConsumer destructor.

    The base class is bound as a default argument, so it is resolved when
    the method is defined rather than when the destructor runs.
    """
    self._rtcout.RTC_PARANOID("~InPortCorbaCdrConsumer()")
    CorbaConsumer.__del__(self)
96 
97 
def init(self, prop):
    """Remember the connector properties.

    Args:
        prop: properties object, stored unchanged in ``self._properties``.
    """
    self._rtcout.RTC_TRACE("init()")
    self._properties = prop
116 
117 
def put(self, data):
    """Send data through the held InPort reference and convert the status.

    Args:
        data: payload passed unchanged to the ``put`` operation of the
            reference narrowed to ``OpenRTM.InPortCdr``.

    Returns:
        The result of ``convertReturnCode`` applied to the remote status, or
        ``self.CONNECTION_LOST`` when no object reference is held or when
        the call raises.
    """
    self._rtcout.RTC_PARANOID("put()")

    try:
        ref_ = self.getObject()
        if not ref_:
            return self.CONNECTION_LOST
        inportcdr = ref_._narrow(OpenRTM.InPortCdr)
        return self.convertReturnCode(inportcdr.put(data))
    except Exception:
        # Any ordinary failure is reported as a lost connection.
        # KeyboardInterrupt and SystemExit are deliberately not caught here.
        self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
        return self.CONNECTION_LOST
164 
165 
def publishInterfaceProfile(self, properties):
    """Do nothing; this implementation leaves ``properties`` untouched."""
    return None
191 
192 
def subscribeInterface(self, properties):
    """Obtain the InPort reference described by ``properties``.

    The IOR-string entry is tried first; the object-reference entry is only
    consulted when the IOR attempt fails.

    Returns:
        True when either attempt succeeds, False otherwise.
    """
    self._rtcout.RTC_TRACE("subscribeInterface()")

    subscribed = (self.subscribeFromIor(properties)
                  or self.subscribeFromRef(properties))
    if subscribed:
        return True
    return False
228 
229 
def unsubscribeInterface(self, properties):
    """Release the reference described by ``properties``.

    The IOR-string entry is tried first; the object-reference entry is only
    consulted when the IOR attempt reports failure. Nothing is returned.
    """
    self._rtcout.RTC_TRACE("unsubscribeInterface()")

    if not self.unsubscribeFromIor(properties):
        self.unsubscribeFromRef(properties)
256 
257 
def subscribeFromIor(self, properties):
    """Resolve the InPort reference from an IOR string in ``properties``.

    Looks up the "dataport.corba_cdr.inport_ior" entry, converts the string
    to an object reference through the manager's ORB and stores it with
    ``setObject``.

    Args:
        properties: name/value list searched with ``NVUtil.find_index``.

    Returns:
        True when the reference was stored. False when the entry is missing
        or empty, when the IOR cannot be turned into a non-nil reference,
        or when ``setObject`` rejects it.
    """
    self._rtcout.RTC_TRACE("subscribeFromIor()")

    index = OpenRTM_aist.NVUtil.find_index(properties,
                                           "dataport.corba_cdr.inport_ior")
    if index < 0:
        self._rtcout.RTC_ERROR("inport_ior not found")
        return False

    ior = ""
    try:
        ior = any.from_any(properties[index].value, keep_structs=True)
    except Exception:
        # A failed extraction leaves ior empty; that is reported just below.
        self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())

    if not ior:
        self._rtcout.RTC_ERROR("inport_ior has no string")
        return False

    orb = OpenRTM_aist.Manager.instance().getORB()
    try:
        obj = orb.string_to_object(ior)
    except Exception:
        # The ORB may raise (e.g. CORBA.BAD_PARAM) on a malformed string
        # instead of returning a nil reference; treat both the same way.
        self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
        self._rtcout.RTC_ERROR("invalid IOR string has been passed")
        return False

    if CORBA.is_nil(obj):
        self._rtcout.RTC_ERROR("invalid IOR string has been passed")
        return False

    if not self.setObject(obj):
        self._rtcout.RTC_WARN("Setting object to consumer failed.")
        return False

    return True
302 
303 
def subscribeFromRef(self, properties):
    """Take the InPort reference directly from an Any in ``properties``.

    Looks up the "dataport.corba_cdr.inport_ref" entry, extracts the object
    reference from its value and stores it with ``setObject``.

    Args:
        properties: name/value list searched with ``NVUtil.find_index``.

    Returns:
        True when the reference was stored. False when the entry is
        missing, when the value yields no usable (non-nil) reference, or
        when ``setObject`` rejects it.
    """
    self._rtcout.RTC_TRACE("subscribeFromRef()")

    index = OpenRTM_aist.NVUtil.find_index(properties,
                                           "dataport.corba_cdr.inport_ref")
    if index < 0:
        self._rtcout.RTC_ERROR("inport_ref not found")
        return False

    obj = None
    try:
        obj = any.from_any(properties[index].value, keep_structs=True)
    except Exception:
        # A failed extraction leaves obj as None; that is reported below.
        self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())

    # A falsy value and a nil reference are reported identically.
    if not obj or CORBA.is_nil(obj):
        self._rtcout.RTC_ERROR("prop[inport_ref] is not objref")
        return False

    if not self.setObject(obj):
        self._rtcout.RTC_ERROR("Setting object to consumer failed.")
        return False

    return True
344 
345 
def unsubscribeFromIor(self, properties):
    """Release the held reference if it matches the IOR in ``properties``.

    Looks up the "dataport.corba_cdr.inport_ior" entry, converts the string
    to an object reference and compares it with ``self._ptr()`` using
    ``_is_equivalent``. The held object is released only on a match.

    Args:
        properties: name/value list searched with ``NVUtil.find_index``.

    Returns:
        True when the object was released. False when the entry is missing
        or empty, when the IOR cannot be converted, or when it does not
        designate the held object.
    """
    self._rtcout.RTC_TRACE("unsubscribeFromIor()")

    index = OpenRTM_aist.NVUtil.find_index(properties,
                                           "dataport.corba_cdr.inport_ior")
    if index < 0:
        self._rtcout.RTC_ERROR("inport_ior not found")
        return False

    ior = ""
    try:
        ior = any.from_any(properties[index].value, keep_structs=True)
    except Exception:
        # A failed extraction leaves ior empty; that is reported just below.
        self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())

    if not ior:
        self._rtcout.RTC_ERROR("prop[inport_ior] is not string")
        return False

    orb = OpenRTM_aist.Manager.instance().getORB()
    try:
        var = orb.string_to_object(ior)
    except Exception:
        # The ORB may raise (e.g. CORBA.BAD_PARAM) on a malformed string;
        # report it as a failed unsubscribe instead of propagating.
        self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
        return False

    if not self._ptr()._is_equivalent(var):
        self._rtcout.RTC_ERROR("connector property inconsistency")
        return False

    self.releaseObject()
    return True
385 
386 
def unsubscribeFromRef(self, properties):
    """Release the held reference if it matches the one in ``properties``.

    Looks up the "dataport.corba_cdr.inport_ref" entry, extracts the object
    reference from its value and compares it with ``self._ptr()`` using
    ``_is_equivalent``. The held object is released only on a match.

    Args:
        properties: name/value list searched with ``NVUtil.find_index``.

    Returns:
        True when the object was released. False when the entry is
        missing, when no reference can be extracted, or when it does not
        designate the held object.
    """
    self._rtcout.RTC_TRACE("unsubscribeFromRef()")

    index = OpenRTM_aist.NVUtil.find_index(properties,
                                           "dataport.corba_cdr.inport_ref")
    if index < 0:
        return False

    obj = None
    try:
        obj = any.from_any(properties[index].value, keep_structs=True)
    except Exception:
        # A failed extraction leaves obj as None, handled by the next check.
        self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())

    if not obj:
        return False

    if not self._ptr()._is_equivalent(obj):
        return False

    self.releaseObject()
    return True
422 
423 
def convertReturnCode(self, ret):
    """Map an ``OpenRTM`` port status onto this consumer's return codes.

    Args:
        ret: status value compared against the ``OpenRTM`` constants.

    Returns:
        The matching code on ``self``; ``self.UNKNOWN_ERROR`` when ``ret``
        matches none of the known statuses.
    """
    # Checked in order; the first equal remote status wins.
    status_table = (
        (OpenRTM.PORT_OK, self.PORT_OK),
        (OpenRTM.PORT_ERROR, self.PORT_ERROR),
        (OpenRTM.BUFFER_FULL, self.SEND_FULL),
        (OpenRTM.BUFFER_TIMEOUT, self.SEND_TIMEOUT),
        (OpenRTM.UNKNOWN_ERROR, self.UNKNOWN_ERROR),
    )
    for remote_status, local_status in status_table:
        if ret == remote_status:
            return local_status
    return self.UNKNOWN_ERROR
449 
450 
452  factory = OpenRTM_aist.InPortConsumerFactory.instance()
453  factory.addFactory("corba_cdr",
455  OpenRTM_aist.Delete)
def convertReturnCode(self, ret)
Return codes conversion: ReturnCode convertReturnCode(OpenRTM::PortStatus ret)
def subscribeFromRef(self, properties)
Getting object reference from Any directly.
def publishInterfaceProfile(self, properties)
Publish InterfaceProfile information.
def subscribeInterface(self, properties)
Subscribe to the data sending notification.
def unsubscribeFromRef(self, properties)
Unsubscribing (Object reference version)
def setObject(self, obj)
Set CORBA Object.
def find_index(nv, name)
Definition: NVUtil.py:229
def put(self, data)
Send data to the destination port.
int PORT_OK
DataPortStatus return codes.
def __del__(self, CorbaConsumer=OpenRTM_aist.CorbaConsumer)
Destructor.
def unsubscribeFromIor(self, properties)
Unsubscribing (IOR version)
def subscribeFromIor(self, properties)
Getting object reference from IOR string.
def _ptr(self)
Get Object reference narrowed as ObjectType.
def unsubscribeInterface(self, properties)
Unsubscribe the data send notification.


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Mon Feb 28 2022 23:01:06