19 from omniORB
import any
22 sys.path.insert(1,
"../")
26 from InPortCorbaCdrProvider
import *
40 return OpenRTM_aist.BufferStatus.BUFFER_OK
46 value.append(self.
_data)
47 return OpenRTM_aist.BufferStatus.BUFFER_OK
59 self._buffer.write(data)
60 return OpenRTM_aist.BufferStatus.BUFFER_OK
67 OpenRTM_aist.CdrRingBufferInit()
68 self.
_prov = OpenRTM_aist.InPortProviderFactory.instance().createObject(
"corba_cdr")
70 self.
_orb = OpenRTM_aist.Manager.instance().getORB()
79 self._prov.setBuffer(self.
_buffer)
84 self._prov._connector = self.
_con 85 self._prov.setBuffer(self.
_buffer)
86 data = RTC.TimedLong(RTC.Time(0,0),123)
87 cdr = cdrMarshal(any.to_any(data).typecode(), data, 1)
88 self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
89 self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
90 self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
91 self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
92 self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
93 self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
94 self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
95 self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
96 self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
97 self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
98 self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
100 self.assertEqual(self._buffer.read(val), OpenRTM_aist.BufferStatus.BUFFER_OK)
101 get_data = cdrUnmarshal(any.to_any(data).typecode(), val[0], 1)
102 self.assertEqual(get_data.data, 123)
108 if __name__ ==
'__main__':
The Properties class represents a persistent set of properties.
def InPortCorbaCdrProviderInit()
def __init__(self, buffer)