Package rocon_test :: Module runner
[frames | no frames]

Source Code for Module rocon_test.runner

  1  #!/usr/bin/env python 
  2  # 
  3  # License: BSD 
  4  #   https://raw.github.com/robotics-in-concert/rocon_multimaster/hydro-devel/rocon_test/LICENSE 
  5  # 
  6   
  7  ############################################################################## 
  8  # Imports 
  9  ############################################################################## 
 10   
 11  from __future__ import print_function 
 12   
 13  import os 
 14  import sys 
 15  import unittest 
 16  from urlparse import urlparse 
 17  import rospkg 
 18  import roslib 
 19  import roslaunch 
 20  import rosunit 
 21   
 22  # Local imports 
 23  from loggers import printlog, printlogerr 
 24  from test_parent import RoconTestLaunchParent 
 25   
 26  ############################################################################## 
 27  # Globals 
 28  ############################################################################## 
 29   
 30  _DEFAULT_TEST_PORT = 22422 
 31  _results = rosunit.junitxml.Result('rocon_test', 0, 0, 0) 
 32  _text_mode = False 
 33  _pause_mode = False 
 34   
 35   
def get_results():
    '''
      Return the module-wide accumulated test results.

      @return: the shared result object that every finished test is folded into
      @rtype: rosunit.junitxml.Result
    '''
    return _results
38 39
def _accumulate_results(results):
    '''
      Fold the results of a single test run into the module-wide totals.

      @param results: results of one test run, as produced in this module by
                      rosunit.junitxml.read()
    '''
    _results.accumulate(results)
42 43
def set_text_mode(val):
    '''
      Enable or disable text mode for subsequently run tests.

      In text mode the test runner sends test output to the screen, appends
      '--text' to the test arguments and does not read xml results or retry.

      @param val: new value of the module-level text mode flag
      @type val: bool
    '''
    global _text_mode
    _text_mode = val
47 48
def set_pause_mode(val):
    '''
      Enable or disable pause mode.

      While the flag is set, tearDown() waits for the user to press Enter
      before tearing anything down, then clears the flag again.

      @param val: new value of the module-level pause mode flag
      @type val: bool
    '''
    global _pause_mode
    _pause_mode = val

##############################################################################
# Parents
##############################################################################

# global store of all ROSLaunchRunners so we can do an extra shutdown
# in the rare event a tearDown fails to execute
_test_parents = []
# Is _config actually used anywhere?
_config = None

def _add_rocon_test_parent(runner):
    '''
      Register a launch parent in the module-wide store of test parents and
      remember its config in the module-level _config.

      @param runner: launch parent to register; must expose a 'config' attribute
    '''
    # only _config is rebound; _test_parents is mutated in place and so needs
    # no global declaration
    global _config
    _test_parents.append(runner)
    _config = runner.config
68 69
def get_rocon_test_parents():
    '''
      Return the module-wide store of registered test parents.

      @return: the live list itself (not a copy), in registration order
      @rtype: list
    '''
    return _test_parents

##############################################################################
# Construction Methods
##############################################################################

class RoconTestLaunchConfiguration(object):
    '''
      Provides configuration details for a rocon test launch configuration
      associated with a single master.
    '''
    def __init__(self, launcher):
        '''
          @param launcher: launcher configuration; the 'path' and 'port' keys
                           are read here to load the roslaunch configuration
        '''
        # Launcher configuration (name, port, path to file, etc)
        self.launcher = launcher
        # ros launch configuration info
        self.configuration = roslaunch.parent.load_config_default([launcher['path']], launcher['port'])
        # Launch parent that handles start, stop, shutdown. The rest of this
        # module reads and writes it under the name 'parent', so it is
        # initialised here to make 'not created yet' observable as None.
        self.parent = None
        # Original attribute name, kept so existing readers do not break.
        self.test_parent = None
89 90
def setUp(self):
    '''
      Function that becomes TestCase.setUp()

      For each launcher representing a launch on a single master in the rocon
      launcher, this makes sure there is an entity that is to be the 'parent'
      who will later be responsible for shutting everything down.

      This could be prone to problems if someone puts multiple launchers in
      rocon launcher with the same master port (untested).
    '''
    # new parent for each run. we are a bit inefficient as it would be possible to
    # reuse the roslaunch base infrastructure for each test, but the roslaunch code
    # is not abstracted well enough yet
    uuids = {}  # master port -> run id, generated once per distinct port
    for rocon_launch_configuration in self.rocon_launch_configurations:
        config = rocon_launch_configuration.configuration
        launcher = rocon_launch_configuration.launcher
        port = urlparse(config.master.uri).port
        if port not in uuids:
            uuids[port] = roslaunch.core.generate_run_id()
        if not config.tests:
            # nothing to test on this master: a plain roslaunch parent is
            # loaded and started straight away
            rocon_launch_configuration.parent = roslaunch.parent.ROSLaunchParent(
                uuids[port],
                [launcher["path"]],
                is_core=False,
                port=port,
                verbose=False,
                force_screen=False,
                is_rostest=False
            )
            rocon_launch_configuration.parent._load_config()
            rocon_launch_configuration.parent.start()
            printlog("Launch Parent - type ...............ROSLaunchParent")
        else:
            rocon_launch_configuration.parent = RoconTestLaunchParent(uuids[port], config, [launcher["path"]], port=port)
            rocon_launch_configuration.parent.setUp()
            # the config attribute makes it easy for tests to access the ROSLaunchConfig instance
            # Should we do this - it doesn't make a whole lot of sense?
            #rocon_launch_configuration.configuration = rocon_launch_configuration.parent.config
            _add_rocon_test_parent(rocon_launch_configuration.parent)
            printlog("Launch Parent - type ...............ROSTestLaunchParent")
        printlog("Launch Parent - run id..............%s" % rocon_launch_configuration.parent.run_id)
        printlog("Launch Parent - launcher............%s" % rocon_launch_configuration.launcher["path"])
        printlog("Launch Parent - port................%s" % port)
        if not config.tests:
            printlog("Launch Parent - tests...............no")
        else:
            printlog("Launch Parent - tests...............yes")
140 141
def tearDown(self):
    '''
      Function that becomes TestCase.tearDown()

      Launch configurations that carry tests have their parent torn down via
      tearDown(); the others have their parent shut down via shutdown().
      A configuration whose parent was never created is skipped.
    '''
    # NOTE(review): block nesting was reconstructed from a flattened listing;
    # the final branch is taken to pair with 'if config.tests' - confirm.
    for rocon_launch_configuration in self.rocon_launch_configurations:
        config = rocon_launch_configuration.configuration
        # tolerate a configuration that setUp never reached
        parent = getattr(rocon_launch_configuration, 'parent', None)
        launcher = rocon_launch_configuration.launcher
        if _pause_mode:
            raw_input("Press Enter to continue...")
            set_pause_mode(False)  # only trigger pause once
        printlog("Tear Down - launcher..........................%s" % launcher["path"])
        if config.tests:
            printlog("Tear Down - tests..............................%s" % [test.test_name for test in config.tests])
            if parent:
                parent.tearDown()
                printlog("Tear Down - run id............................%s" % parent.run_id)
        elif parent:
            parent.shutdown()


## generate test failure if tests with same name in launch file
def fail_duplicate_runner(test_name):
    '''
      Build a test method that always fails, reporting that more than one
      test in the suite is called test_name.

      @param test_name: the duplicated test name
      @return: function usable as a TestCase method; it prints the message
               and then calls self.fail() with it
      @rtype: fn
    '''
    message = "Duplicate tests named [%s] in rostest suite" % test_name

    def fn(self):
        print(message)
        self.fail(message)
    return fn
def fail_runner(test_name, message):
    '''
      Build a test method that always fails with the given message.

      @param test_name: name of the test being replaced (not used in the
                        failure output)
      @param message: text written to stderr and passed to self.fail()
      @return: function usable as a TestCase method
      @rtype: fn
    '''
    def fn(self):
        print(message, file=sys.stderr)
        self.fail(message)
    return fn
def rocon_test_runner(test, test_launch_configuration, test_pkg):
    """
    Test function generator that takes in a roslaunch Test object and
    returns a class instance method that runs the test. TestCase
    setUp() is responsible for ensuring that the rest of the roslaunch
    state is correct and tearDown() is responsible for tearing
    everything down cleanly.
    @param test: ros test to run
    @type  test: roslaunch.Test
    @param test_launch_configuration: launch configuration whose 'parent'
        launches the fixture nodes and runs the test
    @param test_pkg: package name used to locate the xml results file
    @return: function object to run testObj
    @rtype: fn
    """
    # TODO: have to redeclare this due to a bug -- this file
    # needs to be renamed as it aliases the module where the
    # constant is elsewhere defined. The fix is to rename
    # rostest.py
    XML_OUTPUT_FLAG = '--gtest_output=xml:'  # use gtest-compatible flag

    ## test case pass/fail is a measure of whether or not the test ran
    def fn(self):
        printlog("Launching tests")
        # Remember the arguments as they were before the first attempt so a
        # retry rebuilds them instead of appending the output flag again.
        base_args = test.args
        done = False
        while not done:
            # re-read every attempt: a retry runs setUp(), which creates a new parent
            parent = test_launch_configuration.parent
            self.assertTrue(parent is not None, "ROSTestParent initialization failed")
            test_name = test.test_name
            printlog(" name..............................%s" % test_name)

            #launch the other nodes
            #for parent in self.parents:
            # DJS - will this mean I miss starting up othe test parents?
            unused_succeeded, failed = parent.launch()
            self.assertTrue(not failed, "Test Fixture Nodes %s failed to launch" % failed)

            #setup the test
            # - we pass in the output test_file name so we can scrape it
            test_file = rosunit.xml_results_file(test_pkg, test_name, False)
            # NOTE(review): presumably printlog %-formats its extra arguments
            # into the message - confirm against loggers.printlog
            printlog(" test results file.................%s", test_file)
            if os.path.exists(test_file):
                os.remove(test_file)
                printlog(" clear old test results file.......done")

            test.args = "%s %s%s" % (base_args, XML_OUTPUT_FLAG, test_file)
            if _text_mode:
                test.output = 'screen'
                test.args = test.args + " --text"

            # run the test, blocks until completion
            printlog("Running test %s" % test_name)
            timeout_failure = False
            try:
                parent.run_test(test)
            except roslaunch.launch.RLTestTimeoutException:
                # a timeout only counts as a retryable failure while retries remain
                if test.retry:
                    timeout_failure = True
                else:
                    raise

            if not timeout_failure:
                printlog("test finished [%s]" % test_name)
            else:
                printlogerr("test timed out [%s]" % test_name)

            if not _text_mode or timeout_failure:
                if not timeout_failure:
                    self.assertTrue(os.path.isfile(test_file), "test [%s] did not generate test results" % test_name)
                    printlog("test [%s] results are in [%s]", test_name, test_file)
                    results = rosunit.junitxml.read(test_file, test_name)
                    test_fail = results.num_errors or results.num_failures
                else:
                    test_fail = True
                if test.retry > 0 and test_fail:
                    test.retry -= 1
                    printlog("test [%s] failed, retrying. Retries left: %s" % (test_name, test.retry))
                    # restart the whole launch environment for the next attempt
                    self.tearDown()
                    self.setUp()
                else:
                    done = True
                    _accumulate_results(results)
                    printlog("test [%s] results summary: %s errors, %s failures, %s tests",
                             test_name, results.num_errors, results.num_failures, results.num_tests)
                    #self.assertEquals(0, results.num_errors, "unit test reported errors")
                    #self.assertEquals(0, results.num_failures, "unit test reported failures")
            else:
                if test.retry:
                    printlogerr("retry is disabled in --text mode")
                done = True
        printlog("test done [%s]", test_name)
    return fn
def create_unit_rocon_test(rocon_launcher, launchers):
    '''
      Constructs the python unit test class.

      One test method named 'test<test_name>' is added per test found in the
      launch configurations. A test whose node cannot be found, or whose name
      duplicates an earlier test, is replaced by a method that always fails.

      @param rocon_launcher : absolute path to the rocon test launcher
      @param launchers : list of individual launcher configurations (name, path, port etc) in rocon_launcher
      @return unittest.TestCase : unit test class
    '''
    rocon_launch_configurations = [RoconTestLaunchConfiguration(launcher) for launcher in launchers]
    # pass in config to class as a property so that parent can be initialized
    classdict = {'setUp': setUp, 'tearDown': tearDown,
                 'rocon_launch_configurations': rocon_launch_configurations,
                 'test_file': rocon_launcher}

    # add in the tests
    rp = rospkg.RosPack()  # one package index is enough for all lookups
    test_names = []
    for rocon_launch_configuration in rocon_launch_configurations:
        config = rocon_launch_configuration.configuration
        # Use the package of the launcher this configuration was built from,
        # not whichever launcher an earlier loop happened to visit last.
        test_pkg = rocon_launch_configuration.launcher["package"]
        for test in config.tests:
            # #1989: find test first to make sure it exists and is executable
            err_msg = None
            try:
                cmd = roslib.packages.find_node(test.package, test.type, rp)
                if not cmd:
                    err_msg = "Test node [%s/%s] does not exist or is not executable" % (test.package, test.type)
            except rospkg.ResourceNotFound:
                err_msg = "Package [%s] for test node [%s/%s] does not exist" % (test.package, test.package, test.type)

            test_name = 'test%s' % (test.test_name)
            if err_msg:
                classdict[test_name] = fail_runner(test.test_name, err_msg)
            elif test_name in test_names:
                classdict[test_name] = fail_duplicate_runner(test.test_name)
            else:
                classdict[test_name] = rocon_test_runner(test, rocon_launch_configuration, test_pkg)
                test_names.append(test_name)

    # instantiate the TestCase instance with our magically-created tests
    return type('RoconTest', (unittest.TestCase,), classdict)
311