state_tester.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import inspect
3 import rospkg
4 import rospy
5 import os
6 import re
7 import rosbag
8 import smach
9 import roslaunch
10 import unittest
11 import rosunit
12 import traceback
13 
14 from flexbe_core.core.loopback_state import LoopbackState
15 
16 
17 class StateTester(object):
18 
19  def __init__(self):
20  self._counter = 0
21  self._rp = rospkg.RosPack()
22  self._evaluation_tests = dict()
23 
24  self._run_id = rospy.get_param('/run_id')
25 
26  self._print_debug_positive = rospy.get_param('~print_debug_positive', True)
27  self._print_debug_negative = rospy.get_param('~print_debug_negative', True)
28  self._mute_info = rospy.get_param('~mute_info', False)
29  self._mute_warn = rospy.get_param('~mute_warn', False)
30  self._mute_error = rospy.get_param('~mute_error', False)
31  self._compact_format = rospy.get_param('~compact_format', False)
32 
33  if self._compact_format:
34  self._print_debug_positive = False
35  self._print_debug_negative = True
36  self._mute_info = True
37  self._mute_warn = True
38  self._mute_error = False
39 
40 
41  def run_test(self, name, config):
42  if self._mute_info:
43  rospy.loginfo = rospy.logdebug
44  if self._mute_warn:
45  rospy.logwarn = rospy.logdebug
46  if self._mute_error:
47  rospy.logerror = rospy.logdebug
48 
49  if not self._compact_format: print ''
50  self._counter += 1
51 
52  import_only = config.get('import_only', False)
53  print '\033[34;1m#%2d %s \033[0m\033[34m(%s%s)\033[0m' % (self._counter, name, config['class'], ' > %s' % config['outcome'] if not import_only else '')
54  prefix = '>>>' if not self._compact_format else ' >'
55 
56  # load and start launch file
57  if not import_only and config.has_key('launch'):
58  launchpath = None
59  launchcontent = None
60  if config['launch'].startswith('~') or config['launch'].startswith('/'):
61  launchpath = os.path.expanduser(config['launch'])
62  elif re.match(r'.+\.launch$', config['launch']):
63  launchpath = os.path.join(self._rp.get_path(config['launch'].split('/')[0]), '/'.join(config['launch'].split('/')[1:]))
64  else:
65  launchcontent = config['launch']
66  launchconfig = roslaunch.config.ROSLaunchConfig()
67  loader = roslaunch.xmlloader.XmlLoader()
68  if launchpath is not None:
69  loader.load(launchpath, launchconfig, verbose = False)
70  else:
71  loader.load_string(launchcontent, launchconfig, verbose = False)
72  launchrunner = roslaunch.launch.ROSLaunchRunner(self._run_id, launchconfig)
73 
74  #print '\033[35m'
75  run = launchrunner.launch()
76  launchrunner.spin_once()
77  if self._print_debug_positive: print '\033[0m\033[1m +\033[0m launchfile running'
78 
79  if config.has_key('wait_cond'):
80  try:
81  check_running_rate = rospy.Rate(10)
82  is_running = False
83  while not is_running:
84  is_running = eval(config['wait_cond'])
85  check_running_rate.sleep()
86  if self._print_debug_positive: print '\033[0m\033[1m +\033[0m waiting condition satisfied'
87  except Exception as e:
88  print '\033[31;1m%s\033[0m\033[31m unable to check waiting condition:\n\t%s\033[0m' % (prefix, str(e))
89  self._evaluation_tests['test_%s_pass' % name.split('.')[0]] = self._test_pass(False)
90  return 0
91 
92 
93  # prepare rosbag if available
94  bag = None
95  if config.has_key('data'):
96  bagpath = ''
97  if config['data'].startswith('~') or config['data'].startswith('/'):
98  bagpath = os.path.expanduser(config['data'])
99  else:
100  try:
101  bagpath = os.path.join(self._rp.get_path(config['data'].split('/')[0]), '/'.join(config['data'].split('/')[1:]))
102  except Exception as e:
103  print '\033[31;1m%s\033[0m\033[31m unable to get input bagfile %s:\n\t%s\033[0m' % (prefix, config['data'], str(e))
104  self._evaluation_tests['test_%s_pass' % name.split('.')[0]] = self._test_pass(False)
105  return 0
106  bag = rosbag.Bag(bagpath)
107  if self._print_debug_positive: print '\033[1m +\033[0m using data source: %s' % bagpath
108 
109  # import state
110  try:
111  package = __import__(config['path'], fromlist=[config['path']])
112  clsmembers = inspect.getmembers(package, lambda member: inspect.isclass(member) and member.__module__ == package.__name__)
113  StateClass = next(c for n,c in clsmembers if n == config['class'])
114  except Exception as e:
115  print '\033[31;1m%s\033[0m\033[31m unable to import state %s (%s):\n\t%s\033[0m' % (prefix, config['class'], config['path'], str(e))
116  traceback.print_exc()
117  self._evaluation_tests['test_%s_pass' % name.split('.')[0]] = self._test_pass(False)
118  return 0
119  if self._print_debug_positive: print '\033[1m +\033[0m state imported'
120 
121  if not import_only:
122  # prepare parameters
123  params = None
124  if config.has_key('params'):
125  for key, value in config['params'].iteritems():
126  try:
127  config['params'][key] = self._parse_data_value(value, bag)
128  except Exception as e:
129  if not self._compact_format:
130  print '\033[33;1m >\033[0m\033[33m unable to get message from topic %s: ignoring replacement...\033[0m' % (str(value))
131  params = config['params']
132 
133  # instatiate state
134  state = None
135  try:
136  if params is None:
137  state = StateClass()
138  else:
139  state = StateClass(**params)
140  except Exception as e:
141  print '\033[31;1m%s\033[0m\033[31m unable to instantiate state %s (%s) with params:\n\t%s\n\t%s\033[0m' % (prefix, config['class'], config['path'], str(params), str(e))
142  traceback.print_exc()
143  self._evaluation_tests['test_%s_pass' % name.split('.')[0]] = self._test_pass(False)
144  return 0
145  if self._print_debug_positive: print '\033[1m +\033[0m state instantiated'
146 
147  # set input values
148  userdata = smach.UserData()
149  if config.has_key('input'):
150  for input_key, input_value in config['input'].iteritems():
151  userdata[input_key] = self._parse_data_value(input_value, bag)
152 
153  # set output values
154  expected = dict()
155  if config.has_key('output'):
156  for output_key, output_value in config['output'].iteritems():
157  expected[output_key] = self._parse_data_value(output_value, bag)
158 
159  # execute state
160  try:
161  state.on_start()
162  outcome = LoopbackState._loopback_name
163  while outcome == LoopbackState._loopback_name and not rospy.is_shutdown():
164  outcome = state.execute(userdata)
165  state._rate.sleep()
166  if config.has_key('launch'):
167  launchrunner.spin_once()
168  state.on_stop()
169  except Exception as e:
170  print '\033[31;1m%s\033[0m\033[31m failed to execute state %s (%s)\n\t%s\033[0m' % (prefix, config['class'], config['path'], str(e))
171  traceback.print_exc()
172  self._evaluation_tests['test_%s_pass' % name.split('.')[0]] = self._test_pass(False)
173  return 0
174 
175  if config.has_key('launch'):
176  #print '\033[35m'
177  launchrunner.stop()
178  if self._print_debug_positive: print '\033[0m\033[1m +\033[0m launchfile stopped'
179 
180  # evaluate output
181  output_ok = True
182  for expected_key, expected_value in expected.iteritems():
183  if expected_key in userdata.keys():
184  equals = userdata[expected_key] == expected_value
185  self._evaluation_tests['test_%s_output_%s' % (name.split('.')[0], expected_key)] = \
186  self._test_output(userdata[expected_key], expected_value)
187  if not equals:
188  if self._print_debug_negative: print '\033[1m -\033[0m wrong result for %s: %s != %s' % (expected_key, userdata[expected_key], expected_value)
189  output_ok = False
190  else:
191  if self._print_debug_negative: print '\033[1m -\033[0m no result for %s' % expected_key
192  output_ok = False
193  if len(expected) > 0 and output_ok:
194  if self._print_debug_positive: print '\033[1m +\033[0m all result outputs match expected'
195 
196  # evaluate outcome
197  outcome_ok = outcome == config['outcome']
198  self._evaluation_tests['test_%s_outcome' % name.split('.')[0]] = self._test_outcome(outcome, config['outcome'])
199  if outcome_ok:
200  if self._print_debug_positive: print '\033[1m +\033[0m correctly returned outcome %s' % outcome
201  else:
202  if self._print_debug_negative: print '\033[1m -\033[0m wrong outcome: %s' % outcome
203 
204  # report result
205  if import_only or outcome_ok and output_ok:
206  print '\033[32;1m%s\033[0m\033[32m %s completed!\033[0m' % (prefix, name)
207  self._evaluation_tests['test_%s_pass' % name.split('.')[0]] = self._test_pass(True)
208  return 1
209  else:
210  print '\033[31;1m%s\033[0m\033[31m %s failed!\033[0m' % (prefix, name)
211  self._evaluation_tests['test_%s_pass' % name.split('.')[0]] = self._test_pass(False)
212  return 0
213 
214  def perform_rostest(self, test_pkg):
215  TestCase = type(test_pkg + '_test_class', (unittest.TestCase,), self._evaluation_tests)
216  rosunit.unitrun(test_pkg, test_pkg + '_flexbe_tests', TestCase)
217 
218 
219  def _parse_data_value(self, data_value, bag):
220  # message data
221  if isinstance(data_value, basestring) and len(data_value) > 1 and data_value[0] == '/' and data_value[1] != '/' and bag is not None:
222  (_, data_value, _) = list(bag.read_messages(topics=[data_value]))[0]
223 
224  # anonymous function
225  elif isinstance(data_value, basestring) and data_value.startswith('lambda '):
226  data_value = eval(data_value)
227 
228  # None
229  elif data_value == 'None':
230  data_value = None
231 
232  # escaped backslash at the beginning
233  elif isinstance(data_value, basestring) and len(data_value) > 1 and data_value[0] == '/' and data_value[1] == '/':
234  data_value = data_value[1:]
235 
236  return data_value
237 
238 
239  # ROSUNIT callbacks
240 
241  def _test_output(self, value, expected):
242  def _test_call(test_self):
243  test_self.assertEquals(value, expected, "Output value %s does not match expected %s" % (value, expected))
244  return _test_call
245 
246  def _test_outcome(self, outcome, expected):
247  def _test_call(test_self):
248  test_self.assertEquals(outcome, expected, "Outcome %s does not match expected %s" % (outcome, expected))
249  return _test_call
250 
251  def _test_pass(self, passed):
252  def _test_call(test_self):
253  test_self.assertTrue(passed, "State did not pass flexbe tests.")
254  return _test_call
def _test_outcome(self, outcome, expected)
def _test_output(self, value, expected)
def run_test(self, name, config)
Definition: state_tester.py:41
def _parse_data_value(self, data_value, bag)


flexbe_testing
Author(s): Philipp Schillinger
autogenerated on Wed Jun 5 2019 21:52:05