00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 from __future__ import print_function
00012
00013 import os
00014 import sys
00015 import unittest
00016 from urlparse import urlparse
00017 import rospkg
00018 import roslib
00019 import roslaunch
00020 import rosunit
00021
00022
00023 from loggers import printlog, printlogerr
00024 from test_parent import RoconTestLaunchParent
00025
00026
00027
00028
00029
# Default port number for tests (not referenced elsewhere in the visible source).
_DEFAULT_TEST_PORT = 22422
# Module-wide junit result accumulator; individual test results are merged
# into it by _accumulate_results() and it is exposed through get_results().
_results = rosunit.junitxml.Result('rocon_test', 0, 0, 0)
# Run-mode flags, changed through set_text_mode() / set_pause_mode().
_text_mode = False
_pause_mode = False
00034
00035
def get_results():
    '''
    Give access to the module-wide junit result accumulator.

    :returns: the shared results object that test runs are accumulated into
    '''
    return _results
00038
00039
def _accumulate_results(results):
    '''
    Merge the results of one test run into the module-wide accumulator.

    :param results: results object passed on to the accumulator's
        ``accumulate`` method
    '''
    accumulator = _results
    accumulator.accumulate(results)
00042
00043
def set_text_mode(val):
    '''
    Update the module-wide text mode flag.

    :param val: new value of the flag; when truthy the test runner sends
        test output to screen, appends ``--text`` to the test arguments
        and does not retry
    '''
    global _text_mode
    _text_mode = val
00047
00048
def set_pause_mode(val):
    '''
    Update the module-wide pause mode flag.

    :param val: new value of the flag; when truthy, tear down waits for the
        user to press Enter before continuing
    '''
    global _pause_mode
    _pause_mode = val
00052
00053
00054
00055
00056
00057
00058
# Launch parents registered through _add_rocon_test_parent().
_test_parents = []

# Configuration of the most recently registered parent (None until then).
_config = None
00062
00063
def _add_rocon_test_parent(runner):
    '''
    Register a launch parent with the module and remember its configuration.

    :param runner: launch parent to register; must expose a ``config``
        attribute
    '''
    global _test_parents, _config
    # Register first, then record the configuration of the newest parent.
    _test_parents.append(runner)
    _config = runner.config
00068
00069
def get_rocon_test_parents():
    '''
    Give access to the launch parents registered so far.

    :returns: the module-wide list of registered parents (not a copy)
    '''
    return _test_parents
00072
00073
00074
00075
00076
00077
class RoconTestLaunchConfiguration(object):
    '''
    Provides configuration details for a rocon test launch configuration
    associated with a single master.

    :param launcher: a roslaunch test configuration
    :type launcher: :class:`rocon_launch.RosLaunchConfiguration`
    '''
    def __init__(self, launcher):
        # Launcher configuration; its ``path`` and ``port`` are used below.
        self.launcher = launcher
        # roslaunch configuration loaded from the launcher file for that port.
        self.configuration = roslaunch.parent.load_config_default([launcher.path], launcher.port)
        # Launch parent for this configuration. It is assigned by setUp() and
        # read by tearDown() and the test runner, so make sure the attribute
        # always exists instead of only appearing once setUp() has run.
        self.parent = None
        # Older attribute name, kept so existing readers do not break.
        self.test_parent = None
00092
00093
def setUp(self):
    '''
    Function that becomes TestCase.setUp()

    Walks over every launch configuration (one per master in the rocon
    launcher) and attaches a 'parent' object to it; that parent is later
    responsible for shutting everything down. Configurations pointing at
    the same master port share one run id.

    Having several test launchers on the same master port inside one rocon
    launcher could be prone to problems (untested).
    '''
    run_ids = {}  # master port -> roslaunch run id
    for launch_configuration in self.rocon_launch_configurations:
        config = launch_configuration.configuration
        launcher = launch_configuration.launcher
        port = urlparse(config.master.uri).port
        if port not in run_ids:
            run_ids[port] = roslaunch.core.generate_run_id()
        run_id = run_ids[port]
        if config.tests:
            # Configurations carrying tests get the rocon test parent, which
            # is also registered with the module.
            parent = RoconTestLaunchParent(run_id, config, [launcher.path], port=port)
            launch_configuration.parent = parent
            parent.setUp()
            _add_rocon_test_parent(parent)
            printlog("Launch Parent - type ...............ROSTestLaunchParent")
        else:
            # Plain launchers are started straight away by a regular
            # roslaunch parent.
            parent = roslaunch.parent.ROSLaunchParent(
                run_id,
                [launcher.path],
                is_core=False,
                port=port,
                verbose=False,
                force_screen=False,
                is_rostest=False
            )
            launch_configuration.parent = parent
            parent._load_config()
            parent.start()
            printlog("Launch Parent - type ...............ROSLaunchParent")
        printlog("Launch Parent - run id..............%s" % parent.run_id)
        printlog("Launch Parent - launcher............%s" % launcher.path)
        printlog("Launch Parent - port................%s" % port)
        if config.tests:
            printlog("Launch Parent - tests...............yes")
        else:
            printlog("Launch Parent - tests...............no")
00143
00144
def tearDown(self):
    '''
    Function that becomes TestCase.tearDown()

    Tears down (test configurations) or shuts down (plain launch
    configurations) the parent of every launch configuration. When pause
    mode is enabled, the user is prompted once before the first
    configuration is torn down.
    '''
    for rocon_launch_configuration in self.rocon_launch_configurations:
        config = rocon_launch_configuration.configuration
        # ``parent`` is attached by setUp(); tolerate a configuration that
        # never received one rather than failing with an AttributeError.
        parent = getattr(rocon_launch_configuration, 'parent', None)
        launcher = rocon_launch_configuration.launcher
        if _pause_mode:
            raw_input("Press Enter to continue...")
            set_pause_mode(False)  # only pause once
        printlog("Tear Down - launcher..........................%s" % launcher.path)
        if config.tests:
            printlog("Tear Down - tests..............................%s" % [test.test_name for test in config.tests])
            if parent:
                parent.tearDown()
                printlog("Tear Down - run id............................%s" % parent.run_id)
        elif parent is not None:
            # Same guard as the test branch: nothing to shut down if no
            # parent was ever created for this configuration.
            parent.shutdown()
00164
00165
00166
def fail_duplicate_runner(test_name):
    '''
    Build a test method that fails because the test name is duplicated.

    :param test_name: name of the duplicated test, used in the message
    :returns: function taking the test case instance; it prints the
        message and then calls ``fail`` on the instance with it
    '''
    def fn(self):
        message = "Duplicate tests named [%s] in rostest suite" % test_name
        print(message)
        self.fail(message)
    return fn
00172
00173
def fail_runner(test_name, message):
    '''
    Build a test method that unconditionally fails with the given message.

    :param test_name: name of the test (accepted but not used)
    :param message: text written to stderr and handed to ``fail``
    :returns: function taking the test case instance
    '''
    def fn(self):
        # Report on stderr first, then register the failure on the test case.
        print(message, file=sys.stderr)
        self.fail(message)
    return fn
00179
00180
def rocon_test_runner(test, test_launch_configuration, test_pkg):
    """
    Test function generator that takes in a roslaunch Test object and
    returns a class instance method that runs the test. TestCase
    setUp() is responsible for ensuring that the rest of the roslaunch
    state is correct and tearDown() is responsible for tearing
    everything down cleanly.

    :param test: roslaunch test to run; its ``test_name``, ``args``,
        ``output`` and ``retry`` attributes are read and updated
    :param test_launch_configuration: configuration whose ``parent``
        launches the fixture nodes and runs the test
    :type test_launch_configuration: :class:`.RoconTestLaunchConfiguration`
    :param test_pkg: package name used to locate the xml results file
    :returns: function object to run testObj
    :rtype: fn
    """
    XML_OUTPUT_FLAG = '--gtest_output=xml:'

    def fn(self):
        printlog("Launching tests")
        # Keep the pristine arguments: every attempt rebuilds test.args from
        # them so that retries do not pile up duplicate output flags.
        base_args = test.args
        done = False
        while not done:
            # Re-read the parent on every attempt, setUp() replaces it on retry.
            parent = test_launch_configuration.parent
            self.assertTrue(parent is not None, "ROSTestParent initialization failed")
            test_name = test.test_name
            printlog(" name..............................%s" % test_name)

            # Launch the fixture nodes.
            unused_succeeded, failed = parent.launch()
            self.assertTrue(not failed, "Test Fixture Nodes %s failed to launch" % failed)

            # Make sure a stale results file cannot be mistaken for this run's.
            test_file = rosunit.xml_results_file(test_pkg, test_name, False)
            printlog(" test results file.................%s", test_file)
            if os.path.exists(test_file):
                os.remove(test_file)
                printlog(" clear old test results file.......done")

            # Tell the test where to write its xml results.
            test.args = "%s %s%s" % (base_args, XML_OUTPUT_FLAG, test_file)
            if _text_mode:
                test.output = 'screen'
                test.args = test.args + " --text"

            printlog("Running test %s" % test_name)
            timeout_failure = False
            try:
                parent.run_test(test)
            except roslaunch.launch.RLTestTimeoutException as unused_e:
                # Only swallow the timeout when a retry is really available;
                # this is the same condition the retry branch below uses, so
                # ``results`` is always bound when results are accumulated.
                if test.retry > 0:
                    timeout_failure = True
                else:
                    raise

            if not timeout_failure:
                printlog("test finished [%s]" % test_name)
            else:
                printlogerr("test timed out [%s]" % test_name)

            if not _text_mode or timeout_failure:
                if not timeout_failure:
                    self.assertTrue(os.path.isfile(test_file), "test [%s] did not generate test results" % test_name)
                    printlog("test [%s] results are in [%s]", test_name, test_file)
                    results = rosunit.junitxml.read(test_file, test_name)
                    test_fail = results.num_errors or results.num_failures
                else:
                    test_fail = True
                if test.retry > 0 and test_fail:
                    test.retry -= 1
                    printlog("test [%s] failed, retrying. Retries left: %s" % (test_name, test.retry))
                    # Restart everything from scratch for the next attempt.
                    self.tearDown()
                    self.setUp()
                else:
                    done = True
                    _accumulate_results(results)
                    printlog("test [%s] results summary: %s errors, %s failures, %s tests",
                             test_name, results.num_errors, results.num_failures, results.num_tests)
            else:
                if test.retry:
                    printlogerr("retry is disabled in --text mode")
                done = True
        printlog("test done [%s]", test_name)
    return fn
00271
00272
def create_unit_rocon_test(rocon_launcher, launchers):
    '''
    Constructs the python unit test class.

    One test method is generated per test found in the launch
    configurations. A test whose node cannot be located, or whose name
    duplicates an earlier one, gets a method that simply fails with an
    explanatory message.

    @param rocon_launcher : absolute path to the rocon test launcher
    @param launchers : list of individual launcher configurations (name, path, port etc) in rocon_launcher
    @return unittest.TestCase : unit test class
    '''
    rocon_launch_configurations = [RoconTestLaunchConfiguration(launcher) for launcher in launchers]

    classdict = {'setUp': setUp, 'tearDown': tearDown,
                 'rocon_launch_configurations': rocon_launch_configurations,
                 'test_file': rocon_launcher}

    # One package crawler is enough for all lookups.
    rp = rospkg.RosPack()
    test_names = set()
    for rocon_launch_configuration in rocon_launch_configurations:
        config = rocon_launch_configuration.configuration
        for test in config.tests:
            # Check up front that the test node can actually be found.
            err_msg = None
            try:
                cmd = roslib.packages.find_node(test.package, test.type, rp)
                if not cmd:
                    err_msg = "Test node [%s/%s] does not exist or is not executable" % (test.package, test.type)
            except rospkg.ResourceNotFound:
                err_msg = "Package [%s] for test node [%s/%s] does not exist" % (test.package, test.package, test.type)

            test_name = 'test%s' % (test.test_name)
            if err_msg:
                classdict[test_name] = fail_runner(test.test_name, err_msg)
            elif test_name in test_names:
                classdict[test_name] = fail_duplicate_runner(test.test_name)
            else:
                # Use the package of the launcher this test belongs to, not
                # whichever launcher happened to be last in ``launchers``.
                test_pkg = rocon_launch_configuration.launcher.package
                classdict[test_name] = rocon_test_runner(test, rocon_launch_configuration, test_pkg)
                test_names.add(test_name)

    return type('RoconTest', (unittest.TestCase,), classdict)