21 self.
_rp = rospkg.RosPack()
43 rospy.loginfo = rospy.logdebug
45 rospy.logwarn = rospy.logdebug
47 rospy.logerror = rospy.logdebug
52 import_only = config.get(
'import_only',
False)
53 print '\033[34;1m#%2d %s \033[0m\033[34m(%s%s)\033[0m' % (self.
_counter, name, config[
'class'],
' > %s' % config[
'outcome']
if not import_only
else '')
57 if not import_only
and config.has_key(
'launch'):
60 if config[
'launch'].startswith(
'~')
or config[
'launch'].startswith(
'/'):
61 launchpath = os.path.expanduser(config[
'launch'])
62 elif re.match(
r'.+\.launch$', config[
'launch']):
63 launchpath = os.path.join(self._rp.get_path(config[
'launch'].split(
'/')[0]),
'/'.join(config[
'launch'].split(
'/')[1:]))
65 launchcontent = config[
'launch']
66 launchconfig = roslaunch.config.ROSLaunchConfig()
67 loader = roslaunch.xmlloader.XmlLoader()
68 if launchpath
is not None:
69 loader.load(launchpath, launchconfig, verbose =
False)
71 loader.load_string(launchcontent, launchconfig, verbose =
False)
72 launchrunner = roslaunch.launch.ROSLaunchRunner(self.
_run_id, launchconfig)
75 run = launchrunner.launch()
76 launchrunner.spin_once()
79 if config.has_key(
'wait_cond'):
81 check_running_rate = rospy.Rate(10)
84 is_running = eval(config[
'wait_cond'])
85 check_running_rate.sleep()
87 except Exception
as e:
88 print '\033[31;1m%s\033[0m\033[31m unable to check waiting condition:\n\t%s\033[0m' % (prefix, str(e))
95 if config.has_key(
'data'):
97 if config[
'data'].startswith(
'~')
or config[
'data'].startswith(
'/'):
98 bagpath = os.path.expanduser(config[
'data'])
101 bagpath = os.path.join(self._rp.get_path(config[
'data'].split(
'/')[0]),
'/'.join(config[
'data'].split(
'/')[1:]))
102 except Exception
as e:
103 print '\033[31;1m%s\033[0m\033[31m unable to get input bagfile %s:\n\t%s\033[0m' % (prefix, config[
'data'], str(e))
111 package = __import__(config[
'path'], fromlist=[config[
'path']])
112 clsmembers = inspect.getmembers(package,
lambda member: inspect.isclass(member)
and member.__module__ == package.__name__)
113 StateClass = next(c
for n,c
in clsmembers
if n == config[
'class'])
114 except Exception
as e:
115 print '\033[31;1m%s\033[0m\033[31m unable to import state %s (%s):\n\t%s\033[0m' % (prefix, config[
'class'], config[
'path'], str(e))
116 traceback.print_exc()
124 if config.has_key(
'params'):
125 for key, value
in config[
'params'].iteritems():
128 except Exception
as e:
130 print '\033[33;1m >\033[0m\033[33m unable to get message from topic %s: ignoring replacement...\033[0m' % (str(value))
131 params = config[
'params']
139 state = StateClass(**params)
140 except Exception
as e:
141 print '\033[31;1m%s\033[0m\033[31m unable to instantiate state %s (%s) with params:\n\t%s\n\t%s\033[0m' % (prefix, config[
'class'], config[
'path'], str(params), str(e))
142 traceback.print_exc()
148 userdata = smach.UserData()
149 if config.has_key(
'input'):
150 for input_key, input_value
in config[
'input'].iteritems():
155 if config.has_key(
'output'):
156 for output_key, output_value
in config[
'output'].iteritems():
162 outcome = LoopbackState._loopback_name
163 while outcome == LoopbackState._loopback_name
and not rospy.is_shutdown():
164 outcome = state.execute(userdata)
166 if config.has_key(
'launch'):
167 launchrunner.spin_once()
169 except Exception
as e:
170 print '\033[31;1m%s\033[0m\033[31m failed to execute state %s (%s)\n\t%s\033[0m' % (prefix, config[
'class'], config[
'path'], str(e))
171 traceback.print_exc()
175 if config.has_key(
'launch'):
182 for expected_key, expected_value
in expected.iteritems():
183 if expected_key
in userdata.keys():
184 equals = userdata[expected_key] == expected_value
185 self.
_evaluation_tests[
'test_%s_output_%s' % (name.split(
'.')[0], expected_key)] = \
186 self.
_test_output(userdata[expected_key], expected_value)
188 if self.
_print_debug_negative:
print '\033[1m -\033[0m wrong result for %s: %s != %s' % (expected_key, userdata[expected_key], expected_value)
193 if len(expected) > 0
and output_ok:
197 outcome_ok = outcome == config[
'outcome']
205 if import_only
or outcome_ok
and output_ok:
206 print '\033[32;1m%s\033[0m\033[32m %s completed!\033[0m' % (prefix, name)
210 print '\033[31;1m%s\033[0m\033[31m %s failed!\033[0m' % (prefix, name)
215 TestCase = type(test_pkg +
'_test_class', (unittest.TestCase,), self.
_evaluation_tests)
216 rosunit.unitrun(test_pkg, test_pkg +
'_flexbe_tests', TestCase)
221 if isinstance(data_value, basestring)
and len(data_value) > 1
and data_value[0] ==
'/' and data_value[1] !=
'/' and bag
is not None:
222 (_, data_value, _) = list(bag.read_messages(topics=[data_value]))[0]
225 elif isinstance(data_value, basestring)
and data_value.startswith(
'lambda '):
226 data_value = eval(data_value)
229 elif data_value ==
'None':
233 elif isinstance(data_value, basestring)
and len(data_value) > 1
and data_value[0] ==
'/' and data_value[1] ==
'/':
234 data_value = data_value[1:]
242 def _test_call(test_self):
243 test_self.assertEquals(value, expected,
"Output value %s does not match expected %s" % (value, expected))
247 def _test_call(test_self):
248 test_self.assertEquals(outcome, expected,
"Outcome %s does not match expected %s" % (outcome, expected))
252 def _test_call(test_self):
253 test_self.assertTrue(passed,
"State did not pass flexbe tests.")
def _test_outcome(self, outcome, expected)
def _test_output(self, value, expected)
def run_test(self, name, config)
def _test_pass(self, passed)
def perform_rostest(self, test_pkg)
def _parse_data_value(self, data_value, bag)