Package rostest :: Module runner

Source Code for Module rostest.runner

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32   
 33  from __future__ import print_function 
 34   
 35  import os 
 36  import sys 
 37  import logging 
 38  import time 
 39  import unittest 
 40   
 41  import rospkg 
 42  import roslaunch 
 43  import roslib.packages  
 44   
 45  from rostest.rostestutil import createXMLRunner, printSummary, printRostestSummary, \ 
 46      xmlResultsFile, printlog, printlogerr 
 47  from rostest.rostest_parent import ROSTestLaunchParent 
 48  import rosunit.junitxml 
 49   
 50  _DEFAULT_TEST_PORT = 22422 
 51   
 52  # NOTE: ignoring Python style guide as unittest is sadly written with Java-like camel casing 
 53   
 54  _results = rosunit.junitxml.Result('rostest', 0, 0, 0) 
55 -def _accumulateResults(results):
56 _results.accumulate(results)
57
58 -def getResults():
59 return _results
60 61 _textMode = False
62 -def setTextMode(val):
63 global _textMode 64 _textMode = val
65 66 # global store of all ROSLaunchRunners so we can do an extra shutdown 67 # in the rare event a tearDown fails to execute 68 _test_parents = [] 69 _config = None
70 -def _addRostestParent(runner):
71 global _test_parents, _config 72 logging.getLogger('rostest').info("_addRostestParent [%s]", runner) 73 _test_parents.append(runner) 74 _config = runner.config
75
76 -def getConfig():
77 return _config
78
79 -def getRostestParents():
80 return _test_parents
81 82 # TODO: convert most of this into a run() routine of a RoslaunchRunner subclass 83 84 ## generate test failure if tests with same name in launch file
85 -def failDuplicateRunner(testName):
86 def fn(self): 87 print("Duplicate tests named [%s] in rostest suite"%testName) 88 self.fail("Duplicate tests named [%s] in rostest suite"%testName)
89 return fn 90
91 -def failRunner(testName, message):
92 def fn(self): 93 print(message, file=sys.stderr) 94 self.fail(message)
95 return fn 96
97 -def rostestRunner(test, test_pkg):
98 """ 99 Test function generator that takes in a roslaunch Test object and 100 returns a class instance method that runs the test. TestCase 101 setUp() is responsible for ensuring that the rest of the roslaunch 102 state is correct and tearDown() is responsible for tearing 103 everything down cleanly. 104 @param test: rost test to run 105 @type test: roslaunch.Test 106 @return: function object to run testObj 107 @rtype: fn 108 """ 109 110 ## test case pass/fail is a measure of whether or not the test ran 111 def fn(self): 112 done = False 113 while not done: 114 self.assert_(self.test_parent is not None, "ROSTestParent initialization failed") 115 116 test_name = test.test_name 117 118 printlog("Running test [%s]", test_name) 119 120 #launch the other nodes 121 succeeded, failed = self.test_parent.launch() 122 self.assert_(not failed, "Test Fixture Nodes %s failed to launch"%failed) 123 124 #setup the test 125 # - we pass in the output test_file name so we can scrape it 126 test_file = xmlResultsFile(test_pkg, test_name, False) 127 if os.path.exists(test_file): 128 printlog("removing previous test results file [%s]", test_file) 129 os.remove(test_file) 130 131 # TODO: have to redeclare this due to a bug -- this file 132 # needs to be renamed as it aliases the module where the 133 # constant is elsewhere defined. The fix is to rename 134 # rostest.py 135 XML_OUTPUT_FLAG='--gtest_output=xml:' #use gtest-compatible flag 136 137 test.args = "%s %s%s"%(test.args, XML_OUTPUT_FLAG, test_file) 138 if _textMode: 139 test.output = 'screen' 140 test.args = test.args + " --text" 141 142 # run the test, blocks until completion 143 printlog("running test %s"%test_name) 144 timeout_failure = False 145 try: 146 self.test_parent.run_test(test) 147 except roslaunch.launch.RLTestTimeoutException as e: 148 if test.retry: 149 timeout_failure = True 150 else: 151 raise 152 153 if not timeout_failure: 154 printlog("test [%s] finished"%test_name) 155 else: 156 printlogerr("test [%s] timed out"%test_name) 157 158 # load in test_file 159 if not _textMode or timeout_failure: 160 161 if not timeout_failure: 162 self.assert_(os.path.isfile(test_file), "test [%s] did not generate test results"%test_name) 163 printlog("test [%s] results are in [%s]", test_name, test_file) 164 results = rosunit.junitxml.read(test_file, test_name) 165 test_fail = results.num_errors or results.num_failures 166 else: 167 test_fail = True 168 169 if test.retry > 0 and test_fail: 170 test.retry -= 1 171 printlog("test [%s] failed, retrying. Retries left: %s"%(test_name, test.retry)) 172 self.tearDown() 173 self.setUp() 174 else: 175 done = True 176 _accumulateResults(results) 177 printlog("test [%s] results summary: %s errors, %s failures, %s tests", 178 test_name, results.num_errors, results.num_failures, results.num_tests) 179 180 #self.assertEquals(0, results.num_errors, "unit test reported errors") 181 #self.assertEquals(0, results.num_failures, "unit test reported failures") 182 else: 183 if test.retry: 184 printlogerr("retry is disabled in --text mode") 185 done = True 186 printlog("[ROSTEST] test [%s] done", test_name)
187 188 return fn 189 190 ## Function that becomes TestCase.setup()
191 -def setUp(self):
192 # new test_parent for each run. we are a bit inefficient as it would be possible to 193 # reuse the roslaunch base infrastructure for each test, but the roslaunch code 194 # is not abstracted well enough yet 195 self.test_parent = ROSTestLaunchParent(self.config, [self.test_file], port=_DEFAULT_TEST_PORT) 196 197 printlog("setup[%s] run_id[%s] starting", self.test_file, self.test_parent.run_id) 198 199 self.test_parent.setUp() 200 201 # the config attribute makes it easy for tests to access the ROSLaunchConfig instance 202 self.config = self.test_parent.config 203 204 _addRostestParent(self.test_parent) 205 206 printlog("setup[%s] run_id[%s] done", self.test_file, self.test_parent.run_id)
207 208 ## Function that becomes TestCase.tearDown()
209 -def tearDown(self):
210 printlog("tearDown[%s]", self.test_file) 211 212 if self.test_parent: 213 self.test_parent.tearDown() 214 215 printlog("rostest teardown %s complete", self.test_file)
216
217 -def createUnitTest(pkg, test_file):
218 """ 219 Unit test factory. Constructs a unittest class based on the roslaunch 220 221 @param pkg: package name 222 @type pkg: str 223 @param test_file: rostest filename 224 @type test_file: str 225 """ 226 # parse the config to find the test files 227 config = roslaunch.parent.load_config_default([test_file], _DEFAULT_TEST_PORT) 228 229 # pass in config to class as a property so that test_parent can be initialized 230 classdict = { 'setUp': setUp, 'tearDown': tearDown, 'config': config, 231 'test_parent': None, 'test_file': test_file } 232 233 # add in the tests 234 testNames = [] 235 for test in config.tests: 236 # #1989: find test first to make sure it exists and is executable 237 err_msg = None 238 try: 239 rp = rospkg.RosPack() 240 cmd = roslib.packages.find_node(test.package, test.type, rp) 241 if not cmd: 242 err_msg = "Test node [%s/%s] does not exist or is not executable"%(test.package, test.type) 243 except rospkg.ResourceNotFound as e: 244 err_msg = "Package [%s] for test node [%s/%s] does not exist"%(test.package, test.package, test.type) 245 246 testName = 'test%s'%(test.test_name) 247 if err_msg: 248 classdict[testName] = failRunner(test.test_name, err_msg) 249 elif testName in testNames: 250 classdict[testName] = failDuplicateRunner(test.test_name) 251 else: 252 classdict[testName] = rostestRunner(test, pkg) 253 testNames.append(testName) 254 255 # instantiate the TestCase instance with our magically-created tests 256 return type('RosTest',(unittest.TestCase,),classdict)
257