1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 from __future__ import print_function
34
35 import os
36 import sys
37 import logging
38 import time
39 import unittest
40
41 import rospkg
42 import roslaunch
43 import roslib.packages
44
45 from rostest.rostestutil import createXMLRunner, printSummary, printRostestSummary, \
46 xmlResultsFile, printlog, printlogerr
47 from rostest.rostest_parent import ROSTestLaunchParent
48 import rosunit.junitxml
49
50 _DEFAULT_TEST_PORT = 22422
51
52
53
54 _results = rosunit.junitxml.Result('rostest', 0, 0, 0)
57
60
61 _textMode = False
62 -def setTextMode(val):
63 global _textMode
64 _textMode = val
65
66
67
68 _test_parents = []
69 _config = None
75
78
81
82
83
84
86 def fn(self):
87 print("Duplicate tests named [%s] in rostest suite"%testName)
88 self.fail("Duplicate tests named [%s] in rostest suite"%testName)
89 return fn
90
92 def fn(self):
93 print(message, file=sys.stderr)
94 self.fail(message)
95 return fn
96
98 """
99 Test function generator that takes in a roslaunch Test object and
100 returns a class instance method that runs the test. TestCase
101 setUp() is responsible for ensuring that the rest of the roslaunch
102 state is correct and tearDown() is responsible for tearing
103 everything down cleanly.
104 @param test: rost test to run
105 @type test: roslaunch.Test
106 @return: function object to run testObj
107 @rtype: fn
108 """
109
110
111 def fn(self):
112 done = False
113 while not done:
114 self.assert_(self.test_parent is not None, "ROSTestParent initialization failed")
115
116 test_name = test.test_name
117
118 printlog("Running test [%s]", test_name)
119
120
121 succeeded, failed = self.test_parent.launch()
122 self.assert_(not failed, "Test Fixture Nodes %s failed to launch"%failed)
123
124
125
126 test_file = xmlResultsFile(test_pkg, test_name, False)
127 if os.path.exists(test_file):
128 printlog("removing previous test results file [%s]", test_file)
129 os.remove(test_file)
130
131
132
133
134
135 XML_OUTPUT_FLAG='--gtest_output=xml:'
136
137 test.args = "%s %s%s"%(test.args, XML_OUTPUT_FLAG, test_file)
138 if _textMode:
139 test.output = 'screen'
140 test.args = test.args + " --text"
141
142
143 printlog("running test %s"%test_name)
144 timeout_failure = False
145 try:
146 self.test_parent.run_test(test)
147 except roslaunch.launch.RLTestTimeoutException as e:
148 if test.retry:
149 timeout_failure = True
150 else:
151 raise
152
153 if not timeout_failure:
154 printlog("test [%s] finished"%test_name)
155 else:
156 printlogerr("test [%s] timed out"%test_name)
157
158
159 if not _textMode or timeout_failure:
160
161 if not timeout_failure:
162 self.assert_(os.path.isfile(test_file), "test [%s] did not generate test results"%test_name)
163 printlog("test [%s] results are in [%s]", test_name, test_file)
164 results = rosunit.junitxml.read(test_file, test_name)
165 test_fail = results.num_errors or results.num_failures
166 else:
167 test_fail = True
168
169 if test.retry > 0 and test_fail:
170 test.retry -= 1
171 printlog("test [%s] failed, retrying. Retries left: %s"%(test_name, test.retry))
172 self.tearDown()
173 self.setUp()
174 else:
175 done = True
176 _accumulateResults(results)
177 printlog("test [%s] results summary: %s errors, %s failures, %s tests",
178 test_name, results.num_errors, results.num_failures, results.num_tests)
179
180
181
182 else:
183 if test.retry:
184 printlogerr("retry is disabled in --text mode")
185 done = True
186 printlog("[ROSTEST] test [%s] done", test_name)
187
188 return fn
189
190
192
193
194
195 self.test_parent = ROSTestLaunchParent(self.config, [self.test_file], port=_DEFAULT_TEST_PORT)
196
197 printlog("setup[%s] run_id[%s] starting", self.test_file, self.test_parent.run_id)
198
199 self.test_parent.setUp()
200
201
202 self.config = self.test_parent.config
203
204 _addRostestParent(self.test_parent)
205
206 printlog("setup[%s] run_id[%s] done", self.test_file, self.test_parent.run_id)
207
208
210 printlog("tearDown[%s]", self.test_file)
211
212 if self.test_parent:
213 self.test_parent.tearDown()
214
215 printlog("rostest teardown %s complete", self.test_file)
216
218 """
219 Unit test factory. Constructs a unittest class based on the roslaunch
220
221 @param pkg: package name
222 @type pkg: str
223 @param test_file: rostest filename
224 @type test_file: str
225 """
226
227 config = roslaunch.parent.load_config_default([test_file], _DEFAULT_TEST_PORT)
228
229
230 classdict = { 'setUp': setUp, 'tearDown': tearDown, 'config': config,
231 'test_parent': None, 'test_file': test_file }
232
233
234 testNames = []
235 for test in config.tests:
236
237 err_msg = None
238 try:
239 rp = rospkg.RosPack()
240 cmd = roslib.packages.find_node(test.package, test.type, rp)
241 if not cmd:
242 err_msg = "Test node [%s/%s] does not exist or is not executable"%(test.package, test.type)
243 except rospkg.ResourceNotFound as e:
244 err_msg = "Package [%s] for test node [%s/%s] does not exist"%(test.package, test.package, test.type)
245
246 testName = 'test%s'%(test.test_name)
247 if err_msg:
248 classdict[testName] = failRunner(test.test_name, err_msg)
249 elif testName in testNames:
250 classdict[testName] = failDuplicateRunner(test.test_name)
251 else:
252 classdict[testName] = rostestRunner(test, pkg)
253 testNames.append(testName)
254
255
256 return type('RosTest',(unittest.TestCase,),classdict)
257