20 sys.path.insert(1,
"../")
21 sys.path.insert(1,
"../RTM_IDL")
25 from RingBuffer
import *
34 OpenRTM_aist.Manager.instance().shutdownManager()
39 prop.setProperty(
"length",
"5")
40 prop.setProperty(
"write.full_policy",
"overwrite")
43 prop.setProperty(
"write.timeout",
"5.0")
44 prop.setProperty(
"read.full_policy",
"overwrite")
47 prop.setProperty(
"read.timeout",
"5.0")
49 self.assertEqual(self.
_rb.length(),5)
53 self.assertEqual(self.
_rb.length(), 8)
54 self.assertEqual(self.
_rb.length(7), OpenRTM_aist.BufferStatus.BUFFER_OK)
55 self.assertEqual(self.
_rb.length(), 7)
56 self.assertEqual(self.
_rb.length(0), OpenRTM_aist.BufferStatus.NOT_SUPPORTED)
57 self.assertEqual(self.
_rb.length(-1), OpenRTM_aist.BufferStatus.NOT_SUPPORTED)
58 self.assertEqual(self.
_rb.length(1), OpenRTM_aist.BufferStatus.BUFFER_OK)
59 self.assertEqual(self.
_rb.length(), 1)
65 prop.setProperty(
"length",
"5")
66 prop.setProperty(
"write.full_policy",
"overwrite")
69 prop.setProperty(
"write.timeout",
"5.0")
70 prop.setProperty(
"read.full_policy",
"overwrite")
73 prop.setProperty(
"read.timeout",
"5.0")
75 self.assertEqual(self.
_rb.write(123), OpenRTM_aist.BufferStatus.BUFFER_OK)
77 self.assertEqual(self.
_rb.read(value),OpenRTM_aist.BufferStatus.BUFFER_OK)
78 self.assertEqual(value[0],123)
80 self.assertEqual(self.
_rb.read(value),OpenRTM_aist.BufferStatus.BUFFER_EMPTY)
81 self.assertEqual(value[0],123)
87 self.assertEqual(self.
_rb.write(1),OpenRTM_aist.BufferStatus.BUFFER_OK)
89 self.assertEqual(data[0],1)
91 self.assertEqual(self.
_rb.write(2),OpenRTM_aist.BufferStatus.BUFFER_OK)
93 self.assertEqual(data[0],2)
95 self.assertEqual(self.
_rb.write(3),OpenRTM_aist.BufferStatus.BUFFER_OK)
97 self.assertEqual(data[0],3)
99 self.assertEqual(self.
_rb.write(4),OpenRTM_aist.BufferStatus.BUFFER_OK)
101 self.assertEqual(data[0],4)
103 self.assertEqual(self.
_rb.write(5),OpenRTM_aist.BufferStatus.BUFFER_OK)
105 self.assertEqual(data[0],5)
107 self.assertEqual(self.
_rb.write(6),OpenRTM_aist.BufferStatus.BUFFER_OK)
109 self.assertEqual(data[0],6)
111 self.assertEqual(self.
_rb.write(
"string"),OpenRTM_aist.BufferStatus.BUFFER_OK)
113 self.assertEqual(data[0],
"string")
115 self.assertEqual(self.
_rb.write([1,2,3]),OpenRTM_aist.BufferStatus.BUFFER_OK)
117 self.assertEqual(data[0],[1,2,3])
119 self.assertEqual(self.
_rb.write(0.12345),OpenRTM_aist.BufferStatus.BUFFER_OK)
121 self.assertEqual(data[0],0.12345)
124 self.assertEqual(self.
_rb.write(0.12345,1,0),OpenRTM_aist.BufferStatus.BUFFER_OK)
125 self.assertEqual(self.
_rb.write(0.12345,1,0),OpenRTM_aist.BufferStatus.TIMEOUT)
129 self.assertEqual(self.
_rb.read(data,1,0),OpenRTM_aist.BufferStatus.TIMEOUT)
130 self.assertEqual(self.
_rb.write(
"string"),OpenRTM_aist.BufferStatus.BUFFER_OK)
134 self.assertEqual(self.
_rb.read(data),OpenRTM_aist.BufferStatus.BUFFER_OK)
135 self.assertEqual(data[0],
"string")
136 self.assertEqual(self.
_rb.read(data,1,0),OpenRTM_aist.BufferStatus.TIMEOUT)
140 self.assertEqual(self.
_rb.readable(),0)
141 self.assertEqual(self.
_rb.write(
"string"),OpenRTM_aist.BufferStatus.BUFFER_OK)
142 self.assertEqual(self.
_rb.readable(),1)
143 self.assertEqual(self.
_rb.read(data),OpenRTM_aist.BufferStatus.BUFFER_OK)
144 self.assertEqual(self.
_rb.readable(),0)
146 self.assertEqual(self.
_rb.readable(),0)
150 self.assertEqual(self.
_rb.empty(),
True)
151 self.assertEqual(self.
_rb.write(
"string"),OpenRTM_aist.BufferStatus.BUFFER_OK)
152 self.assertEqual(self.
_rb.empty(),
False)
153 self.assertEqual(self.
_rb.read(data),OpenRTM_aist.BufferStatus.BUFFER_OK)
154 self.assertEqual(self.
_rb.empty(),
True)
156 self.assertEqual(self.
_rb.empty(),
True)
159 self.assertEqual(self.
_rb.isFull(),
False)
163 self.assertEqual(self.
_rb.isEmpty(),
True)
165 self.assertEqual(self.
_rb.isEmpty(),
False)
169 self.assertEqual(self.
_rb.isNew(),
False)
171 self.assertEqual(self.
_rb.isNew(),
True)
174 self.assertEqual(self.
_rb.isNew(),
False)
176 self.assertEqual(self.
_rb.write(0.12345),OpenRTM_aist.BufferStatus.BUFFER_OK)
177 self.assertEqual(self.
_rb.write(0.12345),OpenRTM_aist.BufferStatus.BUFFER_OK)
178 self.assertEqual(self.
_rb.write(0.12345),OpenRTM_aist.BufferStatus.BUFFER_OK)
179 self.assertEqual(self.
_rb.isNew(),
True)
180 self.assertEqual(self.
_rb.isNew(),
True)
181 self.assertEqual(self.
_rb.isNew(),
True)
185 if __name__ ==
'__main__':
def COMMENTtest_isFull(self)
The Properties class represents a persistent set of properties.
def COMMENTtest_isNew(self)
def COMMENTtest_isEmpty(self)