1
2
3
4
5
6
7
8
9
10
11 from __future__ import print_function
12
13 import os
14 import sys
15 import unittest
16 from urlparse import urlparse
17 import rospkg
18 import roslib
19 import roslaunch
20 import rosunit
21
22
23 from loggers import printlog, printlogerr
24 from test_parent import RoconTestLaunchParent
25
26
27
28
29
# Presumably the default port for a test master - it is not referenced in the
# visible part of this module (TODO confirm against the callers).
_DEFAULT_TEST_PORT = 22422
# Module-wide junit result object. Presumably the aggregate that
# _accumulate_results() folds each test's results into (helper not visible here).
_results = rosunit.junitxml.Result('rocon_test', 0, 0, 0)
# When True the test runner sends test output to the screen, appends ' --text'
# to the test arguments and does not read back the xml results file.
_text_mode = False
# When True tearDown() waits for Enter before shutting down, then resets the
# flag via set_pause_mode(False).
_pause_mode = False
34
35
38
39
42
43
def set_text_mode(val):
    '''
      Switch the module wide text mode flag.

      The generated test runners read this flag to decide whether the test
      prints to the screen (with ' --text' appended to its arguments) or
      writes an xml results file that is read back afterwards.

      @param val : new value for the flag
    '''
    global _text_mode
    _text_mode = val
47
48
52
53
54
55
56
57
58
# Presumably the registry that _add_rocon_test_parent() appends launch parents
# to (that helper is not visible here) - TODO confirm.
_test_parents = []

# Not referenced in the visible part of this module.
_config = None
62
63
68
69
72
73
74
75
76
77
class RoconTestLaunchConfiguration(object):
    '''
      Provides configuration details for a rocon test launch configuration
      associated with a single master.
    '''
    def __init__(self, launcher):
        '''
          Loads the roslaunch configuration for one launcher.

          @param launcher : launcher configuration, its 'path' and 'port'
                            entries are used to load the roslaunch config
        '''
        self.launcher = launcher
        self.configuration = roslaunch.parent.load_config_default([launcher['path']], launcher['port'])
        # Kept for backwards compatibility with anything still reading it.
        self.test_parent = None
        # setUp() stores the launch parent here and tearDown() reads it back,
        # so make sure the attribute exists even before setUp() has run.
        self.parent = None
89
90
def setUp(self):
    '''
      Function that becomes TestCase.setUp()

      Walks over every launch configuration of the rocon launcher and gives
      each one a 'parent' which later on is responsible for shutting
      everything down. Configurations that share a master port share a
      run id.

      Configurations without tests get a plain ROSLaunchParent that is
      started right away, configurations with tests get a
      RoconTestLaunchParent which is set up and registered.

      This could be prone to problems if someone puts multiple launchers in
      rocon launcher with the same master port (untested).
    '''
    run_ids = {}  # master port -> run id
    for launch_configuration in self.rocon_launch_configurations:
        config = launch_configuration.configuration
        launcher_path = launch_configuration.launcher["path"]
        port = urlparse(config.master.uri).port
        if port not in run_ids:
            run_ids[port] = roslaunch.core.generate_run_id()
        has_tests = bool(config.tests)
        if has_tests:
            parent = RoconTestLaunchParent(run_ids[port], config, [launcher_path], port=port)
            launch_configuration.parent = parent
            parent.setUp()
            _add_rocon_test_parent(parent)
            printlog("Launch Parent - type ...............ROSTestLaunchParent")
        else:
            parent = roslaunch.parent.ROSLaunchParent(
                run_ids[port],
                [launcher_path],
                is_core=False,
                port=port,
                verbose=False,
                force_screen=False,
                is_rostest=False
            )
            launch_configuration.parent = parent
            parent._load_config()
            parent.start()
            printlog("Launch Parent - type ...............ROSLaunchParent")
        printlog("Launch Parent - run id..............%s" % parent.run_id)
        printlog("Launch Parent - launcher............%s" % launcher_path)
        printlog("Launch Parent - port................%s" % port)
        printlog("Launch Parent - tests...............yes" if has_tests
                 else "Launch Parent - tests...............no")
140
141
def tearDown(self):
    '''
      Function that becomes TestCase.tearDown()

      Shuts down the parent of every launch configuration. In pause mode
      the user is asked to press Enter first, the pause flag is reset
      straight after so the prompt only shows up once.
    '''
    for launch_configuration in self.rocon_launch_configurations:
        config = launch_configuration.configuration
        parent = launch_configuration.parent
        launcher = launch_configuration.launcher
        if _pause_mode:
            raw_input("Press Enter to continue...")
            set_pause_mode(False)
        printlog("Tear Down - launcher..........................%s" % launcher["path"])
        if not config.tests:
            # plain roslaunch parent, nothing test specific to clean up
            parent.shutdown()
            continue
        names = [test.test_name for test in config.tests]
        printlog("Tear Down - tests..............................%s" % names)
        if parent:
            parent.tearDown()
            printlog("Tear Down - run id............................%s" % parent.run_id)
161
162
163
def fail_duplicate_runner(test_name):
    '''
      Builds a test method that reports a duplicated test name.

      @param test_name : name of the test that occurs more than once
      @return fn : method that prints the complaint and fails the test case
    '''
    def fn(self):
        complaint = "Duplicate tests named [%s] in rostest suite" % test_name
        print(complaint)
        self.fail(complaint)
    return fn
169
170
def fail_runner(test_name, message):
    '''
      Builds a test method that fails straight away with the given message.

      @param test_name : name of the test (not used by the generated method)
      @param message : text that is printed to stderr and used as the failure message
      @return fn : method that prints the message and fails the test case
    '''
    def fn(self):
        print(message, file=sys.stderr)
        self.fail(message)
    return fn
176
177
def rocon_test_runner(test, test_launch_configuration, test_pkg):
    """
    Test function generator that takes in a roslaunch Test object and
    returns a class instance method that runs the test. TestCase
    setUp() is responsible for ensuring that the rest of the roslaunch
    state is correct and tearDown() is responsible for tearing
    everything down cleanly.
    @param test: ros test to run
    @type  test: roslaunch.Test
    @param test_launch_configuration: launch configuration whose parent
        launches the fixture nodes and runs the test
    @param test_pkg: package handed to rosunit.xml_results_file() to work
        out where the xml results file lives
    @return: function object to run testObj
    @rtype: fn
    """
    def fn(self):
        printlog("Launching tests")
        # Remember the untouched arguments. The results flag is rebuilt from
        # these on every pass so that retries do not stack duplicate flags.
        base_args = test.args
        done = False
        while not done:
            # re-read on every pass, a retry replaces the parent via setUp()
            parent = test_launch_configuration.parent
            self.assert_(parent is not None, "ROSTestParent initialization failed")
            test_name = test.test_name
            printlog(" name..............................%s" % test_name)

            # launch the other nodes
            unused_succeeded, failed = parent.launch()
            self.assert_(not failed, "Test Fixture Nodes %s failed to launch" % failed)

            # the results file name is passed to the test so it can be read back later
            test_file = rosunit.xml_results_file(test_pkg, test_name, False)
            printlog(" test results file.................%s", test_file)
            if os.path.exists(test_file):
                os.remove(test_file)
                printlog(" clear old test results file.......done")

            XML_OUTPUT_FLAG = '--gtest_output=xml:'
            test.args = "%s %s%s" % (base_args, XML_OUTPUT_FLAG, test_file)
            if _text_mode:
                test.output = 'screen'
                test.args = test.args + " --text"

            printlog("Running test %s" % test_name)
            timeout_failure = False
            try:
                parent.run_test(test)
            except roslaunch.launch.RLTestTimeoutException as unused_e:
                # a timeout only counts as a retryable failure if retries are configured
                if test.retry:
                    timeout_failure = True
                else:
                    raise

            if not timeout_failure:
                printlog("test finished [%s]" % test_name)
            else:
                printlogerr("test timed out [%s]" % test_name)

            if not _text_mode or timeout_failure:
                if not timeout_failure:
                    self.assert_(os.path.isfile(test_file), "test [%s] did not generate test results" % test_name)
                    printlog("test [%s] results are in [%s]", test_name, test_file)
                    results = rosunit.junitxml.read(test_file, test_name)
                    test_fail = results.num_errors or results.num_failures
                else:
                    test_fail = True
                if test.retry > 0 and test_fail:
                    test.retry -= 1
                    printlog("test [%s] failed, retrying. Retries left: %s" % (test_name, test.retry))
                    # restart the whole launch before the next attempt
                    self.tearDown()
                    self.setUp()
                else:
                    done = True
                    _accumulate_results(results)
                    printlog("test [%s] results summary: %s errors, %s failures, %s tests",
                             test_name, results.num_errors, results.num_failures, results.num_tests)
            else:
                if test.retry:
                    printlogerr("retry is disabled in --text mode")
                done = True
        printlog("test done [%s]", test_name)
    return fn
267
268
def create_unit_test(rocon_launcher, launchers):
    '''
      Constructs the python unit test class.

      Every test found in the launch configurations becomes a 'test<name>'
      method of the class. Tests whose node cannot be found, and tests with
      a name that was already used, become methods that fail with an
      explanatory message.

      @param rocon_launcher : absolute path to the rocon test launcher
      @param launchers : list of individual launcher configurations (name, path, port etc) in rocon_launcher
      @return unittest.TestCase : unit test class
    '''
    rocon_launch_configurations = [RoconTestLaunchConfiguration(launcher) for launcher in launchers]

    classdict = {'setUp': setUp, 'tearDown': tearDown,
                 'rocon_launch_configurations': rocon_launch_configurations,
                 'test_file': rocon_launcher}

    # one package crawler is enough for all the lookups below
    rospack = rospkg.RosPack()
    test_names = []
    for rocon_launch_configuration in rocon_launch_configurations:
        config = rocon_launch_configuration.configuration
        for test in config.tests:
            # check that the test node exists and is executable
            err_msg = None
            try:
                cmd = roslib.packages.find_node(test.package, test.type, rospack)
                if not cmd:
                    err_msg = "Test node [%s/%s] does not exist or is not executable" % (test.package, test.type)
            except rospkg.ResourceNotFound:
                err_msg = "Package [%s] for test node [%s/%s] does not exist" % (test.package, test.package, test.type)

            test_name = 'test%s' % (test.test_name)
            if err_msg:
                classdict[test_name] = fail_runner(test.test_name, err_msg)
            elif test_name in test_names:
                classdict[test_name] = fail_duplicate_runner(test.test_name)
            else:
                # Use the package of the launcher this test belongs to, not
                # whichever launcher happened to be last in the list.
                test_pkg = rocon_launch_configuration.launcher["package"]
                classdict[test_name] = rocon_test_runner(test, rocon_launch_configuration, test_pkg)
                test_names.append(test_name)

    return type('RoconTest', (unittest.TestCase,), classdict)
311