00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 import sys
00020 sys.path.insert(1,"../")
00021 sys.path.insert(1,"../RTM_IDL")
00022
00023 import unittest
00024
00025 from RingBuffer import *
00026 import OpenRTM_aist
00027
00028 class TestRingBuffer(unittest.TestCase):
00029
00030 def setUp(self):
00031 self._rb = RingBuffer()
00032
00033 def tearDown(self):
00034 OpenRTM_aist.Manager.instance().shutdownManager()
00035 return
00036
00037 def test_init(self):
00038 prop = OpenRTM_aist.Properties()
00039 prop.setProperty("length","5")
00040 prop.setProperty("write.full_policy","overwrite")
00041
00042
00043 prop.setProperty("write.timeout","5.0")
00044 prop.setProperty("read.full_policy","overwrite")
00045
00046
00047 prop.setProperty("read.timeout","5.0")
00048 self._rb.init(prop)
00049 self.assertEqual(self._rb.length(),5)
00050 return
00051
00052 def test_length(self):
00053 self.assertEqual(self._rb.length(), 8)
00054 self.assertEqual(self._rb.length(7), OpenRTM_aist.BufferStatus.BUFFER_OK)
00055 self.assertEqual(self._rb.length(), 7)
00056 self.assertEqual(self._rb.length(0), OpenRTM_aist.BufferStatus.NOT_SUPPORTED)
00057 self.assertEqual(self._rb.length(-1), OpenRTM_aist.BufferStatus.NOT_SUPPORTED)
00058 self.assertEqual(self._rb.length(1), OpenRTM_aist.BufferStatus.BUFFER_OK)
00059 self.assertEqual(self._rb.length(), 1)
00060 return
00061
00062
00063 def test_reset(self):
00064 prop = OpenRTM_aist.Properties()
00065 prop.setProperty("length","5")
00066 prop.setProperty("write.full_policy","overwrite")
00067
00068
00069 prop.setProperty("write.timeout","5.0")
00070 prop.setProperty("read.full_policy","overwrite")
00071
00072
00073 prop.setProperty("read.timeout","5.0")
00074 self._rb.init(prop)
00075 self.assertEqual(self._rb.write(123), OpenRTM_aist.BufferStatus.BUFFER_OK)
00076 value = [None]
00077 self.assertEqual(self._rb.read(value),OpenRTM_aist.BufferStatus.BUFFER_OK)
00078 self.assertEqual(value[0],123)
00079 self._rb.reset()
00080 self.assertEqual(self._rb.read(value),OpenRTM_aist.BufferStatus.BUFFER_EMPTY)
00081 self.assertEqual(value[0],123)
00082
00083 return
00084
00085 def test_write(self):
00086 data=[0]
00087 self.assertEqual(self._rb.write(1),OpenRTM_aist.BufferStatus.BUFFER_OK)
00088 self._rb.read(data)
00089 self.assertEqual(data[0],1)
00090
00091 self.assertEqual(self._rb.write(2),OpenRTM_aist.BufferStatus.BUFFER_OK)
00092 self._rb.read(data)
00093 self.assertEqual(data[0],2)
00094
00095 self.assertEqual(self._rb.write(3),OpenRTM_aist.BufferStatus.BUFFER_OK)
00096 self._rb.read(data)
00097 self.assertEqual(data[0],3)
00098
00099 self.assertEqual(self._rb.write(4),OpenRTM_aist.BufferStatus.BUFFER_OK)
00100 self._rb.read(data)
00101 self.assertEqual(data[0],4)
00102
00103 self.assertEqual(self._rb.write(5),OpenRTM_aist.BufferStatus.BUFFER_OK)
00104 self._rb.read(data)
00105 self.assertEqual(data[0],5)
00106
00107 self.assertEqual(self._rb.write(6),OpenRTM_aist.BufferStatus.BUFFER_OK)
00108 self._rb.read(data)
00109 self.assertEqual(data[0],6)
00110
00111 self.assertEqual(self._rb.write("string"),OpenRTM_aist.BufferStatus.BUFFER_OK)
00112 self._rb.read(data)
00113 self.assertEqual(data[0],"string")
00114
00115 self.assertEqual(self._rb.write([1,2,3]),OpenRTM_aist.BufferStatus.BUFFER_OK)
00116 self._rb.read(data)
00117 self.assertEqual(data[0],[1,2,3])
00118
00119 self.assertEqual(self._rb.write(0.12345),OpenRTM_aist.BufferStatus.BUFFER_OK)
00120 self._rb.read(data)
00121 self.assertEqual(data[0],0.12345)
00122
00123 for i in range(8):
00124 self.assertEqual(self._rb.write(0.12345,1,0),OpenRTM_aist.BufferStatus.BUFFER_OK)
00125 self.assertEqual(self._rb.write(0.12345,1,0),OpenRTM_aist.BufferStatus.TIMEOUT)
00126
00127 def test_read(self):
00128 data=[0]
00129 self.assertEqual(self._rb.read(data,1,0),OpenRTM_aist.BufferStatus.TIMEOUT)
00130 self.assertEqual(self._rb.write("string"),OpenRTM_aist.BufferStatus.BUFFER_OK)
00131
00132
00133
00134 self.assertEqual(self._rb.read(data),OpenRTM_aist.BufferStatus.BUFFER_OK)
00135 self.assertEqual(data[0],"string")
00136 self.assertEqual(self._rb.read(data,1,0),OpenRTM_aist.BufferStatus.TIMEOUT)
00137
00138 def test_readable(self):
00139 data=[0]
00140 self.assertEqual(self._rb.readable(),0)
00141 self.assertEqual(self._rb.write("string"),OpenRTM_aist.BufferStatus.BUFFER_OK)
00142 self.assertEqual(self._rb.readable(),1)
00143 self.assertEqual(self._rb.read(data),OpenRTM_aist.BufferStatus.BUFFER_OK)
00144 self.assertEqual(self._rb.readable(),0)
00145 self._rb.read(data)
00146 self.assertEqual(self._rb.readable(),0)
00147
00148 def test_empty(self):
00149 data=[0]
00150 self.assertEqual(self._rb.empty(),True)
00151 self.assertEqual(self._rb.write("string"),OpenRTM_aist.BufferStatus.BUFFER_OK)
00152 self.assertEqual(self._rb.empty(),False)
00153 self.assertEqual(self._rb.read(data),OpenRTM_aist.BufferStatus.BUFFER_OK)
00154 self.assertEqual(self._rb.empty(),True)
00155 self._rb.read(data)
00156 self.assertEqual(self._rb.empty(),True)
00157
00158 def COMMENTtest_isFull(self):
00159 self.assertEqual(self._rb.isFull(),False)
00160
00161
00162 def COMMENTtest_isEmpty(self):
00163 self.assertEqual(self._rb.isEmpty(),True)
00164 self._rb.init(0)
00165 self.assertEqual(self._rb.isEmpty(),False)
00166
00167
00168 def COMMENTtest_isNew(self):
00169 self.assertEqual(self._rb.isNew(),False)
00170 self._rb.init(0)
00171 self.assertEqual(self._rb.isNew(),True)
00172 data=[0]
00173 self._rb.read(data)
00174 self.assertEqual(self._rb.isNew(),False)
00175
00176 self.assertEqual(self._rb.write(0.12345),OpenRTM_aist.BufferStatus.BUFFER_OK)
00177 self.assertEqual(self._rb.write(0.12345),OpenRTM_aist.BufferStatus.BUFFER_OK)
00178 self.assertEqual(self._rb.write(0.12345),OpenRTM_aist.BufferStatus.BUFFER_OK)
00179 self.assertEqual(self._rb.isNew(),True)
00180 self.assertEqual(self._rb.isNew(),True)
00181 self.assertEqual(self._rb.isNew(),True)
00182
00183
00184
00185 if __name__ == '__main__':
00186 unittest.main()