test_PeriodicTask.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- Python -*-
00003 
00004 #
00005 # \file test_PeriodicTask.py
00006 # \brief Unit tests for the PeriodicTask class
00007 # \date $Date: $
00008 # \author Shinji Kurihara
00009 #
00010 # Copyright (C) 2007
00011 #     Task-intelligence Research Group,
00012 #     Intelligent Systems Research Institute,
00013 #     National Institute of
00014 #         Advanced Industrial Science and Technology (AIST), Japan
00015 #     All rights reserved.
00016 #
00017 
00018 import sys,time
00019 sys.path.insert(1,"../")
00020 
00021 import OpenRTM_aist
00022 import unittest
00023 
00024 from PeriodicTask import *
00025 from TimeMeasure import *
00026 
class LoggerMock:
        """In-memory logger stub that records every message passed to log().

        Attributes:
            m_log: list of the recorded messages, in call order.
            count: result of the most recent countLog() call; the attribute
                only exists once countLog() has been called.
        """

        def __init__(self):
                self.m_log = []

        def log(self, msg):
                """Append msg to the list of recorded messages."""
                self.m_log.append(msg)

        def countLog(self, msg):
                """Return how many recorded messages compare equal to msg.

                The result is also kept in self.count so that code reading
                that attribute after a call keeps working.
                """
                self.count = sum(1 for entry in self.m_log if entry == msg)
                return self.count

        def clearLog(self):
                """Drop all recorded messages by rebinding m_log to a new list."""
                self.m_log = []
00046 
class MyMock:
        """Container for the callable service stubs used by the tests in this file."""

        def __init__(self):
                pass

        class mysvc2:
                """Callable stub that logs "mysvc2" on each call once a logger is set."""

                def __init__(self):
                        # No logger attached until setLogger() is called.
                        self.m_logger = None

                def setLogger(self, logger):
                        """Attach the object whose log() method receives the call marker."""
                        self.m_logger = logger

                def __call__(self):
                        """Record one invocation (if a logger is attached) and return 0."""
                        if self.m_logger is None:
                                return 0
                        self.m_logger.log("mysvc2")
                        return 0

        class mysvc3:
                """Callable stub that logs "mysvc3" on each call once a logger is set."""

                def __init__(self):
                        # No logger attached until setLogger() is called.
                        self.m_logger = None

                def setLogger(self, logger):
                        """Attach the object whose log() method receives the call marker."""
                        self.m_logger = logger

                def __call__(self):
                        """Record one invocation (if a logger is attached) and return 0."""
                        if self.m_logger is None:
                                return 0
                        self.m_logger.log("mysvc3")
                        return 0
00078 
00079 class TestPeriodicTask(unittest.TestCase):
00080         def setUp(self):
00081                 return
00082 
00083         def test_setTask(self):
00084                 self._pt = PeriodicTask()
00085                 self._my = MyMock()
00086                 self._my.mysvc2 = MyMock().mysvc2()
00087                 self._logger = LoggerMock()
00088                 self._my.mysvc2.setLogger(self._logger)
00089 
00090                 self.assert_(self._pt.setTask(self._my.mysvc2))
00091                 self.assertEqual(0, self._logger.countLog("mysvc2"))
00092                 self._pt.activate()
00093                 time.sleep(0.005)
00094                 self._pt.finalize()
00095                 time.sleep(0.005)
00096                 self.assert_(1 < self._logger.countLog("mysvc2"))
00097                 return
00098 
00099         def test_setPeriodic(self):
00100                 self._pt = PeriodicTask()
00101                 self._my = MyMock()
00102                 self._my.mysvc2 = MyMock().mysvc2()
00103                 self._logger = LoggerMock()
00104                 self._my.mysvc2.setLogger(self._logger)
00105 
00106                 self._pt.setTask(self._my.mysvc2)
00107                 self._pt.setPeriod(0.05)
00108                 self.assertEqual(0, self._logger.countLog("mysvc2"))
00109 
00110                 self._pt.activate()
00111                 time.sleep(0.1)
00112                 self._pt.suspend()
00113                 time.sleep(0.05)
00114                 self.assert_(4 > self._logger.countLog("mysvc2"))
00115                 self.assert_(0 < self._logger.countLog("mysvc2"))
00116 
00117                 self._logger.clearLog()
00118                 self._pt.setPeriod(0.01)
00119                 self.assertEqual(0, self._logger.countLog("mysvc2"))
00120 
00121                 self._pt.resume()
00122                 time.sleep(0.1)
00123                 self._pt.suspend()
00124                 time.sleep(0.01)
00125                 self.assert_(12 > self._logger.countLog("mysvc2"))
00126                 self.assert_( 8 < self._logger.countLog("mysvc2"))
00127 
00128                 self._logger.clearLog()
00129                 self._pt.setPeriod(0.05)
00130                 self.assertEqual(0, self._logger.countLog("mysvc2"))
00131 
00132                 self._pt.resume()
00133                 time.sleep(0.1)
00134                 self._pt.finalize()
00135                 time.sleep(0.05)
00136                 self.assert_(4 > self._logger.countLog("mysvc2"))
00137                 self.assert_(0 < self._logger.countLog("mysvc2"))
00138                 return
00139 
00140         def test_signal(self):
00141                 self._pt = PeriodicTask()
00142                 self._my = MyMock()
00143                 self._my.mysvc2 = MyMock().mysvc2()
00144                 self._logger = LoggerMock()
00145                 self._my.mysvc2.setLogger(self._logger)
00146 
00147                 self._pt.setTask(self._my.mysvc2)
00148                 self._pt.setPeriod(0.05)
00149                 self.assertEqual(0, self._logger.countLog("mysvc2"))
00150 
00151                 self._pt.activate()
00152                 time.sleep(0.2)
00153                 self._pt.suspend()
00154                 count = self._logger.countLog("mysvc2")
00155 
00156                 time.sleep(0.2)
00157                 self.assertEqual(count, self._logger.countLog("mysvc2"))
00158 
00159                 self._pt.signal()
00160                 time.sleep(0.2)
00161                 self.assertEqual(count+1, self._logger.countLog("mysvc2"))
00162 
00163                 self._pt.signal()
00164                 time.sleep(0.2)
00165                 self.assertEqual(count+2, self._logger.countLog("mysvc2"))
00166 
00167                 self._logger.clearLog()
00168                 self._pt.resume()
00169                 time.sleep(0.2)
00170                 self._pt.suspend()
00171                 time.sleep(0.2)
00172                 self.assert_(6 > self._logger.countLog("mysvc2"))
00173                 self.assert_(2 < self._logger.countLog("mysvc2"))
00174 
00175                 self._pt.finalize()
00176                 time.sleep(0.2)
00177                 return
00178 
00179         def test_executionMeasure(self):
00180                 self._pt = PeriodicTask()
00181                 self._my = MyMock()
00182                 self._my.mysvc3 = MyMock().mysvc3()
00183                 self._logger = LoggerMock()
00184                 self._my.mysvc3.setLogger(self._logger)
00185 
00186                 wait = 0.03     # sec
00187                 self._pt.setTask(self._my.mysvc3)
00188                 self._pt.setPeriod(0.05)
00189                 self._pt.executionMeasure(True)
00190 
00191                 # executionMeasureConut: 10
00192                 self._pt.activate()
00193                 time.sleep(0.6)
00194                 self._pt.suspend()
00195                 time.sleep(0.05)
00196                 estat = self._pt.getExecStat()
00197                 ss = ""
00198                 ss = ss + "wait:  " + str(wait) + "\n"
00199                 ss = ss + "estat max:  " + str(estat._max_interval) + "\n"
00200                 ss = ss + "estat min:  " + str(estat._min_interval) + "\n"
00201                 ss = ss + "estat mean: " + str(estat._mean_interval) + "\n"
00202                 ss = ss + "estat sdev: " + str(estat._std_deviation) + "\n"
00203                 self.assert_(estat._max_interval < (wait + 0.030), ss)
00204                 self.assert_(estat._min_interval > (wait - 0.03), ss)
00205                 self.assert_(abs(estat._mean_interval - wait) < 0.03, ss)
00206                 self.assert_(estat._std_deviation < (wait / 5.0), ss)
00207 
00208                 # executionMeasureConut: 5
00209                 self._pt.executionMeasureCount(5)
00210                 self._pt.resume()
00211                 time.sleep(0.3)
00212                 self._pt.suspend()
00213                 time.sleep(0.05)
00214                 estat = self._pt.getExecStat()
00215                 ss = ""
00216                 ss = ss + "wait:  " + str(wait) + "\n"
00217                 ss = ss + "estat max:  " + str(estat._max_interval) + "\n"
00218                 ss = ss + "estat min:  " + str(estat._min_interval) + "\n"
00219                 ss = ss + "estat mean: " + str(estat._mean_interval) + "\n"
00220                 ss = ss + "estat sdev: " + str(estat._std_deviation) + "\n"
00221                 self.assert_(estat._max_interval < (wait + 0.030), ss)
00222                 self.assert_(estat._min_interval > (wait - 0.03), ss)
00223                 self.assert_(abs(estat._mean_interval - wait) < 0.03, ss)
00224                 self.assert_(estat._std_deviation < (wait / 5.0), ss)
00225 
00226                 # executionMeasureConut: lessthan
00227                 self._pt.executionMeasureCount(10)
00228                 self._pt.resume()
00229                 time.sleep(0.3)
00230                 self._pt.suspend()
00231                 time.sleep(0.05)
00232                 self._pt.finalize()
00233                 estat2 = self._pt.getExecStat()
00234 
00235                 # periodicMeasureConut: lessthan
00236                 self.assert_(estat._max_interval == estat2._max_interval)
00237                 self.assert_(estat._min_interval == estat2._min_interval)
00238                 self.assert_(estat._mean_interval == estat2._mean_interval)
00239                 self.assert_(estat._std_deviation == estat2._std_deviation)
00240                 return
00241 
00242         def test_periodicMeasure(self):
00243                 self._pt = PeriodicTask()
00244                 self._my = MyMock()
00245                 self._my.mysvc3 = MyMock().mysvc3()
00246                 self._logger = LoggerMock()
00247                 self._my.mysvc3.setLogger(self._logger)
00248 
00249                 wait = 0.05     # sec
00250                 self._pt.setTask(self._my.mysvc3)
00251                 self._pt.setPeriod(0.05)
00252                 self._pt.periodicMeasure(True)
00253 
00254                 # periodicMeasureConut: 10
00255                 self._pt.activate()
00256                 time.sleep(0.6)
00257                 self._pt.suspend()
00258                 time.sleep(0.05)
00259                 pstat = self._pt.getPeriodStat()
00260                 self.assert_(pstat._max_interval < (wait + 0.030))
00261                 self.assert_(pstat._min_interval > (wait - 0.050))
00262                 self.assert_(abs(pstat._mean_interval - wait) < 0.03)
00263                 self.assert_(pstat._std_deviation < (wait / 1.0))
00264 
00265                 # periodicMeasureConut:5
00266                 self._pt.periodicMeasureCount(5)
00267                 self._pt.resume()
00268                 time.sleep(0.3)
00269                 self._pt.suspend()
00270                 time.sleep(0.05)
00271                 pstat = self._pt.getPeriodStat()
00272                 self.assert_(pstat._max_interval < (wait + 0.030))
00273                 self.assert_(pstat._min_interval > (wait - 0.010))
00274                 self.assert_(abs(pstat._mean_interval - wait) < 0.03)
00275                 self.assert_(pstat._std_deviation < (wait / 5.0))
00276 
00277                 # periodicMeasureConut: lessthan
00278                 self._pt.periodicMeasureCount(10)
00279                 self._pt.resume()
00280                 time.sleep(0.3)
00281                 self._pt.suspend()
00282                 time.sleep(0.05)
00283                 self._pt.finalize()
00284                 pstat2 = self._pt.getPeriodStat()
00285 
00286                 # periodicMeasureConut: lessthan
00287                 # Comment: Some errors may be observed.
00288                 #self.assert_(pstat._max_interval == pstat2._max_interval)
00289                 #self.assert_(pstat._min_interval == pstat2._min_interval)
00290                 #self.assert_(pstat._mean_interval == pstat2._mean_interval)
00291                 #self.assert_(pstat._std_deviation == pstat2._std_deviation)
00292                 return
00293 
00294 
# ---------------------------------------------------------------------------
# Script entry point: run every test case defined in this module.
# ---------------------------------------------------------------------------
if "__main__" == __name__:
        unittest.main()


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Thu Aug 27 2015 14:17:28