Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 import sys,time
00019 sys.path.insert(1,"../")
00020
00021 import OpenRTM_aist
00022 import unittest
00023
00024 from PeriodicTask import *
00025 from TimeMeasure import *
00026
00027 class LoggerMock:
00028 def __init__(self):
00029 self.m_log = []
00030 return
00031
00032 def log(self, msg):
00033 self.m_log.append(msg);
00034 return
00035
00036 def countLog(self, msg):
00037 self.count = 0
00038 for i in range(len(self.m_log)):
00039 if self.m_log[i] == msg:
00040 self.count += 1
00041 return self.count
00042
00043 def clearLog(self):
00044 self.m_log = []
00045 return
00046
00047 class MyMock:
00048 def __init__(self):
00049 return
00050
00051 class mysvc2:
00052 def __init__(self):
00053 self.m_logger = None
00054 return
00055
00056 def __call__(self):
00057 if self.m_logger is not None:
00058 self.m_logger.log("mysvc2")
00059 return 0
00060
00061 def setLogger(self, logger):
00062 self.m_logger = logger
00063 return
00064
00065 class mysvc3:
00066 def __init__(self):
00067 self.m_logger = None
00068 return
00069
00070 def __call__(self):
00071 if self.m_logger is not None:
00072 self.m_logger.log("mysvc3")
00073 return 0
00074
00075 def setLogger(self, logger):
00076 self.m_logger = logger
00077 return
00078
00079 class TestPeriodicTask(unittest.TestCase):
00080 def setUp(self):
00081 return
00082
00083 def test_setTask(self):
00084 self._pt = PeriodicTask()
00085 self._my = MyMock()
00086 self._my.mysvc2 = MyMock().mysvc2()
00087 self._logger = LoggerMock()
00088 self._my.mysvc2.setLogger(self._logger)
00089
00090 self.assert_(self._pt.setTask(self._my.mysvc2))
00091 self.assertEqual(0, self._logger.countLog("mysvc2"))
00092 self._pt.activate()
00093 time.sleep(0.005)
00094 self._pt.finalize()
00095 time.sleep(0.005)
00096 self.assert_(1 < self._logger.countLog("mysvc2"))
00097 return
00098
00099 def test_setPeriodic(self):
00100 self._pt = PeriodicTask()
00101 self._my = MyMock()
00102 self._my.mysvc2 = MyMock().mysvc2()
00103 self._logger = LoggerMock()
00104 self._my.mysvc2.setLogger(self._logger)
00105
00106 self._pt.setTask(self._my.mysvc2)
00107 self._pt.setPeriod(0.05)
00108 self.assertEqual(0, self._logger.countLog("mysvc2"))
00109
00110 self._pt.activate()
00111 time.sleep(0.1)
00112 self._pt.suspend()
00113 time.sleep(0.05)
00114 self.assert_(4 > self._logger.countLog("mysvc2"))
00115 self.assert_(0 < self._logger.countLog("mysvc2"))
00116
00117 self._logger.clearLog()
00118 self._pt.setPeriod(0.01)
00119 self.assertEqual(0, self._logger.countLog("mysvc2"))
00120
00121 self._pt.resume()
00122 time.sleep(0.1)
00123 self._pt.suspend()
00124 time.sleep(0.01)
00125 self.assert_(12 > self._logger.countLog("mysvc2"))
00126 self.assert_( 8 < self._logger.countLog("mysvc2"))
00127
00128 self._logger.clearLog()
00129 self._pt.setPeriod(0.05)
00130 self.assertEqual(0, self._logger.countLog("mysvc2"))
00131
00132 self._pt.resume()
00133 time.sleep(0.1)
00134 self._pt.finalize()
00135 time.sleep(0.05)
00136 self.assert_(4 > self._logger.countLog("mysvc2"))
00137 self.assert_(0 < self._logger.countLog("mysvc2"))
00138 return
00139
00140 def test_signal(self):
00141 self._pt = PeriodicTask()
00142 self._my = MyMock()
00143 self._my.mysvc2 = MyMock().mysvc2()
00144 self._logger = LoggerMock()
00145 self._my.mysvc2.setLogger(self._logger)
00146
00147 self._pt.setTask(self._my.mysvc2)
00148 self._pt.setPeriod(0.05)
00149 self.assertEqual(0, self._logger.countLog("mysvc2"))
00150
00151 self._pt.activate()
00152 time.sleep(0.2)
00153 self._pt.suspend()
00154 count = self._logger.countLog("mysvc2")
00155
00156 time.sleep(0.2)
00157 self.assertEqual(count, self._logger.countLog("mysvc2"))
00158
00159 self._pt.signal()
00160 time.sleep(0.2)
00161 self.assertEqual(count+1, self._logger.countLog("mysvc2"))
00162
00163 self._pt.signal()
00164 time.sleep(0.2)
00165 self.assertEqual(count+2, self._logger.countLog("mysvc2"))
00166
00167 self._logger.clearLog()
00168 self._pt.resume()
00169 time.sleep(0.2)
00170 self._pt.suspend()
00171 time.sleep(0.2)
00172 self.assert_(6 > self._logger.countLog("mysvc2"))
00173 self.assert_(2 < self._logger.countLog("mysvc2"))
00174
00175 self._pt.finalize()
00176 time.sleep(0.2)
00177 return
00178
00179 def test_executionMeasure(self):
00180 self._pt = PeriodicTask()
00181 self._my = MyMock()
00182 self._my.mysvc3 = MyMock().mysvc3()
00183 self._logger = LoggerMock()
00184 self._my.mysvc3.setLogger(self._logger)
00185
00186 wait = 0.03
00187 self._pt.setTask(self._my.mysvc3)
00188 self._pt.setPeriod(0.05)
00189 self._pt.executionMeasure(True)
00190
00191
00192 self._pt.activate()
00193 time.sleep(0.6)
00194 self._pt.suspend()
00195 time.sleep(0.05)
00196 estat = self._pt.getExecStat()
00197 ss = ""
00198 ss = ss + "wait: " + str(wait) + "\n"
00199 ss = ss + "estat max: " + str(estat._max_interval) + "\n"
00200 ss = ss + "estat min: " + str(estat._min_interval) + "\n"
00201 ss = ss + "estat mean: " + str(estat._mean_interval) + "\n"
00202 ss = ss + "estat sdev: " + str(estat._std_deviation) + "\n"
00203 self.assert_(estat._max_interval < (wait + 0.030), ss)
00204 self.assert_(estat._min_interval > (wait - 0.03), ss)
00205 self.assert_(abs(estat._mean_interval - wait) < 0.03, ss)
00206 self.assert_(estat._std_deviation < (wait / 5.0), ss)
00207
00208
00209 self._pt.executionMeasureCount(5)
00210 self._pt.resume()
00211 time.sleep(0.3)
00212 self._pt.suspend()
00213 time.sleep(0.05)
00214 estat = self._pt.getExecStat()
00215 ss = ""
00216 ss = ss + "wait: " + str(wait) + "\n"
00217 ss = ss + "estat max: " + str(estat._max_interval) + "\n"
00218 ss = ss + "estat min: " + str(estat._min_interval) + "\n"
00219 ss = ss + "estat mean: " + str(estat._mean_interval) + "\n"
00220 ss = ss + "estat sdev: " + str(estat._std_deviation) + "\n"
00221 self.assert_(estat._max_interval < (wait + 0.030), ss)
00222 self.assert_(estat._min_interval > (wait - 0.03), ss)
00223 self.assert_(abs(estat._mean_interval - wait) < 0.03, ss)
00224 self.assert_(estat._std_deviation < (wait / 5.0), ss)
00225
00226
00227 self._pt.executionMeasureCount(10)
00228 self._pt.resume()
00229 time.sleep(0.3)
00230 self._pt.suspend()
00231 time.sleep(0.05)
00232 self._pt.finalize()
00233 estat2 = self._pt.getExecStat()
00234
00235
00236 self.assert_(estat._max_interval == estat2._max_interval)
00237 self.assert_(estat._min_interval == estat2._min_interval)
00238 self.assert_(estat._mean_interval == estat2._mean_interval)
00239 self.assert_(estat._std_deviation == estat2._std_deviation)
00240 return
00241
00242 def test_periodicMeasure(self):
00243 self._pt = PeriodicTask()
00244 self._my = MyMock()
00245 self._my.mysvc3 = MyMock().mysvc3()
00246 self._logger = LoggerMock()
00247 self._my.mysvc3.setLogger(self._logger)
00248
00249 wait = 0.05
00250 self._pt.setTask(self._my.mysvc3)
00251 self._pt.setPeriod(0.05)
00252 self._pt.periodicMeasure(True)
00253
00254
00255 self._pt.activate()
00256 time.sleep(0.6)
00257 self._pt.suspend()
00258 time.sleep(0.05)
00259 pstat = self._pt.getPeriodStat()
00260 self.assert_(pstat._max_interval < (wait + 0.030))
00261 self.assert_(pstat._min_interval > (wait - 0.050))
00262 self.assert_(abs(pstat._mean_interval - wait) < 0.03)
00263 self.assert_(pstat._std_deviation < (wait / 1.0))
00264
00265
00266 self._pt.periodicMeasureCount(5)
00267 self._pt.resume()
00268 time.sleep(0.3)
00269 self._pt.suspend()
00270 time.sleep(0.05)
00271 pstat = self._pt.getPeriodStat()
00272 self.assert_(pstat._max_interval < (wait + 0.030))
00273 self.assert_(pstat._min_interval > (wait - 0.010))
00274 self.assert_(abs(pstat._mean_interval - wait) < 0.03)
00275 self.assert_(pstat._std_deviation < (wait / 5.0))
00276
00277
00278 self._pt.periodicMeasureCount(10)
00279 self._pt.resume()
00280 time.sleep(0.3)
00281 self._pt.suspend()
00282 time.sleep(0.05)
00283 self._pt.finalize()
00284 pstat2 = self._pt.getPeriodStat()
00285
00286
00287
00288
00289
00290
00291
00292 return
00293
00294
00295
00296 if __name__ == '__main__':
00297 unittest.main()