test_PeriodicTask.py
Go to the documentation of this file.
1 #!/usr/bin/env/python
2 # -*- Python -*-
3 
4 #
5 # \file PeriodicTask.py
6 # \brief PeriodicTask class
7 # \date $Date: $
8 # \author Shinji Kurihara
9 #
10 # Copyright (C) 2007
11 # Task-intelligence Research Group,
12 # Intelligent Systems Research Institute,
13 # National Institute of
14 # Advanced Industrial Science and Technology (AIST), Japan
15 # All rights reserved.
16 #
17 
18 import sys,time
19 sys.path.insert(1,"../")
20 
21 import OpenRTM_aist
22 import unittest
23 
24 from PeriodicTask import *
25 from TimeMeasure import *
26 
27 class LoggerMock:
28  def __init__(self):
29  self.m_log = []
30  return
31 
32  def log(self, msg):
33  self.m_log.append(msg);
34  return
35 
36  def countLog(self, msg):
37  self.count = 0
38  for i in range(len(self.m_log)):
39  if self.m_log[i] == msg:
40  self.count += 1
41  return self.count
42 
43  def clearLog(self):
44  self.m_log = []
45  return
46 
47 class MyMock:
48  def __init__(self):
49  return
50 
51  class mysvc2:
52  def __init__(self):
53  self.m_logger = None
54  return
55 
56  def __call__(self):
57  if self.m_logger is not None:
58  self.m_logger.log("mysvc2")
59  return 0
60 
61  def setLogger(self, logger):
62  self.m_logger = logger
63  return
64 
65  class mysvc3:
66  def __init__(self):
67  self.m_logger = None
68  return
69 
70  def __call__(self):
71  if self.m_logger is not None:
72  self.m_logger.log("mysvc3")
73  return 0
74 
75  def setLogger(self, logger):
76  self.m_logger = logger
77  return
78 
79 class TestPeriodicTask(unittest.TestCase):
80  def setUp(self):
81  return
82 
83  def test_setTask(self):
84  self._pt = PeriodicTask()
85  self._my = MyMock()
86  self._my.mysvc2 = MyMock().mysvc2()
88  self._my.mysvc2.setLogger(self._logger)
89 
90  self.assert_(self._pt.setTask(self._my.mysvc2))
91  self.assertEqual(0, self._logger.countLog("mysvc2"))
92  self._pt.activate()
93  time.sleep(0.005)
94  self._pt.finalize()
95  time.sleep(0.005)
96  self.assert_(1 < self._logger.countLog("mysvc2"))
97  return
98 
99  def test_setPeriodic(self):
100  self._pt = PeriodicTask()
101  self._my = MyMock()
102  self._my.mysvc2 = MyMock().mysvc2()
103  self._logger = LoggerMock()
104  self._my.mysvc2.setLogger(self._logger)
105 
106  self._pt.setTask(self._my.mysvc2)
107  self._pt.setPeriod(0.05)
108  self.assertEqual(0, self._logger.countLog("mysvc2"))
109 
110  self._pt.activate()
111  time.sleep(0.1)
112  self._pt.suspend()
113  time.sleep(0.05)
114  self.assert_(4 > self._logger.countLog("mysvc2"))
115  self.assert_(0 < self._logger.countLog("mysvc2"))
116 
117  self._logger.clearLog()
118  self._pt.setPeriod(0.01)
119  self.assertEqual(0, self._logger.countLog("mysvc2"))
120 
121  self._pt.resume()
122  time.sleep(0.1)
123  self._pt.suspend()
124  time.sleep(0.01)
125  self.assert_(12 > self._logger.countLog("mysvc2"))
126  self.assert_( 8 < self._logger.countLog("mysvc2"))
127 
128  self._logger.clearLog()
129  self._pt.setPeriod(0.05)
130  self.assertEqual(0, self._logger.countLog("mysvc2"))
131 
132  self._pt.resume()
133  time.sleep(0.1)
134  self._pt.finalize()
135  time.sleep(0.05)
136  self.assert_(4 > self._logger.countLog("mysvc2"))
137  self.assert_(0 < self._logger.countLog("mysvc2"))
138  return
139 
140  def test_signal(self):
141  self._pt = PeriodicTask()
142  self._my = MyMock()
143  self._my.mysvc2 = MyMock().mysvc2()
144  self._logger = LoggerMock()
145  self._my.mysvc2.setLogger(self._logger)
146 
147  self._pt.setTask(self._my.mysvc2)
148  self._pt.setPeriod(0.05)
149  self.assertEqual(0, self._logger.countLog("mysvc2"))
150 
151  self._pt.activate()
152  time.sleep(0.2)
153  self._pt.suspend()
154  count = self._logger.countLog("mysvc2")
155 
156  time.sleep(0.2)
157  self.assertEqual(count, self._logger.countLog("mysvc2"))
158 
159  self._pt.signal()
160  time.sleep(0.2)
161  self.assertEqual(count+1, self._logger.countLog("mysvc2"))
162 
163  self._pt.signal()
164  time.sleep(0.2)
165  self.assertEqual(count+2, self._logger.countLog("mysvc2"))
166 
167  self._logger.clearLog()
168  self._pt.resume()
169  time.sleep(0.2)
170  self._pt.suspend()
171  time.sleep(0.2)
172  self.assert_(6 > self._logger.countLog("mysvc2"))
173  self.assert_(2 < self._logger.countLog("mysvc2"))
174 
175  self._pt.finalize()
176  time.sleep(0.2)
177  return
178 
180  self._pt = PeriodicTask()
181  self._my = MyMock()
182  self._my.mysvc3 = MyMock().mysvc3()
183  self._logger = LoggerMock()
184  self._my.mysvc3.setLogger(self._logger)
185 
186  wait = 0.03 # sec
187  self._pt.setTask(self._my.mysvc3)
188  self._pt.setPeriod(0.05)
189  self._pt.executionMeasure(True)
190 
191  # executionMeasureConut: 10
192  self._pt.activate()
193  time.sleep(0.6)
194  self._pt.suspend()
195  time.sleep(0.05)
196  estat = self._pt.getExecStat()
197  ss = ""
198  ss = ss + "wait: " + str(wait) + "\n"
199  ss = ss + "estat max: " + str(estat._max_interval) + "\n"
200  ss = ss + "estat min: " + str(estat._min_interval) + "\n"
201  ss = ss + "estat mean: " + str(estat._mean_interval) + "\n"
202  ss = ss + "estat sdev: " + str(estat._std_deviation) + "\n"
203  self.assert_(estat._max_interval < (wait + 0.030), ss)
204  self.assert_(estat._min_interval > (wait - 0.03), ss)
205  self.assert_(abs(estat._mean_interval - wait) < 0.03, ss)
206  self.assert_(estat._std_deviation < (wait / 5.0), ss)
207 
208  # executionMeasureConut: 5
209  self._pt.executionMeasureCount(5)
210  self._pt.resume()
211  time.sleep(0.3)
212  self._pt.suspend()
213  time.sleep(0.05)
214  estat = self._pt.getExecStat()
215  ss = ""
216  ss = ss + "wait: " + str(wait) + "\n"
217  ss = ss + "estat max: " + str(estat._max_interval) + "\n"
218  ss = ss + "estat min: " + str(estat._min_interval) + "\n"
219  ss = ss + "estat mean: " + str(estat._mean_interval) + "\n"
220  ss = ss + "estat sdev: " + str(estat._std_deviation) + "\n"
221  self.assert_(estat._max_interval < (wait + 0.030), ss)
222  self.assert_(estat._min_interval > (wait - 0.03), ss)
223  self.assert_(abs(estat._mean_interval - wait) < 0.03, ss)
224  self.assert_(estat._std_deviation < (wait / 5.0), ss)
225 
226  # executionMeasureConut: lessthan
227  self._pt.executionMeasureCount(10)
228  self._pt.resume()
229  time.sleep(0.3)
230  self._pt.suspend()
231  time.sleep(0.05)
232  self._pt.finalize()
233  estat2 = self._pt.getExecStat()
234 
235  # periodicMeasureConut: lessthan
236  self.assert_(estat._max_interval == estat2._max_interval)
237  self.assert_(estat._min_interval == estat2._min_interval)
238  self.assert_(estat._mean_interval == estat2._mean_interval)
239  self.assert_(estat._std_deviation == estat2._std_deviation)
240  return
241 
243  self._pt = PeriodicTask()
244  self._my = MyMock()
245  self._my.mysvc3 = MyMock().mysvc3()
246  self._logger = LoggerMock()
247  self._my.mysvc3.setLogger(self._logger)
248 
249  wait = 0.05 # sec
250  self._pt.setTask(self._my.mysvc3)
251  self._pt.setPeriod(0.05)
252  self._pt.periodicMeasure(True)
253 
254  # periodicMeasureConut: 10
255  self._pt.activate()
256  time.sleep(0.6)
257  self._pt.suspend()
258  time.sleep(0.05)
259  pstat = self._pt.getPeriodStat()
260  self.assert_(pstat._max_interval < (wait + 0.030))
261  self.assert_(pstat._min_interval > (wait - 0.050))
262  self.assert_(abs(pstat._mean_interval - wait) < 0.03)
263  self.assert_(pstat._std_deviation < (wait / 1.0))
264 
265  # periodicMeasureConut:5
266  self._pt.periodicMeasureCount(5)
267  self._pt.resume()
268  time.sleep(0.3)
269  self._pt.suspend()
270  time.sleep(0.05)
271  pstat = self._pt.getPeriodStat()
272  self.assert_(pstat._max_interval < (wait + 0.030))
273  self.assert_(pstat._min_interval > (wait - 0.010))
274  self.assert_(abs(pstat._mean_interval - wait) < 0.03)
275  self.assert_(pstat._std_deviation < (wait / 5.0))
276 
277  # periodicMeasureConut: lessthan
278  self._pt.periodicMeasureCount(10)
279  self._pt.resume()
280  time.sleep(0.3)
281  self._pt.suspend()
282  time.sleep(0.05)
283  self._pt.finalize()
284  pstat2 = self._pt.getPeriodStat()
285 
286  # periodicMeasureConut: lessthan
287  # Comment: Some errors may be observed.
288  #self.assert_(pstat._max_interval == pstat2._max_interval)
289  #self.assert_(pstat._min_interval == pstat2._min_interval)
290  #self.assert_(pstat._mean_interval == pstat2._mean_interval)
291  #self.assert_(pstat._std_deviation == pstat2._std_deviation)
292  return
293 
294 
295 
296 if __name__ == '__main__':
297  unittest.main()
def append(dest, src)
Definition: NVUtil.py:386


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Mon Feb 28 2022 23:01:07