1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 from __future__ import print_function
34
35 import os
36 import sys
37 import logging
38 import time
39 import unittest
40
41 import rospkg
42 from rospkg.environment import ROS_TEST_RESULTS_DIR
43 import roslaunch
44 import roslib.packages
45
46 from rostest.rostestutil import createXMLRunner, printSummary, printRostestSummary, \
47 xmlResultsFile, printlog, printlogerr
48 from rostest.rostest_parent import ROSTestLaunchParent
49 import rosunit.junitxml
50
51
52
# Module-wide result object, created with the name 'rostest' and three zero counts.
# NOTE(review): presumably the per-test results are folded into this object by
# an accumulate helper that is not visible in this part of the file -- confirm.
_results = rosunit.junitxml.Result('rostest', 0, 0, 0)
56
59
# Module-wide switch read by the generated test methods; when true they run the
# test node with screen output and a trailing "--text" argument.
_textMode = False


def setTextMode(val):
    """
    Set the module-wide text-mode flag.

    @param val: new value for the flag; it is stored as given
    @type  val: bool
    """
    global _textMode
    _textMode = val
64
65
66
# NOTE(review): presumably the list of ROSTestLaunchParent instances registered
# from setUp() through _addRostestParent -- the helper is not visible here; confirm.
_test_parents = []
# NOTE(review): presumably the most recent roslaunch config seen by the
# registration helper -- confirm against the accessor functions.
_config = None
74
77
80
81
82
83
def failDuplicateRunner(testName):
    """
    Test function generator for a test whose name is already used in the suite.

    @param testName: name of the duplicated test
    @type  testName: str
    @return: function object that prints the duplicate-name message to
        stdout and then fails the test case it is bound to
    @rtype: fn
    """
    def fn(self):
        # Build the message once so the printed and the reported text cannot drift apart.
        message = "Duplicate tests named [%s] in rostest suite"%testName
        print(message)
        self.fail(message)
    return fn
89
def failRunner(testName, message):
    """
    Test function generator for a test that must fail with a fixed message.

    @param testName: name of the test; not used by the generated function
    @type  testName: str
    @param message: text printed to stderr and reported as the failure reason
    @type  message: str
    @return: function object that prints message to stderr and then fails
        the test case it is bound to
    @rtype: fn
    """
    def fn(self):
        print(message, file=sys.stderr)
        self.fail(message)
    return fn
95
def rostestRunner(test, test_pkg, results_base_dir=None):
    """
    Test function generator that takes in a roslaunch Test object and
    returns a class instance method that runs the test. TestCase
    setUp() is responsible for ensuring that the rest of the roslaunch
    state is correct and tearDown() is responsible for tearing
    everything down cleanly.

    @param test: rostest to run
    @type  test: roslaunch.Test
    @param test_pkg: package name handed to xmlResultsFile() to locate
        the XML results file of the test
    @type  test_pkg: str
    @param results_base_dir: if set, handed to xmlResultsFile() through an
        env mapping under the ROS_TEST_RESULTS_DIR key
    @type  results_base_dir: str
    @return: function object to run testObj
    @rtype: fn
    """
    # Flag prepended to the results file name in the test node arguments.
    XML_OUTPUT_FLAG = '--gtest_output=xml:'

    def fn(self):
        # Remember the pristine arguments: every retry rebuilds test.args from
        # them, so the output flag (and --text) is not appended again per attempt.
        base_args = test.args

        done = False
        while not done:
            self.assertTrue(self.test_parent is not None, "ROSTestParent initialization failed")

            test_name = test.test_name

            printlog("Running test [%s]", test_name)

            # Launch the other nodes; any node reported as failed aborts the test.
            _succeeded, failed = self.test_parent.launch()
            self.assertFalse(failed, "Test Fixture Nodes %s failed to launch"%failed)

            # Work out where the test writes its results and clear stale output.
            env = None
            if results_base_dir:
                env = {ROS_TEST_RESULTS_DIR: results_base_dir}
            test_file = xmlResultsFile(test_pkg, test_name, False, env=env)
            if os.path.exists(test_file):
                printlog("removing previous test results file [%s]", test_file)
                os.remove(test_file)

            test.args = "%s %s%s"%(base_args, XML_OUTPUT_FLAG, test_file)
            if _textMode:
                test.output = 'screen'
                test.args = test.args + " --text"

            printlog("running test %s"%test_name)
            timeout_failure = False
            try:
                self.test_parent.run_test(test)
            except roslaunch.launch.RLTestTimeoutException:
                # A timeout only counts as a retryable failure while retries remain.
                if test.retry:
                    timeout_failure = True
                else:
                    raise

            if not timeout_failure:
                printlog("test [%s] finished"%test_name)
            else:
                printlogerr("test [%s] timed out"%test_name)

            if not _textMode or timeout_failure:

                if not timeout_failure:
                    self.assertTrue(os.path.isfile(test_file), "test [%s] did not generate test results"%test_name)
                    printlog("test [%s] results are in [%s]", test_name, test_file)
                    results = rosunit.junitxml.read(test_file, test_name)
                    test_fail = results.num_errors or results.num_failures
                else:
                    test_fail = True

                if test.retry > 0 and test_fail:
                    test.retry -= 1
                    printlog("test [%s] failed, retrying. Retries left: %s"%(test_name, test.retry))
                    # Restart the whole launch environment before the next attempt.
                    self.tearDown()
                    self.setUp()
                else:
                    done = True
                    _accumulateResults(results)
                    printlog("test [%s] results summary: %s errors, %s failures, %s tests",
                             test_name, results.num_errors, results.num_failures, results.num_tests)

            else:
                # Text mode: no results file is read, so a retry is not attempted.
                if test.retry:
                    printlogerr("retry is disabled in --text mode")
                done = True
        printlog("[ROSTEST] test [%s] done", test_name)

    return fn
191
192
194
195
196
def setUp(self):
    """
    unittest setUp hook installed on the generated test class.

    Creates a new ROSTestLaunchParent for the test file, runs its setUp(),
    copies its config onto the test case and registers the parent through
    _addRostestParent().
    """
    # A new launch parent is built on every call, from the current config.
    self.test_parent = ROSTestLaunchParent(self.config, [self.test_file], reuse_master=self.reuse_master, clear=self.clear)

    printlog("setup[%s] run_id[%s] starting", self.test_file, self.test_parent.run_id)

    self.test_parent.setUp()

    # Expose the parent's config on the test case itself.
    self.config = self.test_parent.config

    _addRostestParent(self.test_parent)

    printlog("setup[%s] run_id[%s] done", self.test_file, self.test_parent.run_id)
209
210
def tearDown(self):
    """
    unittest tearDown hook installed on the generated test class.

    Tears down the launch parent of the test case, if one is set.
    """
    printlog("tearDown[%s]", self.test_file)

    # test_parent starts out as None on the generated class, so guard the call.
    if self.test_parent:
        self.test_parent.tearDown()

    printlog("rostest teardown %s complete", self.test_file)
218
def createUnitTest(pkg, test_file, reuse_master=False, clear=False, results_base_dir=None):
    """
    Unit test factory. Constructs a unittest.TestCase subclass named
    'RosTest' from the roslaunch config loaded from a rostest file. Every
    test in the config becomes a 'test<test_name>' method on the class.

    @param pkg: package name
    @type  pkg: str
    @param test_file: rostest filename
    @type  test_file: str
    @param reuse_master: stored on the class and handed to ROSTestLaunchParent by setUp()
    @type  reuse_master: bool
    @param clear: stored on the class and handed to ROSTestLaunchParent by setUp()
    @type  clear: bool
    @param results_base_dir: forwarded to rostestRunner() for every runnable test
    @type  results_base_dir: str
    @return: the generated test case class
    @rtype: type
    """
    # Parse the launch file to find the tests it declares.
    config = roslaunch.parent.load_config_default([test_file], None)

    # Attributes and fixture hooks shared by every generated test method.
    classdict = { 'setUp': setUp, 'tearDown': tearDown, 'config': config,
                  'test_parent': None, 'test_file': test_file,
                  'reuse_master': reuse_master, 'clear': clear }

    # One package locator serves every lookup in the loop below.
    rospack = rospkg.RosPack()

    # Names of the tests that were given a real runner so far.
    testNames = set()
    for test in config.tests:
        # A test whose node cannot be located gets a method that only reports that.
        err_msg = None
        try:
            if not roslib.packages.find_node(test.package, test.type, rospack):
                err_msg = "Test node [%s/%s] does not exist or is not executable"%(test.package, test.type)
        except rospkg.ResourceNotFound:
            err_msg = "Package [%s] for test node [%s/%s] does not exist"%(test.package, test.package, test.type)

        testName = 'test%s'%(test.test_name)
        if err_msg:
            classdict[testName] = failRunner(test.test_name, err_msg)
        elif testName in testNames:
            # The name already has a runner: replace it with a failing method.
            classdict[testName] = failDuplicateRunner(test.test_name)
        else:
            classdict[testName] = rostestRunner(test, pkg, results_base_dir=results_base_dir)
            testNames.add(testName)

    # Instantiate the TestCase subclass with the methods collected above.
    return type('RosTest',(unittest.TestCase,),classdict)
260