8 from .logger
import Logger
9 from .test_interface
import TestInterface
10 from .test_context
import TestContext, LaunchContext
11 from .data_provider
import DataProvider
22 except Exception
as e:
23 Logger.print_title(name,
'Invalid',
None)
24 Logger.print_error(
'invalid test specification!\n\t%s' % str(e))
25 Logger.print_result(name,
False)
31 config[
'path'] +=
'.%s_sm' % re.sub(
r'[^\w]',
'_', config[
'name'].lower())
32 config[
'class'] =
'%sSM' % re.sub(
r'[^\w]',
'', config[
'name'])
34 import_only = config.get(
'import_only',
False)
35 Logger.print_title(name, config[
'class'], config[
'outcome']
if not import_only
else None)
39 test_interface =
TestInterface(config[
'path'], config[
'class'])
40 except Exception
as e:
41 Logger.print_failure(
'unable to import state %s (%s):\n\t%s' %
42 (config[
'class'], config[
'path'], str(e)))
49 if 'launch' in config:
50 context =
LaunchContext(config[
'launch'], config.get(
'wait_cond',
'True'))
57 except Exception
as e:
58 Logger.print_failure(
'unable to load data source %s:\n\t%s' %
59 (config[
'data'], str(e)))
65 if not context.verify():
66 Logger.print_error(
'failed to initialize test context:\n\t%s' % config[
'launch'])
71 params = {key: data.parse(value)
for key, value
in list(config.get(
'params', dict()).items())}
73 test_interface.instantiate(params)
74 except Exception
as e:
75 Logger.print_failure(
'unable to instantiate %s (%s) with params:\n\t%s\n\t%s' %
76 (config[
'class'], config[
'path'], str(params), str(e)))
82 for input_key, input_value
in list(config.get(
'input', dict()).items()):
83 userdata[input_key] = data.parse(input_value)
84 expected = {key: data.parse(value)
for key, value
in config.get(
'output', dict()).items()}
88 outcome = test_interface.execute(userdata, spin_cb=context.spin_once)
89 except Exception
as e:
90 Logger.print_failure(
'failed to execute %s (%s)\n\t%s' %
91 (config[
'class'], config[
'path'], str(e)))
95 if config.get(
'require_launch_success',
False):
96 context.wait_for_finishing()
100 outcome_ok = outcome == config[
'outcome']
102 Logger.print_positive(
'correctly returned outcome %s' % outcome)
104 Logger.print_negative(
'wrong outcome: %s' % outcome)
108 for expected_key, expected_value
in list(expected.items()):
109 if expected_key
in userdata:
110 equals = userdata[expected_key] == expected_value
111 self.
_tests[
'test_%s_output_%s' % (name, expected_key)] = \
112 self.
_test_output(userdata[expected_key], expected_value)
114 Logger.print_negative(
'wrong result for %s: %s != %s' %
115 (expected_key, userdata[expected_key], expected_value))
118 Logger.print_negative(
'no result for %s' % expected_key)
121 if not context.success
and config.get(
'require_launch_success',
False):
122 Logger.print_negative(
'Launch file did not exit cleanly')
125 if len(expected) > 0
and output_ok:
126 Logger.print_positive(
'all result outputs match expected')
129 success = import_only
or outcome_ok
and output_ok
130 Logger.print_result(name, success)
132 return 1
if success
else 0
135 if not isinstance(config, dict):
136 raise AssertionError(
'config needs to be a dictionary but is:\n\t%s' % str(config))
137 assert 'path' in config
138 assert 'class' in config
or 'name' in config
139 assert 'outcome' in config
or config.get(
'import_only',
False)
144 TestCase = type(test_pkg +
'_test_class', (unittest.TestCase,), self.
_tests)
145 rosunit.unitrun(test_pkg, test_pkg +
'_flexbe_tests', TestCase)
148 def _test_call(test_self):
149 test_self.assertEqual(value, expected,
"Output value %s does not match expected %s" % (value, expected))
153 def _test_call(test_self):
154 test_self.assertEqual(outcome, expected,
"Outcome %s does not match expected %s" % (outcome, expected))
158 def _test_call(test_self):
159 test_self.assertTrue(passed,
"Did not pass configured tests.")
163 def _test_call(test_self):
164 test_self.fail(
"Test config is invalid: %s" % config)
def _test_outcome(self, outcome, expected)
def _test_pass(self, passed)
def _test_config_invalid(self, config)
def run_test(self, name, config)
def _test_output(self, value, expected)
def perform_rostest(self, test_pkg)
def _verify_config(self, config)