19 sys.path.insert(1,
"../")
34 def echo(self, value=None):
42 def echo(self, value=None):
43 print "OnRWConvert Called" 58 return OpenRTM_aist.BufferStatus.BUFFER_OK
69 value.append(self.
_data)
70 return OpenRTM_aist.BufferStatus.BUFFER_OK
82 self._buffer.write(data)
83 return OpenRTM_aist.BufferStatus.BUFFER_OK
86 self._buffer.read(data)
87 return OpenRTM_aist.DataPortStatus.PORT_OK
90 ret = self._buffer.readable()
96 OpenRTM_aist.Manager.instance()
103 OpenRTM_aist.Manager.instance().shutdownManager()
107 self.assertEqual(self._ipn.name(),
"in")
110 self.assertEqual(self._ipn.isEmpty(),
True)
111 self.assertEqual(self._ipn.isNew(),
False)
112 self._connector.write(RTC.TimedLong(RTC.Time(0,0), 123))
113 self.assertEqual(self._ipn.isEmpty(),
False)
114 self.assertEqual(self._ipn.isNew(),
True)
115 read_data = self._ipn.read()
116 self.assertEqual(self._ipn.isEmpty(),
True)
117 self.assertEqual(self._ipn.isNew(),
False)
118 self.assertEqual(read_data.data, 123)
123 self._connector.write(RTC.TimedLong(RTC.Time(0,0), 456))
124 self._ipn.setOnRead(
OnRWTest().echo)
126 read_data = self._ipn.read()
127 self.assertEqual(read_data.data, 456)
132 if __name__ ==
'__main__':
def __init__(self, buffer)
def echo(self, value=None)
def echo(self, value=None)