Package rostest :: Module runner

Source Code for Module rostest.runner

  1  from __future__ import with_statement 
  2  import os 
  3  import sys 
  4  import logging 
  5  import time 
  6  import unittest 
  7   
  8  import roslaunch 
  9  import roslib.packages  
 10   
 11  from rostest.rostestutil import createXMLRunner, printSummary, printRostestSummary, \ 
 12      xmlResultsFile, printlog, printlogerr 
 13  from rostest.rostest_parent import ROSTestLaunchParent 
 14  import rostest.xmlresults 
 15   
 16  _DEFAULT_TEST_PORT = 22422 
 17   
 18  # NOTE: ignoring Python style guide as unittest is sadly written with Java-like camel casing 
 19   
 20  _results = rostest.xmlresults.Result('rostest', 0, 0, 0) 
21 -def _accumulateResults(results):
22 _results.accumulate(results)
23
24 -def getResults():
25 return _results
26 27 _textMode = False
28 -def setTextMode(val):
29 global _textMode 30 _textMode = val
31 32 # global store of all ROSLaunchRunners so we can do an extra shutdown 33 # in the rare event a tearDown fails to execute 34 _test_parents = [] 35 _config = None
36 -def _addRostestParent(runner):
37 global _test_parents, _config 38 logging.getLogger('rostest').info("_addRostestParent [%s]", runner) 39 _test_parents.append(runner) 40 _config = runner.config
41
42 -def getConfig():
43 return _config
44
45 -def getRostestParents():
46 return _test_parents
47 48 # TODO: convert most of this into a run() routine of a RoslaunchRunner subclass 49 50 ## generate test failure if tests with same name in launch file
51 -def failDuplicateRunner(testName):
52 def fn(self): 53 print "Duplicate tests named [%s] in rostest suite"%testName 54 self.fail("Duplicate tests named [%s] in rostest suite"%testName)
55 return fn 56
57 -def failRunner(testName, message):
58 def fn(self): 59 print >> sys.stderr, message 60 self.fail(message)
61 return fn 62
63 -def rostestRunner(test, test_pkg):
64 """ 65 Test function generator that takes in a roslaunch Test object and 66 returns a class instance method that runs the test. TestCase 67 setUp() is responsible for ensuring that the rest of the roslaunch 68 state is correct and tearDown() is responsible for tearing 69 everything down cleanly. 70 @param test: rost test to run 71 @type test: roslaunch.Test 72 @return: function object to run testObj 73 @rtype: fn 74 """ 75 76 ## test case pass/fail is a measure of whether or not the test ran 77 def fn(self): 78 done = False 79 while not done: 80 self.assert_(self.test_parent is not None, "ROSTestParent initialization failed") 81 82 test_name = test.test_name 83 84 printlog("Running test [%s]", test_name) 85 86 #launch the other nodes 87 succeeded, failed = self.test_parent.launch() 88 self.assert_(not failed, "Test Fixture Nodes %s failed to launch"%failed) 89 90 #setup the test 91 # - we pass in the output test_file name so we can scrape it 92 test_file = xmlResultsFile(test_pkg, test_name, False) 93 if os.path.exists(test_file): 94 printlog("removing previous test results file [%s]", test_file) 95 os.remove(test_file) 96 97 # TODO: have to redeclare this due to a bug -- this file 98 # needs to be renamed as it aliases the module where the 99 # constant is elsewhere defined. The fix is to rename 100 # rostest.py 101 XML_OUTPUT_FLAG='--gtest_output=xml:' #use gtest-compatible flag 102 103 test.args = "%s %s%s"%(test.args, XML_OUTPUT_FLAG, test_file) 104 if _textMode: 105 test.output = 'screen' 106 test.args = test.args + " --text" 107 108 # run the test, blocks until completion 109 printlog("running test %s"%test_name) 110 timeout_failure = False 111 try: 112 self.test_parent.run_test(test) 113 except roslaunch.launch.RLTestTimeoutException, e: 114 if test.retry: 115 timeout_failure = True 116 else: 117 raise 118 119 if not timeout_failure: 120 printlog("test [%s] finished"%test_name) 121 else: 122 printlogerr("test [%s] timed out"%test_name) 123 124 # load in test_file 125 if not _textMode or timeout_failure: 126 127 if not timeout_failure: 128 self.assert_(os.path.isfile(test_file), "test [%s] did not generate test results"%test_name) 129 printlog("test [%s] results are in [%s]", test_name, test_file) 130 results = rostest.xmlresults.read(test_file, test_name) 131 test_fail = results.num_errors or results.num_failures 132 else: 133 test_fail = True 134 135 if test.retry > 0 and test_fail: 136 test.retry -= 1 137 printlog("test [%s] failed, retrying. Retries left: %s"%(test_name, test.retry)) 138 self.tearDown() 139 self.setUp() 140 else: 141 done = True 142 _accumulateResults(results) 143 printlog("test [%s] results summary: %s errors, %s failures, %s tests", 144 test_name, results.num_errors, results.num_failures, results.num_tests) 145 146 #self.assertEquals(0, results.num_errors, "unit test reported errors") 147 #self.assertEquals(0, results.num_failures, "unit test reported failures") 148 else: 149 if test.retry: 150 printlogerr("retry is disabled in --text mode") 151 done = True 152 printlog("[ROSTEST] test [%s] done", test_name)
153 154 return fn 155 156 ## Function that becomes TestCase.setup()
157 -def setUp(self):
158 # new test_parent for each run. we are a bit inefficient as it would be possible to 159 # reuse the roslaunch base infrastructure for each test, but the roslaunch code 160 # is not abstracted well enough yet 161 self.test_parent = ROSTestLaunchParent(self.config, [self.test_file], port=_DEFAULT_TEST_PORT) 162 163 printlog("setup[%s] run_id[%s] starting", self.test_file, self.test_parent.run_id) 164 165 self.test_parent.setUp() 166 167 # the config attribute makes it easy for tests to access the ROSLaunchConfig instance 168 self.config = self.test_parent.config 169 170 _addRostestParent(self.test_parent) 171 172 printlog("setup[%s] run_id[%s] done", self.test_file, self.test_parent.run_id)
173 174 ## Function that becomes TestCase.tearDown()
175 -def tearDown(self):
176 printlog("tearDown[%s]", self.test_file) 177 178 if self.test_parent: 179 self.test_parent.tearDown() 180 181 printlog("rostest teardown %s complete", self.test_file)
182
183 -def createUnitTest(pkg, test_file):
184 """ 185 Unit test factory. Constructs a unittest class based on the roslaunch 186 187 @param pkg: package name 188 @type pkg: str 189 @param test_file: rostest filename 190 @type test_file: str 191 """ 192 # parse the config to find the test files 193 config = roslaunch.parent.load_config_default([test_file], _DEFAULT_TEST_PORT) 194 195 # pass in config to class as a property so that test_parent can be initialized 196 classdict = { 'setUp': setUp, 'tearDown': tearDown, 'config': config, 197 'test_parent': None, 'test_file': test_file } 198 199 # add in the tests 200 testNames = [] 201 for test in config.tests: 202 # #1989: find test first to make sure it exists and is executable 203 err_msg = None 204 try: 205 cmd = roslib.packages.find_node(test.package, test.type, \ 206 test.machine.ros_root, test.machine.ros_package_path) 207 if not cmd: 208 err_msg = "Test node [%s/%s] does not exist or is not executable"%(test.package, test.type) 209 except roslib.packages.ROSPkgException, e: 210 err_msg = "Package [%s] for test node [%s/%s] does not exist"%(test.package, test.package, test.type) 211 212 testName = 'test%s'%(test.test_name) 213 if err_msg: 214 classdict[testName] = failRunner(test.test_name, err_msg) 215 elif testName in testNames: 216 classdict[testName] = failDuplicateRunner(test.test_name) 217 else: 218 classdict[testName] = rostestRunner(test, pkg) 219 testNames.append(testName) 220 221 # instantiate the TestCase instance with our magically-created tests 222 return type('RosTest',(unittest.TestCase,),classdict)
223