19 from omniORB import any
22 sys.path.insert(1,"../")
26 from InPortCorbaCdrProvider import *
40 return OpenRTM_aist.BufferStatus.BUFFER_OK
46 value.append(self._data)
47 return OpenRTM_aist.BufferStatus.BUFFER_OK
60 return OpenRTM_aist.BufferStatus.BUFFER_OK
67 OpenRTM_aist.CdrRingBufferInit()
68 self._prov = OpenRTM_aist.InPortProviderFactory.instance().createObject("corba_cdr")
70 self._orb = OpenRTM_aist.Manager.instance().getORB()
86 data = RTC.TimedLong(RTC.Time(0,0),123)
87 cdr = cdrMarshal(any.to_any(data).typecode(), data, 1)
88 self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
89 self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
90 self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
91 self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
92 self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
93 self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
94 self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
95 self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
96 self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
97 self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
98 self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
100 self.assertEqual(self._buffer.read(val), OpenRTM_aist.BufferStatus.BUFFER_OK)
101 get_data = cdrUnmarshal(any.to_any(data).typecode(), val[0], 1)
102 self.assertEqual(get_data.data, 123)
108 if __name__ == '__main__':
The Properties class represents a persistent set of properties.
def InPortCorbaCdrProviderInit()
def __init__(self, buffer)