test_RingBuffer.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- Python -*-
3 
4 # \file test_RingBuffer.py
5 # \brief test for Defautl Buffer class
6 # \date $Date: 2007/09/12 $
7 # \author Shinji Kurihara
8 #
9 # Copyright (C) 2006
10 # Noriaki Ando
11 # Task-intelligence Research Group,
12 # Intelligent Systems Research Institute,
13 # National Institute of
14 # Advanced Industrial Science and Technology (AIST), Japan
15 # All rights reserved.
16 #
17 
18 
19 import sys
20 sys.path.insert(1,"../")
21 sys.path.insert(1,"../RTM_IDL")
22 
23 import unittest
24 
25 from RingBuffer import *
26 import OpenRTM_aist
27 
28 class TestRingBuffer(unittest.TestCase):
29 
30  def setUp(self):
31  self._rb = RingBuffer()
32 
33  def tearDown(self):
34  OpenRTM_aist.Manager.instance().shutdownManager()
35  return
36 
37  def test_init(self):
39  prop.setProperty("length","5")
40  prop.setProperty("write.full_policy","overwrite")
41  #prop.setProperty("write.full_policy","do_nothing")
42  #prop.setProperty("write.full_policy","block")
43  prop.setProperty("write.timeout","5.0")
44  prop.setProperty("read.full_policy","overwrite")
45  #prop.setProperty("read.full_policy","do_nothing")
46  #prop.setProperty("read.full_policy","block")
47  prop.setProperty("read.timeout","5.0")
48  self._rb.init(prop)
49  self.assertEqual(self._rb.length(),5)
50  return
51 
52  def test_length(self):
53  self.assertEqual(self._rb.length(), 8)
54  self.assertEqual(self._rb.length(7), OpenRTM_aist.BufferStatus.BUFFER_OK)
55  self.assertEqual(self._rb.length(), 7)
56  self.assertEqual(self._rb.length(0), OpenRTM_aist.BufferStatus.NOT_SUPPORTED)
57  self.assertEqual(self._rb.length(-1), OpenRTM_aist.BufferStatus.NOT_SUPPORTED)
58  self.assertEqual(self._rb.length(1), OpenRTM_aist.BufferStatus.BUFFER_OK)
59  self.assertEqual(self._rb.length(), 1)
60  return
61 
62 
63  def test_reset(self):
65  prop.setProperty("length","5")
66  prop.setProperty("write.full_policy","overwrite")
67  #prop.setProperty("write.full_policy","do_nothing")
68  #prop.setProperty("write.full_policy","block")
69  prop.setProperty("write.timeout","5.0")
70  prop.setProperty("read.full_policy","overwrite")
71  #prop.setProperty("read.full_policy","do_nothing")
72  #prop.setProperty("read.full_policy","block")
73  prop.setProperty("read.timeout","5.0")
74  self._rb.init(prop)
75  self.assertEqual(self._rb.write(123), OpenRTM_aist.BufferStatus.BUFFER_OK)
76  value = [None]
77  self.assertEqual(self._rb.read(value),OpenRTM_aist.BufferStatus.BUFFER_OK)
78  self.assertEqual(value[0],123)
79  self._rb.reset()
80  self.assertEqual(self._rb.read(value),OpenRTM_aist.BufferStatus.BUFFER_EMPTY)
81  self.assertEqual(value[0],123)
82 
83  return
84 
85  def test_write(self):
86  data=[0]
87  self.assertEqual(self._rb.write(1),OpenRTM_aist.BufferStatus.BUFFER_OK)
88  self._rb.read(data)
89  self.assertEqual(data[0],1)
90 
91  self.assertEqual(self._rb.write(2),OpenRTM_aist.BufferStatus.BUFFER_OK)
92  self._rb.read(data)
93  self.assertEqual(data[0],2)
94 
95  self.assertEqual(self._rb.write(3),OpenRTM_aist.BufferStatus.BUFFER_OK)
96  self._rb.read(data)
97  self.assertEqual(data[0],3)
98 
99  self.assertEqual(self._rb.write(4),OpenRTM_aist.BufferStatus.BUFFER_OK)
100  self._rb.read(data)
101  self.assertEqual(data[0],4)
102 
103  self.assertEqual(self._rb.write(5),OpenRTM_aist.BufferStatus.BUFFER_OK)
104  self._rb.read(data)
105  self.assertEqual(data[0],5)
106 
107  self.assertEqual(self._rb.write(6),OpenRTM_aist.BufferStatus.BUFFER_OK)
108  self._rb.read(data)
109  self.assertEqual(data[0],6)
110 
111  self.assertEqual(self._rb.write("string"),OpenRTM_aist.BufferStatus.BUFFER_OK)
112  self._rb.read(data)
113  self.assertEqual(data[0],"string")
114 
115  self.assertEqual(self._rb.write([1,2,3]),OpenRTM_aist.BufferStatus.BUFFER_OK)
116  self._rb.read(data)
117  self.assertEqual(data[0],[1,2,3])
118 
119  self.assertEqual(self._rb.write(0.12345),OpenRTM_aist.BufferStatus.BUFFER_OK)
120  self._rb.read(data)
121  self.assertEqual(data[0],0.12345)
122 
123  for i in range(8):
124  self.assertEqual(self._rb.write(0.12345,1,0),OpenRTM_aist.BufferStatus.BUFFER_OK)
125  self.assertEqual(self._rb.write(0.12345,1,0),OpenRTM_aist.BufferStatus.TIMEOUT)
126 
127  def test_read(self):
128  data=[0]
129  self.assertEqual(self._rb.read(data,1,0),OpenRTM_aist.BufferStatus.TIMEOUT)
130  self.assertEqual(self._rb.write("string"),OpenRTM_aist.BufferStatus.BUFFER_OK)
131  # Failure pattern (parameter must be List object.)
132  # data=0
133  # self._rb.read(data)
134  self.assertEqual(self._rb.read(data),OpenRTM_aist.BufferStatus.BUFFER_OK)
135  self.assertEqual(data[0],"string")
136  self.assertEqual(self._rb.read(data,1,0),OpenRTM_aist.BufferStatus.TIMEOUT)
137 
138  def test_readable(self):
139  data=[0]
140  self.assertEqual(self._rb.readable(),0)
141  self.assertEqual(self._rb.write("string"),OpenRTM_aist.BufferStatus.BUFFER_OK)
142  self.assertEqual(self._rb.readable(),1)
143  self.assertEqual(self._rb.read(data),OpenRTM_aist.BufferStatus.BUFFER_OK)
144  self.assertEqual(self._rb.readable(),0)
145  self._rb.read(data)
146  self.assertEqual(self._rb.readable(),0)
147 
148  def test_empty(self):
149  data=[0]
150  self.assertEqual(self._rb.empty(),True)
151  self.assertEqual(self._rb.write("string"),OpenRTM_aist.BufferStatus.BUFFER_OK)
152  self.assertEqual(self._rb.empty(),False)
153  self.assertEqual(self._rb.read(data),OpenRTM_aist.BufferStatus.BUFFER_OK)
154  self.assertEqual(self._rb.empty(),True)
155  self._rb.read(data)
156  self.assertEqual(self._rb.empty(),True)
157 
159  self.assertEqual(self._rb.isFull(),False)
160 
161 
163  self.assertEqual(self._rb.isEmpty(),True)
164  self._rb.init(0)
165  self.assertEqual(self._rb.isEmpty(),False)
166 
167 
168  def COMMENTtest_isNew(self):
169  self.assertEqual(self._rb.isNew(),False)
170  self._rb.init(0)
171  self.assertEqual(self._rb.isNew(),True)
172  data=[0]
173  self._rb.read(data)
174  self.assertEqual(self._rb.isNew(),False)
175 
176  self.assertEqual(self._rb.write(0.12345),OpenRTM_aist.BufferStatus.BUFFER_OK)
177  self.assertEqual(self._rb.write(0.12345),OpenRTM_aist.BufferStatus.BUFFER_OK)
178  self.assertEqual(self._rb.write(0.12345),OpenRTM_aist.BufferStatus.BUFFER_OK)
179  self.assertEqual(self._rb.isNew(),True)
180  self.assertEqual(self._rb.isNew(),True)
181  self.assertEqual(self._rb.isNew(),True)
182 
183 
184 
185 if __name__ == '__main__':
186  unittest.main()
The Properties class represents a persistent set of properties.
Definition: Properties.py:83


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Mon Feb 28 2022 23:01:07