test_RingBuffer.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- Python -*-
00003 
00004 # \file test_RingBuffer.py
00005 # \brief test for Default Buffer class
00006 # \date $Date: 2007/09/12 $
00007 # \author Shinji Kurihara
00008 #
00009 # Copyright (C) 2006
00010 #     Noriaki Ando
00011 #     Task-intelligence Research Group,
00012 #     Intelligent Systems Research Institute,
00013 #     National Institute of
00014 #         Advanced Industrial Science and Technology (AIST), Japan
00015 #     All rights reserved.
00016 #
00017 
00018 
00019 import sys
00020 sys.path.insert(1,"../")
00021 sys.path.insert(1,"../RTM_IDL")
00022 
00023 import unittest
00024 
00025 from RingBuffer import *
00026 import OpenRTM_aist
00027 
class TestRingBuffer(unittest.TestCase):
  """Unit tests for RingBuffer.

  Covers initialisation from properties, resizing via length(), reset(),
  write()/read() round trips and the readable()/empty() state queries.
  Every test runs against a fresh, default-constructed buffer (see setUp).
  """

  def setUp(self):
    """Create the default-constructed RingBuffer under test."""
    self._rb = RingBuffer()

  def tearDown(self):
    """Shut down the manager returned by OpenRTM_aist.Manager.instance()."""
    OpenRTM_aist.Manager.instance().shutdownManager()

  def _init_with_length_5(self):
    """Initialise self._rb from the property set shared by several tests.

    The properties request a buffer length of 5, the "overwrite" policy and
    a 5.0 timeout on both the write and the read side.
    """
    prop = OpenRTM_aist.Properties()
    # Policy values previously tried by hand instead of "overwrite":
    # "do_nothing" and "block".
    # NOTE(review): the read side is configured through the key
    # "read.full_policy"; confirm against RingBuffer.init() that this is
    # the key it actually reads.
    settings = (
      ("length", "5"),
      ("write.full_policy", "overwrite"),
      ("write.timeout", "5.0"),
      ("read.full_policy", "overwrite"),
      ("read.timeout", "5.0"),
    )
    for key, value in settings:
      prop.setProperty(key, value)
    self._rb.init(prop)

  def test_init(self):
    """init() applies the "length" property."""
    self._init_with_length_5()
    self.assertEqual(self._rb.length(), 5)

  def test_length(self):
    """length() reports the size; length(n) resizes or rejects n < 1."""
    ok = OpenRTM_aist.BufferStatus.BUFFER_OK
    not_supported = OpenRTM_aist.BufferStatus.NOT_SUPPORTED

    # A default-constructed buffer holds 8 elements.
    self.assertEqual(self._rb.length(), 8)

    self.assertEqual(self._rb.length(7), ok)
    self.assertEqual(self._rb.length(), 7)

    # Zero and negative sizes are refused.
    self.assertEqual(self._rb.length(0), not_supported)
    self.assertEqual(self._rb.length(-1), not_supported)

    self.assertEqual(self._rb.length(1), ok)
    self.assertEqual(self._rb.length(), 1)

  def test_reset(self):
    """reset() discards buffered state so the next read finds nothing."""
    self._init_with_length_5()

    self.assertEqual(self._rb.write(123),
                     OpenRTM_aist.BufferStatus.BUFFER_OK)
    value = [None]
    self.assertEqual(self._rb.read(value),
                     OpenRTM_aist.BufferStatus.BUFFER_OK)
    self.assertEqual(value[0], 123)

    self._rb.reset()
    self.assertEqual(self._rb.read(value),
                     OpenRTM_aist.BufferStatus.BUFFER_EMPTY)
    # The failed read must leave the out-parameter untouched.
    self.assertEqual(value[0], 123)

  def test_write(self):
    """write() stores any value; a timed write on a full buffer times out."""
    ok = OpenRTM_aist.BufferStatus.BUFFER_OK
    data = [0]

    # Round-trip ints, a string, a list and a float, one element at a time.
    samples = (1, 2, 3, 4, 5, 6, "string", [1, 2, 3], 0.12345)
    for sample in samples:
      self.assertEqual(self._rb.write(sample), ok)
      # Check the read status as well, so a failed read cannot go unnoticed.
      self.assertEqual(self._rb.read(data), ok)
      self.assertEqual(data[0], sample)

    # The buffer is drained at this point: fill it to its capacity (the
    # default length, 8 - see test_length) with timed writes ...
    for _ in range(self._rb.length()):
      self.assertEqual(self._rb.write(0.12345, 1, 0), ok)
    # ... after which one more timed write must report a timeout.
    self.assertEqual(self._rb.write(0.12345, 1, 0),
                     OpenRTM_aist.BufferStatus.TIMEOUT)

  def test_read(self):
    """read() returns written data; a timed read on no data times out."""
    ok = OpenRTM_aist.BufferStatus.BUFFER_OK
    timeout = OpenRTM_aist.BufferStatus.TIMEOUT
    data = [0]

    self.assertEqual(self._rb.read(data, 1, 0), timeout)
    self.assertEqual(self._rb.write("string"), ok)
    # Failure pattern: the argument of read() must be a list object, so
    # "data = 0; self._rb.read(data)" is not a valid call.
    self.assertEqual(self._rb.read(data), ok)
    self.assertEqual(data[0], "string")
    self.assertEqual(self._rb.read(data, 1, 0), timeout)

  def test_readable(self):
    """readable() counts the elements that are waiting to be read."""
    ok = OpenRTM_aist.BufferStatus.BUFFER_OK
    data = [0]

    self.assertEqual(self._rb.readable(), 0)
    self.assertEqual(self._rb.write("string"), ok)
    self.assertEqual(self._rb.readable(), 1)
    self.assertEqual(self._rb.read(data), ok)
    self.assertEqual(self._rb.readable(), 0)

    # A further read with nothing buffered must not change the count.
    self._rb.read(data)
    self.assertEqual(self._rb.readable(), 0)

  def test_empty(self):
    """empty() is True exactly when nothing is waiting to be read."""
    ok = OpenRTM_aist.BufferStatus.BUFFER_OK
    data = [0]

    self.assertEqual(self._rb.empty(), True)
    self.assertEqual(self._rb.write("string"), ok)
    self.assertEqual(self._rb.empty(), False)
    self.assertEqual(self._rb.read(data), ok)
    self.assertEqual(self._rb.empty(), True)

    # A further read with nothing buffered leaves the buffer empty.
    self._rb.read(data)
    self.assertEqual(self._rb.empty(), True)

  # The three methods below do not start with "test", so unittest's default
  # loader does not collect them; they are kept for reference only.
  # NOTE(review): presumably they target an older RingBuffer API
  # (isFull/isEmpty/isNew, init(0)) - confirm before re-enabling.

  def COMMENTtest_isFull(self):
    """Disabled: a new buffer is expected not to be full."""
    self.assertEqual(self._rb.isFull(), False)

  def COMMENTtest_isEmpty(self):
    """Disabled: isEmpty() is expected to flip to False after init(0)."""
    self.assertEqual(self._rb.isEmpty(), True)
    self._rb.init(0)
    self.assertEqual(self._rb.isEmpty(), False)

  def COMMENTtest_isNew(self):
    """Disabled: isNew() is expected to track unread data."""
    ok = OpenRTM_aist.BufferStatus.BUFFER_OK

    self.assertEqual(self._rb.isNew(), False)
    self._rb.init(0)
    self.assertEqual(self._rb.isNew(), True)
    data = [0]
    self._rb.read(data)
    self.assertEqual(self._rb.isNew(), False)

    for _ in range(3):
      self.assertEqual(self._rb.write(0.12345), ok)
    # Querying isNew() repeatedly must not clear the flag.
    for _ in range(3):
      self.assertEqual(self._rb.isNew(), True)
00182 
00183 
00184 ############### test #################
if __name__ == '__main__':
  # Executed as a script: hand control to unittest's command-line runner.
  unittest.main()


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Thu Aug 27 2015 14:17:28