Package rostest :: Module runner

Source Code for Module rostest.runner

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32   
 33  from __future__ import print_function 
 34   
 35  import os 
 36  import sys 
 37  import logging 
 38  import time 
 39  import unittest 
 40   
 41  import rospkg 
 42  from rospkg.environment import ROS_TEST_RESULTS_DIR 
 43  import roslaunch 
 44  import roslib.packages  
 45   
 46  from rostest.rostestutil import createXMLRunner, printSummary, printRostestSummary, \ 
 47      xmlResultsFile, printlog, printlogerr 
 48  from rostest.rostest_parent import ROSTestLaunchParent 
 49  import rosunit.junitxml 
 50   
 51  # NOTE: ignoring Python style guide as unittest is sadly written with Java-like camel casing 
 52   
 53  _results = rosunit.junitxml.Result('rostest', 0, 0, 0) 
54 -def _accumulateResults(results):
55 _results.accumulate(results)
56
57 -def getResults():
58 return _results
59 60 _textMode = False
61 -def setTextMode(val):
62 global _textMode 63 _textMode = val
64 65 # global store of all ROSLaunchRunners so we can do an extra shutdown 66 # in the rare event a tearDown fails to execute 67 _test_parents = [] 68 _config = None
69 -def _addRostestParent(runner):
70 global _test_parents, _config 71 logging.getLogger('rostest').info("_addRostestParent [%s]", runner) 72 _test_parents.append(runner) 73 _config = runner.config
74
75 -def getConfig():
76 return _config
77
78 -def getRostestParents():
79 return _test_parents
80 81 # TODO: convert most of this into a run() routine of a RoslaunchRunner subclass 82 83 ## generate test failure if tests with same name in launch file
84 -def failDuplicateRunner(testName):
85 def fn(self): 86 print("Duplicate tests named [%s] in rostest suite"%testName) 87 self.fail("Duplicate tests named [%s] in rostest suite"%testName)
88 return fn 89
90 -def failRunner(testName, message):
91 def fn(self): 92 print(message, file=sys.stderr) 93 self.fail(message)
94 return fn 95
96 -def rostestRunner(test, test_pkg, results_base_dir=None):
97 """ 98 Test function generator that takes in a roslaunch Test object and 99 returns a class instance method that runs the test. TestCase 100 setUp() is responsible for ensuring that the rest of the roslaunch 101 state is correct and tearDown() is responsible for tearing 102 everything down cleanly. 103 @param test: rost test to run 104 @type test: roslaunch.Test 105 @return: function object to run testObj 106 @rtype: fn 107 """ 108 109 ## test case pass/fail is a measure of whether or not the test ran 110 def fn(self): 111 done = False 112 while not done: 113 self.assert_(self.test_parent is not None, "ROSTestParent initialization failed") 114 115 test_name = test.test_name 116 117 printlog("Running test [%s]", test_name) 118 119 #launch the other nodes 120 succeeded, failed = self.test_parent.launch() 121 self.assert_(not failed, "Test Fixture Nodes %s failed to launch"%failed) 122 123 #setup the test 124 # - we pass in the output test_file name so we can scrape it 125 env = None 126 if results_base_dir: 127 env = {ROS_TEST_RESULTS_DIR: results_base_dir} 128 test_file = xmlResultsFile(test_pkg, test_name, False, env=env) 129 if os.path.exists(test_file): 130 printlog("removing previous test results file [%s]", test_file) 131 os.remove(test_file) 132 133 # TODO: have to redeclare this due to a bug -- this file 134 # needs to be renamed as it aliases the module where the 135 # constant is elsewhere defined. The fix is to rename 136 # rostest.py 137 XML_OUTPUT_FLAG='--gtest_output=xml:' #use gtest-compatible flag 138 139 test.args = "%s %s%s"%(test.args, XML_OUTPUT_FLAG, test_file) 140 if _textMode: 141 test.output = 'screen' 142 test.args = test.args + " --text" 143 144 # run the test, blocks until completion 145 printlog("running test %s"%test_name) 146 timeout_failure = False 147 try: 148 self.test_parent.run_test(test) 149 except roslaunch.launch.RLTestTimeoutException as e: 150 if test.retry: 151 timeout_failure = True 152 else: 153 raise 154 155 if not timeout_failure: 156 printlog("test [%s] finished"%test_name) 157 else: 158 printlogerr("test [%s] timed out"%test_name) 159 160 # load in test_file 161 if not _textMode or timeout_failure: 162 163 if not timeout_failure: 164 self.assert_(os.path.isfile(test_file), "test [%s] did not generate test results"%test_name) 165 printlog("test [%s] results are in [%s]", test_name, test_file) 166 results = rosunit.junitxml.read(test_file, test_name) 167 test_fail = results.num_errors or results.num_failures 168 else: 169 test_fail = True 170 171 if test.retry > 0 and test_fail: 172 test.retry -= 1 173 printlog("test [%s] failed, retrying. Retries left: %s"%(test_name, test.retry)) 174 self.tearDown() 175 self.setUp() 176 else: 177 done = True 178 _accumulateResults(results) 179 printlog("test [%s] results summary: %s errors, %s failures, %s tests", 180 test_name, results.num_errors, results.num_failures, results.num_tests) 181 182 #self.assertEquals(0, results.num_errors, "unit test reported errors") 183 #self.assertEquals(0, results.num_failures, "unit test reported failures") 184 else: 185 if test.retry: 186 printlogerr("retry is disabled in --text mode") 187 done = True 188 printlog("[ROSTEST] test [%s] done", test_name)
189 190 return fn 191 192 ## Function that becomes TestCase.setup()
193 -def setUp(self):
194 # new test_parent for each run. we are a bit inefficient as it would be possible to 195 # reuse the roslaunch base infrastructure for each test, but the roslaunch code 196 # is not abstracted well enough yet 197 self.test_parent = ROSTestLaunchParent(self.config, [self.test_file], reuse_master=self.reuse_master, clear=self.clear) 198 199 printlog("setup[%s] run_id[%s] starting", self.test_file, self.test_parent.run_id) 200 201 self.test_parent.setUp() 202 203 # the config attribute makes it easy for tests to access the ROSLaunchConfig instance 204 self.config = self.test_parent.config 205 206 _addRostestParent(self.test_parent) 207 208 printlog("setup[%s] run_id[%s] done", self.test_file, self.test_parent.run_id)
209 210 ## Function that becomes TestCase.tearDown()
211 -def tearDown(self):
212 printlog("tearDown[%s]", self.test_file) 213 214 if self.test_parent: 215 self.test_parent.tearDown() 216 217 printlog("rostest teardown %s complete", self.test_file)
218
219 -def createUnitTest(pkg, test_file, reuse_master=False, clear=False, results_base_dir=None):
220 """ 221 Unit test factory. Constructs a unittest class based on the roslaunch 222 223 @param pkg: package name 224 @type pkg: str 225 @param test_file: rostest filename 226 @type test_file: str 227 """ 228 # parse the config to find the test files 229 config = roslaunch.parent.load_config_default([test_file], None) 230 231 # pass in config to class as a property so that test_parent can be initialized 232 classdict = { 'setUp': setUp, 'tearDown': tearDown, 'config': config, 233 'test_parent': None, 'test_file': test_file, 234 'reuse_master': reuse_master, 'clear': clear } 235 236 # add in the tests 237 testNames = [] 238 for test in config.tests: 239 # #1989: find test first to make sure it exists and is executable 240 err_msg = None 241 try: 242 rp = rospkg.RosPack() 243 cmd = roslib.packages.find_node(test.package, test.type, rp) 244 if not cmd: 245 err_msg = "Test node [%s/%s] does not exist or is not executable"%(test.package, test.type) 246 except rospkg.ResourceNotFound as e: 247 err_msg = "Package [%s] for test node [%s/%s] does not exist"%(test.package, test.package, test.type) 248 249 testName = 'test%s'%(test.test_name) 250 if err_msg: 251 classdict[testName] = failRunner(test.test_name, err_msg) 252 elif testName in testNames: 253 classdict[testName] = failDuplicateRunner(test.test_name) 254 else: 255 classdict[testName] = rostestRunner(test, pkg, results_base_dir=results_base_dir) 256 testNames.append(testName) 257 258 # instantiate the TestCase instance with our magically-created tests 259 return type('RosTest',(unittest.TestCase,),classdict)
260