1 from __future__ import with_statement
2 import os
3 import sys
4 import logging
5 import time
6 import unittest
7
8 import roslaunch
9 import roslib.packages
10
11 from rostest.rostestutil import createXMLRunner, printSummary, printRostestSummary, \
12 xmlResultsFile, printlog, printlogerr
13 from rostest.rostest_parent import ROSTestLaunchParent
14 import rostest.xmlresults
15
# Port number handed to roslaunch both when the test launch file is parsed
# and when the per-test launch parent is created.
_DEFAULT_TEST_PORT = 22422

# Module-wide result record, created with the name 'rostest' and its three
# numeric arguments set to 0.
# NOTE(review): presumably this is what _accumulateResults() adds each
# test's results to -- confirm against that function.
_results = rostest.xmlresults.Result("rostest", 0, 0, 0)
23
26
# When True, tests are launched with output='screen' and an extra ' --text'
# argument, and their XML result files are not read back.
_textMode = False


def setTextMode(val):
    """
    Turn text mode on or off for all subsequently run tests.

    @param val: True to enable text mode, False to disable it
    @type  val: bool
    """
    global _textMode
    _textMode = val
31
32
33
# Launch parents created so far and the most recent launch configuration.
# NOTE(review): presumably maintained by _addRostestParent(), which setUp()
# calls with each new launch parent -- confirm against that function.
_test_parents = list()
_config = None
41
44
47
48
49
50
def failDuplicateRunner(testName):
    """
    Test function generator for a test whose name collides with another
    test in the same rostest suite. The returned method does not run
    anything: it reports the duplicate on stdout and fails the test case.

    @param testName: name of the duplicated test
    @type  testName: str
    @return: function object that fails with a duplicate-name message
    @rtype: fn
    """
    def fn(self):
        message = "Duplicate tests named [%s] in rostest suite"%testName
        # A parenthesised single argument prints identically under the
        # Python 2 print statement and the Python 3 print function.
        print(message)
        self.fail(message)
    return fn
56
def failRunner(testName, message):
    """
    Test function generator for a test that cannot be run at all. The
    returned method writes the message to stderr and fails the test case
    with it.

    @param testName: name of the test (not used by the generated method)
    @type  testName: str
    @param message: failure message to report
    @type  message: str
    @return: function object that fails with message
    @rtype: fn
    """
    def fn(self):
        # written via the stream so the code is valid on Python 2 and 3
        sys.stderr.write("%s\n"%(message,))
        self.fail(message)
    return fn
62
def rostestRunner(test, test_pkg):
    """
    Test function generator that takes in a roslaunch Test object and
    returns a class instance method that runs the test. TestCase
    setUp() is responsible for ensuring that the rest of the roslaunch
    state is correct and tearDown() is responsible for tearing
    everything down cleanly.

    @param test: rostest to run
    @type  test: roslaunch.Test
    @param test_pkg: name of the package, used to locate the XML results file
    @type  test_pkg: str
    @return: function object to run testObj
    @rtype: fn
    """

    def fn(self):
        # Flag understood by the test node for choosing its XML results file.
        XML_OUTPUT_FLAG = '--gtest_output=xml:'

        # Remember the arguments the test started with so that a retry
        # rebuilds the command line instead of appending the results flag
        # (and ' --text') a second time.
        base_args = test.args

        done = False
        while not done:
            self.assertTrue(self.test_parent is not None, "ROSTestParent initialization failed")

            test_name = test.test_name

            printlog("Running test [%s]", test_name)

            # launch the other (fixture) nodes
            _succeeded, failed = self.test_parent.launch()
            self.assertTrue(not failed, "Test Fixture Nodes %s failed to launch"%(failed,))

            # set up the test: the results file name is passed to the test
            # node so that the file can be read back afterwards
            test_file = xmlResultsFile(test_pkg, test_name, False)
            if os.path.exists(test_file):
                printlog("removing previous test results file [%s]", test_file)
                os.remove(test_file)

            test.args = "%s %s%s"%(base_args, XML_OUTPUT_FLAG, test_file)
            if _textMode:
                test.output = 'screen'
                test.args = test.args + " --text"

            # run the test
            printlog("running test %s"%test_name)
            timeout_failure = False
            try:
                self.test_parent.run_test(test)
            except roslaunch.launch.RLTestTimeoutException:
                # A timeout is only swallowed while retries remain; this
                # guarantees the retry branch below is taken, so 'results'
                # is never read without having been loaded.
                if test.retry and test.retry > 0:
                    timeout_failure = True
                else:
                    raise

            if timeout_failure:
                printlogerr("test [%s] timed out"%test_name)
            else:
                printlog("test [%s] finished"%test_name)

            if not _textMode or timeout_failure:
                if timeout_failure:
                    test_fail = True
                else:
                    # load in the results file written by the test node
                    self.assertTrue(os.path.isfile(test_file), "test [%s] did not generate test results"%test_name)
                    printlog("test [%s] results are in [%s]", test_name, test_file)
                    results = rostest.xmlresults.read(test_file, test_name)
                    test_fail = results.num_errors or results.num_failures

                if test.retry > 0 and test_fail:
                    test.retry -= 1
                    printlog("test [%s] failed, retrying. Retries left: %s"%(test_name, test.retry))
                    # restart the launch infrastructure before the next attempt
                    self.tearDown()
                    self.setUp()
                else:
                    done = True
                    _accumulateResults(results)
                    printlog("test [%s] results summary: %s errors, %s failures, %s tests",
                             test_name, results.num_errors, results.num_failures, results.num_tests)
            else:
                # text mode without a timeout: no results file is read
                if test.retry:
                    printlogerr("retry is disabled in --text mode")
                done = True
        printlog("[ROSTEST] test [%s] done", test_name)

    return fn
155
156
158
159
160
def setUp(self):
    """
    setUp method installed on the generated TestCase class. Creates a new
    ROSTestLaunchParent for self.test_file, runs its setUp(), and registers
    it with the module via _addRostestParent().
    """
    # a new test_parent is created for every run
    self.test_parent = ROSTestLaunchParent(self.config, [self.test_file], port=_DEFAULT_TEST_PORT)

    printlog("setup[%s] run_id[%s] starting", self.test_file, self.test_parent.run_id)

    self.test_parent.setUp()

    # expose the launch parent's config on the test case itself
    self.config = self.test_parent.config

    _addRostestParent(self.test_parent)

    printlog("setup[%s] run_id[%s] done", self.test_file, self.test_parent.run_id)
173
174
def tearDown(self):
    """
    tearDown method installed on the generated TestCase class. Tears down
    the launch parent of the current test, if one was created.
    """
    printlog("tearDown[%s]", self.test_file)

    # test_parent is still None if setUp() never got as far as creating it
    if self.test_parent:
        self.test_parent.tearDown()

    printlog("rostest teardown %s complete", self.test_file)
182
# NOTE(review): the 'def' line of this function was missing from the file;
# the name createUnitTest is assumed -- confirm against the callers.
def createUnitTest(pkg, test_file):
    """
    Unit test factory. Constructs a unittest class based on the roslaunch
    configuration loaded from test_file: each entry of config.tests
    becomes one test method on the returned class.

    @param pkg: package name
    @type  pkg: str
    @param test_file: rostest filename
    @type  test_file: str
    @return: unittest.TestCase subclass named 'RosTest'
    @rtype: type
    """
    # parse the config to find the tests
    config = roslaunch.parent.load_config_default([test_file], _DEFAULT_TEST_PORT)

    # pass in config to the class so that setUp() can build the launch parent
    classdict = { 'setUp': setUp, 'tearDown': tearDown, 'config': config,
                  'test_parent': None, 'test_file': test_file }

    # names of the runnable tests added so far, used to detect duplicates
    testNames = set()
    for test in config.tests:
        # make sure the test node can actually be found before scheduling it
        err_msg = None
        try:
            cmd = roslib.packages.find_node(test.package, test.type,
                                            test.machine.ros_root, test.machine.ros_package_path)
            if not cmd:
                err_msg = "Test node [%s/%s] does not exist or is not executable"%(test.package, test.type)
        except roslib.packages.ROSPkgException:
            err_msg = "Package [%s] for test node [%s/%s] does not exist"%(test.package, test.package, test.type)

        testName = 'test%s'%(test.test_name,)
        if err_msg:
            classdict[testName] = failRunner(test.test_name, err_msg)
        elif testName in testNames:
            classdict[testName] = failDuplicateRunner(test.test_name)
        else:
            classdict[testName] = rostestRunner(test, pkg)
            testNames.add(testName)

    # instantiate the TestCase subclass with a test method per entry
    return type('RosTest', (unittest.TestCase,), classdict)
223