runner.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #
00003 # License: BSD
00004 #   https://raw.github.com/robotics-in-concert/rocon_multimaster/license/LICENSE
00005 #
00006 
00007 ##############################################################################
00008 # Imports
00009 ##############################################################################
00010 
00011 from __future__ import print_function
00012 
00013 import os
00014 import sys
00015 import unittest
00016 from urlparse import urlparse
00017 import rospkg
00018 import roslib
00019 import roslaunch
00020 import rosunit
00021 
00022 # Local imports
00023 from loggers import printlog, printlogerr
00024 from test_parent import RoconTestLaunchParent
00025 
00026 ##############################################################################
00027 # Globals
00028 ##############################################################################
00029 
# Default port for test masters (not referenced by any function in this listing).
_DEFAULT_TEST_PORT = 22422
# Running total of test results; per-test results are folded in by _accumulate_results().
_results = rosunit.junitxml.Result('rocon_test', 0, 0, 0)
# When True the test runner sends test output to the screen and appends --text to the test args.
_text_mode = False
# When True, tearDown() waits once for the user to press Enter before tearing down.
_pause_mode = False
00034 
00035 
def get_results():
    """Return the module-level result object that test runs accumulate into."""
    return _results
00038 
00039 
def _accumulate_results(results):
    """Fold ``results`` into the module-level result object via its ``accumulate()`` method."""
    _results.accumulate(results)
00042 
00043 
def set_text_mode(val):
    """Set the module-level text-mode flag read by the generated test runners."""
    global _text_mode
    _text_mode = val
00047 
00048 
def set_pause_mode(val):
    """Set the module-level pause flag read by tearDown()."""
    global _pause_mode
    _pause_mode = val
00052 
00053 ##############################################################################
00054 # Parents
00055 ##############################################################################
00056 
# global store of the test launch parents registered by setUp() (through
# _add_rocon_test_parent) so we can do an extra shutdown in the rare event
# a tearDown fails to execute
_test_parents = []
# Config of the most recently registered test parent (see _add_rocon_test_parent).
# Is _config actually used anywhere?
_config = None
00062 
00063 
def _add_rocon_test_parent(runner):
    """
      Register a test launch parent in the module-level store and remember
      its ``config`` attribute as the most recent configuration.

      :param runner: test launch parent to register (must expose ``config``)
    """
    # Only _config is rebound; the list is mutated in place.
    global _config
    _test_parents.append(runner)
    _config = runner.config
00068 
00069 
def get_rocon_test_parents():
    """Return the module-level list of registered test launch parents (the list itself, not a copy)."""
    return _test_parents
00072 
00073 
00074 ##############################################################################
00075 # Construction Methods
00076 ##############################################################################
00077 
class RoconTestLaunchConfiguration(object):
    '''
      Provides configuration details for a rocon test launch configuration
      associated with a single master.

      Attributes:

      - ``launcher``: the launcher configuration passed to the constructor
      - ``configuration``: the roslaunch configuration loaded from the
        launcher's path and port
      - ``parent``: the launch parent that handles start, stop and shutdown;
        ``None`` until the test case's setUp() assigns one
      - ``test_parent``: backwards compatible alias for ``parent``

      :param launcher: a roslaunch test configuration
      :type launcher: :class:`rocon_launch.RosLaunchConfiguration`
    '''
    def __init__(self, launcher):
        # Launcher configuration (name, port, path to file, etc.)
        self.launcher = launcher
        # ros launch configuration info
        self.configuration = roslaunch.parent.load_config_default([launcher.path], launcher.port)
        # Launch parent, handles start, stop, shutdown. The rest of this module
        # reads and writes it as '.parent', so it must exist (as None) before
        # setUp() runs, otherwise 'parent is None' checks raise AttributeError.
        self.parent = None

    @property
    def test_parent(self):
        '''Alias for ``parent``, kept for code that used the old attribute name.'''
        return self.parent

    @test_parent.setter
    def test_parent(self, value):
        self.parent = value
00092 
00093 
def setUp(self):
    '''
      Function that becomes TestCase.setUp()

      Walks every launch configuration of the rocon launcher (one per master)
      and creates the 'parent' entity that will later be responsible for
      shutting that launch down. Configurations that contain tests get a
      RoconTestLaunchParent, all others get a plain ROSLaunchParent which is
      started immediately.

      Configurations that resolve to the same master port share a run id.
      Putting multiple test launchers with the same master port in one rocon
      launcher is untested and could be prone to problems.
    '''
    # A new parent is created for every run. This is a bit inefficient as the
    # roslaunch base infrastructure could be reused between tests, but the
    # roslaunch code is not abstracted well enough for that yet.
    run_ids = {}  # master port -> run id
    for launch_configuration in self.rocon_launch_configurations:
        ros_config = launch_configuration.configuration
        launcher = launch_configuration.launcher
        port = urlparse(ros_config.master.uri).port
        if port not in run_ids:
            run_ids[port] = roslaunch.core.generate_run_id()
        run_id = run_ids[port]
        if ros_config.tests:
            parent = RoconTestLaunchParent(run_id, ros_config, [launcher.path], port=port)
            launch_configuration.parent = parent
            parent.setUp()
            # Open question carried over from the original author: should the
            # configuration be replaced by the parent's own config here? It
            # doesn't make a whole lot of sense, so it is left untouched.
            _add_rocon_test_parent(parent)
            printlog("Launch Parent - type ...............ROSTestLaunchParent")
        else:
            parent = roslaunch.parent.ROSLaunchParent(
                run_id,
                [launcher.path],
                is_core=False,
                port=port,
                verbose=False,
                force_screen=False,
                is_rostest=False
            )
            launch_configuration.parent = parent
            parent._load_config()
            parent.start()
            printlog("Launch Parent - type ...............ROSLaunchParent")
        printlog("Launch Parent - run id..............%s" % parent.run_id)
        printlog("Launch Parent - launcher............%s" % launcher.path)
        printlog("Launch Parent - port................%s" % port)
        if ros_config.tests:
            printlog("Launch Parent - tests...............yes")
        else:
            printlog("Launch Parent - tests...............no")
00143 
00144 
def tearDown(self):
    '''
      Function that becomes TestCase.tearDown()

      Tears down (test configurations) or shuts down (plain launch
      configurations) the parent of every launch configuration. If pause
      mode is set, waits once for the user to press Enter before the first
      configuration is torn down.

      Configurations that never received a parent are skipped rather than
      raising, so the remaining configurations still get cleaned up.
    '''
    for rocon_launch_configuration in self.rocon_launch_configurations:
        config = rocon_launch_configuration.configuration
        # The parent attribute is only assigned by setUp(); it can be missing
        # or None if setUp() never got as far as this configuration.
        parent = getattr(rocon_launch_configuration, 'parent', None)
        launcher = rocon_launch_configuration.launcher
        if _pause_mode:
            raw_input("Press Enter to continue...")
            set_pause_mode(False)  # only trigger pause once
        printlog("Tear Down - launcher..........................%s" % launcher.path)
        if config.tests:
            printlog("Tear Down - tests..............................%s" % [test.test_name for test in config.tests])
            if parent:
                parent.tearDown()
                printlog("Tear Down - run id............................%s" % parent.run_id)
        elif parent:
            parent.shutdown()
00162         else:
00163             parent.shutdown()
00164 
00165 
## generate test failure if tests with same name in launch file
def fail_duplicate_runner(test_name):
    """
    Build a test method that reports a duplicated test name.

    :param test_name: name of the test that appears more than once
    :returns: function that prints the duplicate-name message and then
              fails the test case it is bound to with the same message
    :rtype: fn
    """
    # No docstring on the inner function: unittest would pick it up as the
    # test's short description.
    def _report_duplicate(self):
        message = "Duplicate tests named [%s] in rostest suite" % test_name
        print(message)
        self.fail(message)

    return _report_duplicate
00172 
00173 
def fail_runner(test_name, message):
    """
    Build a test method that always fails with a fixed message.

    :param test_name: name of the failing test (accepted but not used)
    :param message: text written to stderr and used as the failure message
    :returns: function that prints ``message`` to stderr and then fails the
              test case it is bound to
    :rtype: fn
    """
    # No docstring on the inner function: unittest would pick it up as the
    # test's short description.
    def _fail_with_message(self):
        print(message, file=sys.stderr)
        self.fail(message)

    return _fail_with_message
00179 
00180 
def rocon_test_runner(test, test_launch_configuration, test_pkg):
    """
    Test function generator that takes in a roslaunch Test object and
    returns a class instance method that runs the test. TestCase
    setUp() is responsible for ensuring that the rest of the roslaunch
    state is correct and tearDown() is responsible for tearing
    everything down cleanly.

    The returned method launches the fixture nodes, runs the test and, unless
    text mode is on, reads the generated xml results file and folds it into
    the module-level results. A test that reports errors/failures or times
    out is retried (tearDown + setUp + run again) while ``test.retry`` is
    positive.

    :param test: roslaunch test to run (its ``test_name``, ``args``,
                 ``output`` and ``retry`` attributes are used)
    :param test_launch_configuration: launch configuration the test belongs
                 to; its ``parent`` launches the nodes and runs the test
    :type test_launch_configuration: :class:`.RoconTestLaunchConfiguration`
    :param test_pkg: package name passed to ``rosunit.xml_results_file`` to
                 locate the test results file
    :returns: function object to run the test
    :rtype: fn
    """

    ## test case pass/fail is a measure of whether or not the test ran
    def fn(self):
        printlog("Launching tests")
        test_name = test.test_name
        # Remember the args as they were before this run so that retries do
        # not pile up duplicate output flags on test.args.
        base_args = test.args
        done = False
        while not done:
            # Re-read the parent on every pass: a retry goes through
            # tearDown()/setUp(), which installs a fresh parent.
            parent = getattr(test_launch_configuration, 'parent', None)
            self.assertTrue(parent is not None, "ROSTestParent initialization failed")
            printlog("  name..............................%s" % test_name)

            #launch the other nodes
            #for parent in self.parents:
            # DJS - will this mean I miss starting up other test parents?
            unused_succeeded, failed = parent.launch()
            self.assertTrue(not failed, "Test Fixture Nodes %s failed to launch" % failed)

            #setup the test
            # - we pass in the output test_file name so we can scrape it
            test_file = rosunit.xml_results_file(test_pkg, test_name, False)
            printlog("  test results file.................%s", test_file)
            if os.path.exists(test_file):
                os.remove(test_file)
                printlog("  clear old test results file.......done")

            # TODO: have to redeclare this due to a bug -- this file
            # needs to be renamed as it aliases the module where the
            # constant is elsewhere defined. The fix is to rename
            # rostest.py
            XML_OUTPUT_FLAG = '--gtest_output=xml:'  # use gtest-compatible flag
            test.args = "%s %s%s" % (base_args, XML_OUTPUT_FLAG, test_file)
            if _text_mode:
                test.output = 'screen'
                test.args = test.args + " --text"

            # run the test, blocks until completion
            printlog("Running test %s" % test_name)
            timeout_failure = False
            try:
                parent.run_test(test)
            except roslaunch.launch.RLTestTimeoutException:
                # Only swallow the timeout if a retry will actually happen
                # (same condition as the retry check below); otherwise there
                # would be no results to accumulate.
                if test.retry > 0:
                    timeout_failure = True
                else:
                    raise

            if not timeout_failure:
                printlog("test finished [%s]" % test_name)
            else:
                printlogerr("test timed out [%s]" % test_name)

            if not _text_mode or timeout_failure:
                if not timeout_failure:
                    self.assertTrue(os.path.isfile(test_file), "test [%s] did not generate test results" % test_name)
                    printlog("test [%s] results are in [%s]", test_name, test_file)
                    results = rosunit.junitxml.read(test_file, test_name)
                    test_fail = results.num_errors or results.num_failures
                else:
                    test_fail = True
                if test.retry > 0 and test_fail:
                    test.retry -= 1
                    printlog("test [%s] failed, retrying. Retries left: %s" % (test_name, test.retry))
                    self.tearDown()
                    self.setUp()
                else:
                    done = True
                    _accumulate_results(results)
                    printlog("test [%s] results summary: %s errors, %s failures, %s tests",
                             test_name, results.num_errors, results.num_failures, results.num_tests)
                    # Errors/failures reported by the unit test are not
                    # asserted here; they are only accumulated and logged.
            else:
                if test.retry:
                    printlogerr("retry is disabled in --text mode")
                done = True
        printlog("test done [%s]", test_name)
    return fn
00271 
00272 
def create_unit_rocon_test(rocon_launcher, launchers):
    '''
      Constructs the python unit test class.

      One test method (named ``test<test_name>``) is added for every test
      found in the launch configurations. A test whose node cannot be found
      gets a method that fails with an explanatory message, and a test whose
      name was already used gets a method that fails as a duplicate.

      :param rocon_launcher: absolute path to the rocon test launcher
      :param launchers: list of individual launcher configurations (name,
                        path, port etc) in rocon_launcher
      :returns: unit test class
      :rtype: subclass of :class:`unittest.TestCase`
    '''
    rocon_launch_configurations = [RoconTestLaunchConfiguration(launcher) for launcher in launchers]
    # pass in config to class as a property so that parent can be initialized
    classdict = {'setUp': setUp, 'tearDown': tearDown,
                 'rocon_launch_configurations': rocon_launch_configurations,
                 'test_file': rocon_launcher}

    # add in the tests
    rp = rospkg.RosPack()  # one package index shared by all node lookups
    test_names = set()
    for rocon_launch_configuration in rocon_launch_configurations:
        config = rocon_launch_configuration.configuration
        # Use the package of the launcher this configuration was built from,
        # not whichever launcher happened to be last in the list.
        test_pkg = rocon_launch_configuration.launcher.package
        for test in config.tests:
            # #1989: find test first to make sure it exists and is executable
            err_msg = None
            try:
                cmd = roslib.packages.find_node(test.package, test.type, rp)
                if not cmd:
                    err_msg = "Test node [%s/%s] does not exist or is not executable" % (test.package, test.type)
            except rospkg.ResourceNotFound:
                err_msg = "Package [%s] for test node [%s/%s] does not exist" % (test.package, test.package, test.type)

            test_name = 'test%s' % (test.test_name)
            if err_msg:
                classdict[test_name] = fail_runner(test.test_name, err_msg)
            elif test_name in test_names:
                classdict[test_name] = fail_duplicate_runner(test.test_name)
            else:
                classdict[test_name] = rocon_test_runner(test, rocon_launch_configuration, test_pkg)
                test_names.add(test_name)

    # instantiate the TestCase instance with our magically-created tests
    return type('RoconTest', (unittest.TestCase,), classdict)


rocon_test
Author(s): Daniel Stonier
autogenerated on Sat Jun 8 2019 18:48:47