InPortCorbaCdrConsumer.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: euc-jp -*-
3 
4 
18 
19 import sys
20 from omniORB import any
21 from omniORB import CORBA
22 import OpenRTM_aist
23 import OpenRTM
24 
25 
53  """
54  """
55 
56 
def __init__(self):
    """
    Constructor.

    Initializes the CorbaConsumer base class, fetches the log buffer
    named "InPortCorbaCdrConsumer" from the Manager singleton, and
    starts with no properties stored (they are supplied later through
    init()).
    """
    OpenRTM_aist.CorbaConsumer.__init__(self)
    manager = OpenRTM_aist.Manager.instance()
    self._rtcout = manager.getLogbuf("InPortCorbaCdrConsumer")
    self._properties = None
78 
79 
def __del__(self, CorbaConsumer=OpenRTM_aist.CorbaConsumer):
    """
    Destructor.

    Logs the destruction at PARANOID level and then delegates to the
    CorbaConsumer destructor.

    NOTE(review): the base class is bound as a default argument at
    definition time, presumably so it is still reachable when this runs
    during interpreter shutdown -- confirm before changing the signature.
    """
    self._rtcout.RTC_PARANOID("~InPortCorbaCdrConsumer()")
    CorbaConsumer.__del__(self)
96 
97 
def init(self, prop):
    """
    Initializing configuration.

    Stores the given properties object on the consumer; nothing is
    parsed or validated here.

    @param prop  configuration properties to remember
    """
    self._rtcout.RTC_TRACE("init()")
    self._properties = prop
116 
117 
def put(self, data):
    """
    Send data to the destination port.

    Obtains the consumer's current object reference, narrows it to
    OpenRTM.InPortCdr, invokes the remote put() and converts the
    returned port status with convertReturnCode().

    @param data  the data handed to the remote InPortCdr.put()
    @return the converted status code on success; CONNECTION_LOST when
            no object reference is set, when the reference cannot be
            narrowed to InPortCdr, or when the call raises an exception
    """
    self._rtcout.RTC_PARANOID("put()")

    try:
        ref_ = self.getObject()
        if not ref_:
            return self.CONNECTION_LOST

        inportcdr = ref_._narrow(OpenRTM.InPortCdr)
        if inportcdr is None:
            # The reference does not denote an InPortCdr object. Report
            # it explicitly instead of relying on an AttributeError from
            # calling put() on None.
            self._rtcout.RTC_ERROR("object reference is not an InPortCdr")
            return self.CONNECTION_LOST

        return self.convertReturnCode(inportcdr.put(data))
    except Exception:
        # Any failure of the remote invocation is treated as a lost
        # connection. KeyboardInterrupt/SystemExit are deliberately
        # not swallowed here.
        self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
        return self.CONNECTION_LOST
164 
165 
def publishInterfaceProfile(self, properties):
    """
    Publish InterfaceProfile information.

    This consumer publishes nothing: the method ignores its argument
    and returns None.

    @param properties  unused
    """
    return None
191 
192 
def subscribeInterface(self, properties):
    """
    Subscribe to the data sending notification.

    Tries to obtain the InPort's reference from the connector
    properties, first from an IOR string (subscribeFromIor) and, if that
    fails, directly from an object reference (subscribeFromRef).

    @param properties  connector properties passed on to both strategies
    @return True as soon as one strategy succeeds, False if both fail
    """
    self._rtcout.RTC_TRACE("subscribeInterface()")

    # Order matters: the IOR string is preferred over the raw reference.
    for subscribe in (self.subscribeFromIor, self.subscribeFromRef):
        if subscribe(properties):
            return True

    return False
228 
229 
def unsubscribeInterface(self, properties):
    """
    Unsubscribe the data send notification.

    Attempts to release the subscription identified by the IOR string
    in the properties; only if that does not succeed is the object
    reference variant tried. The outcome is not reported to the caller.

    @param properties  connector properties passed on to both strategies
    """
    self._rtcout.RTC_TRACE("unsubscribeInterface()")

    if not self.unsubscribeFromIor(properties):
        self.unsubscribeFromRef(properties)
256 
257 
def subscribeFromIor(self, properties):
    """
    Getting object reference from IOR string.

    Looks up the "dataport.corba_cdr.inport_ior" entry in the connector
    properties, extracts its value from the CORBA Any, turns the IOR
    string into an object reference through the Manager's ORB and sets
    it on this consumer with setObject().

    @param properties  connector properties; indexed by the position
                       returned from NVUtil.find_index, each entry
                       exposing a `value` Any
    @return True if the reference was obtained and set; False if the
            entry is missing or empty, the IOR cannot be converted,
            the resulting reference is nil, or setObject() fails
    """
    self._rtcout.RTC_TRACE("subscribeFromIor()")

    index = OpenRTM_aist.NVUtil.find_index(properties,
                                           "dataport.corba_cdr.inport_ior")
    if index < 0:
        self._rtcout.RTC_ERROR("inport_ior not found")
        return False

    ior = ""
    try:
        ior = any.from_any(properties[index].value, keep_structs=True)
    except Exception:
        self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())

    if not ior:
        self._rtcout.RTC_ERROR("inport_ior has no string")
        return False

    orb = OpenRTM_aist.Manager.instance().getORB()
    try:
        obj = orb.string_to_object(ior)
    except Exception:
        # A malformed IOR makes the ORB raise rather than return nil.
        # Report failure so subscribeInterface() can still fall back
        # to the object-reference variant.
        self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
        self._rtcout.RTC_ERROR("invalid IOR string has been passed")
        return False

    if CORBA.is_nil(obj):
        self._rtcout.RTC_ERROR("invalid IOR string has been passed")
        return False

    if not self.setObject(obj):
        self._rtcout.RTC_WARN("Setting object to consumer failed.")
        return False

    return True
302 
303 
def subscribeFromRef(self, properties):
    """
    Getting object reference from Any directly.

    Looks up the "dataport.corba_cdr.inport_ref" entry in the connector
    properties, extracts the object reference from its CORBA Any and
    sets it on this consumer with setObject().

    @param properties  connector properties; indexed by the position
                       returned from NVUtil.find_index, each entry
                       exposing a `value` Any
    @return True if the reference was obtained and set, False otherwise
    """
    self._rtcout.RTC_TRACE("subscribeFromRef()")

    key = "dataport.corba_cdr.inport_ref"
    index = OpenRTM_aist.NVUtil.find_index(properties, key)
    if index < 0:
        self._rtcout.RTC_ERROR("inport_ref not found")
        return False

    reference = None
    try:
        reference = any.from_any(properties[index].value, keep_structs=True)
    except:
        self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())

    # A falsy value short-circuits before CORBA.is_nil() is consulted.
    if not reference or CORBA.is_nil(reference):
        self._rtcout.RTC_ERROR("prop[inport_ref] is not objref")
        return False

    if self.setObject(reference):
        return True

    self._rtcout.RTC_ERROR("Setting object to consumer failed.")
    return False
344 
345 
def unsubscribeFromIor(self, properties):
    """
    Unsubscribing (IOR version).

    Looks up the "dataport.corba_cdr.inport_ior" entry in the connector
    properties, converts the IOR string into an object reference and,
    if it is equivalent to the reference currently held by this
    consumer, releases that reference.

    @param properties  connector properties; indexed by the position
                       returned from NVUtil.find_index, each entry
                       exposing a `value` Any
    @return True if the held reference matched and was released; False
            if the entry is missing or empty, the IOR cannot be
            converted, or it denotes a different object
    """
    self._rtcout.RTC_TRACE("unsubscribeFromIor()")

    index = OpenRTM_aist.NVUtil.find_index(properties,
                                           "dataport.corba_cdr.inport_ior")
    if index < 0:
        self._rtcout.RTC_ERROR("inport_ior not found")
        return False

    ior = ""
    try:
        ior = any.from_any(properties[index].value, keep_structs=True)
    except Exception:
        self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())

    if not ior:
        self._rtcout.RTC_ERROR("prop[inport_ior] is not string")
        return False

    orb = OpenRTM_aist.Manager.instance().getORB()
    try:
        var = orb.string_to_object(ior)
    except Exception:
        # A malformed IOR makes the ORB raise. Report failure so
        # unsubscribeInterface() can still try the object-reference
        # variant.
        self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
        return False

    if not self._ptr()._is_equivalent(var):
        self._rtcout.RTC_ERROR("connector property inconsistency")
        return False

    self.releaseObject()
    return True
385 
386 
def unsubscribeFromRef(self, properties):
    """
    Unsubscribing (Object reference version).

    Looks up the "dataport.corba_cdr.inport_ref" entry in the connector
    properties and, if the object reference it carries is equivalent to
    the one currently held by this consumer, releases that reference.

    @param properties  connector properties; indexed by the position
                       returned from NVUtil.find_index, each entry
                       exposing a `value` Any
    @return True if the held reference matched and was released,
            False otherwise
    """
    self._rtcout.RTC_TRACE("unsubscribeFromRef()")

    key = "dataport.corba_cdr.inport_ref"
    index = OpenRTM_aist.NVUtil.find_index(properties, key)
    if index < 0:
        return False

    reference = None
    try:
        reference = any.from_any(properties[index].value, keep_structs=True)
    except:
        self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())

    # A falsy value short-circuits before the held reference is queried.
    if not reference or not self._ptr()._is_equivalent(reference):
        return False

    self.releaseObject()
    return True
422 
423 
def convertReturnCode(self, ret):
    """
    Return codes conversion.

    Maps a port status from the OpenRTM IDL module onto the
    corresponding status constant of this class.

    @param ret  status value returned by the remote InPortCdr.put()
    @return PORT_OK, PORT_ERROR, SEND_FULL or SEND_TIMEOUT for the
            matching OpenRTM status; UNKNOWN_ERROR for
            OpenRTM.UNKNOWN_ERROR and for any unrecognized value
    """
    if ret == OpenRTM.PORT_OK:
        return self.PORT_OK

    if ret == OpenRTM.PORT_ERROR:
        return self.PORT_ERROR

    if ret == OpenRTM.BUFFER_FULL:
        return self.SEND_FULL

    if ret == OpenRTM.BUFFER_TIMEOUT:
        return self.SEND_TIMEOUT

    # Mapped explicitly for symmetry with the other statuses; the
    # fallback below yields the same result for anything unrecognized.
    if ret == OpenRTM.UNKNOWN_ERROR:
        return self.UNKNOWN_ERROR

    return self.UNKNOWN_ERROR
449 
450 
452  factory = OpenRTM_aist.InPortConsumerFactory.instance()
453  factory.addFactory("corba_cdr",
455  OpenRTM_aist.Delete)
OpenRTM_aist.CorbaConsumer.CorbaConsumer._ptr
def _ptr(self)
Get Object reference narrowed as ObjectType.
Definition: CorbaConsumer.py:314
OpenRTM_aist.InPortCorbaCdrConsumer.InPortCorbaCdrConsumer.publishInterfaceProfile
def publishInterfaceProfile(self, properties)
Publish InterfaceProfile information.
Definition: InPortCorbaCdrConsumer.py:189
OpenRTM_aist.InPortCorbaCdrConsumer.InPortCorbaCdrConsumer.subscribeFromRef
def subscribeFromRef(self, properties)
Getting object reference from Any directly.
Definition: InPortCorbaCdrConsumer.py:317
OpenRTM_aist.DataPortStatus.DataPortStatus.UNKNOWN_ERROR
int UNKNOWN_ERROR
Definition: DataPortStatus.py:153
OpenRTM_aist.DataPortStatus.DataPortStatus.CONNECTION_LOST
int CONNECTION_LOST
Definition: DataPortStatus.py:152
OpenRTM_aist.InPortCorbaCdrConsumer.InPortCorbaCdrConsumer.unsubscribeInterface
def unsubscribeInterface(self, properties)
Unsubscribe the data send notification.
Definition: InPortCorbaCdrConsumer.py:247
OpenRTM_aist.CorbaConsumer.CorbaConsumerBase.setObject
def setObject(self, obj)
Set CORBA Object.
Definition: CorbaConsumer.py:113
OpenRTM_aist.DataPortStatus.DataPortStatus.PORT_OK
int PORT_OK
DataPortStatus return codes.
Definition: DataPortStatus.py:140
OpenRTM_aist.InPortConsumer.InPortConsumer
InPortConsumer class.
Definition: InPortConsumer.py:42
OpenRTM_aist.InPortCorbaCdrConsumer.InPortCorbaCdrConsumer.init
def init(self, prop)
Initializing configuration.
Definition: InPortCorbaCdrConsumer.py:112
OpenRTM_aist.InPortCorbaCdrConsumer.InPortCorbaCdrConsumer._rtcout
_rtcout
Definition: InPortCorbaCdrConsumer.py:75
OpenRTM_aist.InPortCorbaCdrConsumer.InPortCorbaCdrConsumer.__init__
def __init__(self)
Constructor.
Definition: InPortCorbaCdrConsumer.py:73
OpenRTM_aist.InPortCorbaCdrConsumer.InPortCorbaCdrConsumer.put
def put(self, data)
Send data to the destination port.
Definition: InPortCorbaCdrConsumer.py:150
OpenRTM_aist.InPortCorbaCdrConsumer.InPortCorbaCdrConsumer
InPortCorbaCdrConsumer class.
Definition: InPortCorbaCdrConsumer.py:52
OpenRTM_aist.NVUtil.find_index
def find_index(nv, name)
Definition: NVUtil.py:229
OpenRTM_aist.InPortCorbaCdrConsumer.InPortCorbaCdrConsumer.subscribeInterface
def subscribeInterface(self, properties)
Subscribe to the data sending notification.
Definition: InPortCorbaCdrConsumer.py:215
OpenRTM_aist.CorbaConsumer.CorbaConsumer
Definition: CorbaConsumer.py:187
OpenRTM_aist.InPortCorbaCdrConsumer.InPortCorbaCdrConsumerInit
def InPortCorbaCdrConsumerInit()
Definition: InPortCorbaCdrConsumer.py:451
OpenRTM_aist.InPortCorbaCdrConsumer.InPortCorbaCdrConsumer.unsubscribeFromRef
def unsubscribeFromRef(self, properties)
Unsubscribing (Object reference version)
Definition: InPortCorbaCdrConsumer.py:400
OpenRTM_aist.DataPortStatus.DataPortStatus.PORT_ERROR
int PORT_ERROR
Definition: DataPortStatus.py:141
OpenRTM_aist.InPortCorbaCdrConsumer.InPortCorbaCdrConsumer.unsubscribeFromIor
def unsubscribeFromIor(self, properties)
Unsubscribing (IOR version)
Definition: InPortCorbaCdrConsumer.py:359
OpenRTM_aist.InPortCorbaCdrConsumer.InPortCorbaCdrConsumer._properties
_properties
Definition: InPortCorbaCdrConsumer.py:76
OpenRTM_aist.InPortCorbaCdrConsumer.InPortCorbaCdrConsumer.__del__
def __del__(self, CorbaConsumer=OpenRTM_aist.CorbaConsumer)
Destructor.
Definition: InPortCorbaCdrConsumer.py:92
OpenRTM_aist.CorbaConsumer.CorbaConsumerBase.releaseObject
def releaseObject(self)
Definition: CorbaConsumer.py:159
OpenRTM_aist.InPortCorbaCdrConsumer.InPortCorbaCdrConsumer.convertReturnCode
def convertReturnCode(self, ret)
Return codes conversion. ReturnCode convertReturnCode(OpenRTM::PortStatus ret)
Definition: InPortCorbaCdrConsumer.py:431
OpenRTM_aist.DataPortStatus.DataPortStatus.SEND_FULL
int SEND_FULL
Definition: DataPortStatus.py:146
OpenRTM_aist.CorbaConsumer.CorbaConsumerBase.getObject
def getObject(self)
Get CORBA Object.
Definition: CorbaConsumer.py:142
OpenRTM_aist.InPortCorbaCdrConsumer.InPortCorbaCdrConsumer.subscribeFromIor
def subscribeFromIor(self, properties)
Getting object reference from IOR string.
Definition: InPortCorbaCdrConsumer.py:271
OpenRTM_aist.DataPortStatus.DataPortStatus.SEND_TIMEOUT
int SEND_TIMEOUT
Definition: DataPortStatus.py:147


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Mon Apr 21 2025 02:45:06