1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 rostest implementation of running bare (gtest-compatible) unit test
37 executables. These do not run in a ROS environment.
38 """
39
40 import os
41 import unittest
42 import time
43
44 import roslib.packages
45
46 import roslaunch.pmon
47
48 from rostest.rostestutil import createXMLRunner, printSummary, printRostestSummary, \
49 xmlResultsFile, rostest_name_from_path, printlog, printlogerr
50 import rosunit.junitxml
51
52 BARE_TIME_LIMIT = 60.
53
55
56 - def __init__(self, exe, args, results, retry=0, time_limit=None, test_name=None, package=None):
57 """
58 @param exe: path to executable to run
59 @type exe: str
60 @param args: arguments to exe
61 @type args: [str]
62 @param results: test results accumulator
63 @param retry: (optional) number of retries for test
64 @type retry: int
65 @param time_limit: (optional) time limit for test. Defaults to BARE_TIME_LIMIT.
66 @type time_limit: float
67 @param test_name: (optional) override automatically geneated test name
68 @type test_name: str
69 """
70 super(BareTestCase, self).__init__()
71 self.results = results
72 self.package = package
73 self.exe = exe
74 if test_name is None:
75 self.test_name = os.path.basename(exe)
76 else:
77 self.test_name = test_name
78
79 self.args = [self.exe] + args
80 self.retry = retry
81 self.time_limit = time_limit or BARE_TIME_LIMIT
82 self.pmon = None
83
85 self.pmon = roslaunch.pmon.start_process_monitor()
86
88 if self.pmon is not None:
89 roslaunch.pmon.shutdown_process_monitor(self.pmon)
90 self.pmon = None
91
93 self.failIf(self.package is None, "unable to determine package of executable")
94
95 done = False
96 while not done:
97 test_name = self.test_name
98
99 printlog("Running test [%s]", test_name)
100
101
102
103 test_file = xmlResultsFile(self.package, test_name, False)
104 if os.path.exists(test_file):
105 printlog("removing previous test results file [%s]", test_file)
106 os.remove(test_file)
107
108 self.args.append('--gtest_output=xml:%s'%test_file)
109
110
111 printlog("running test %s"%test_name)
112 timeout_failure = False
113
114 run_id = None
115
116 process = roslaunch.nodeprocess.LocalProcess(run_id, self.package, self.test_name, self.args, os.environ, False, cwd='cwd', is_node=False)
117
118 pm = self.pmon
119 pm.register(process)
120 success = process.start()
121 self.assert_(success, "test failed to start")
122
123
124 timeout_t = time.time() + self.time_limit
125 try:
126 while pm.mainthread_spin_once() and process.is_alive():
127
128 if time.time() > timeout_t:
129 raise roslaunch.launch.RLTestTimeoutException("test max time allotted")
130 time.sleep(0.1)
131
132 except roslaunch.launch.RLTestTimeoutException, e:
133 if self.retry:
134 timeout_failure = True
135 else:
136 raise
137
138 if not timeout_failure:
139 printlog("test [%s] finished"%test_name)
140 else:
141 printlogerr("test [%s] timed out"%test_name)
142
143
144 if not timeout_failure:
145 self.assert_(os.path.isfile(test_file), "test [%s] did not generate test results"%test_name)
146 printlog("test [%s] results are in [%s]", test_name, test_file)
147 results = rosunit.junitxml.read(test_file, test_name)
148 test_fail = results.num_errors or results.num_failures
149 else:
150 test_fail = True
151
152 if self.retry > 0 and test_fail:
153 self.retry -= 1
154 printlog("test [%s] failed, retrying. Retries left: %s"%(test_name, self.retry))
155 else:
156 done = True
157 self.results.accumulate(results)
158 printlog("test [%s] results summary: %s errors, %s failures, %s tests",
159 test_name, results.num_errors, results.num_failures, results.num_tests)
160
161 printlog("[ROSTEST] test [%s] done", test_name)
162