00001
00002 import inspect
00003 import rospkg
00004 import rospy
00005 import os
00006 import re
00007 import rosbag
00008 import smach
00009 import roslaunch
00010 import unittest
00011 import rosunit
00012 import traceback
00013
00014 from flexbe_core.core.loopback_state import LoopbackState
00015
00016
00017 class StateTester(object):
00018
00019 def __init__(self):
00020 self._counter = 0
00021 self._rp = rospkg.RosPack()
00022 self._evaluation_tests = dict()
00023
00024 self._run_id = rospy.get_param('/run_id')
00025
00026 self._print_debug_positive = rospy.get_param('~print_debug_positive', True)
00027 self._print_debug_negative = rospy.get_param('~print_debug_negative', True)
00028 self._mute_info = rospy.get_param('~mute_info', False)
00029 self._mute_warn = rospy.get_param('~mute_warn', False)
00030 self._mute_error = rospy.get_param('~mute_error', False)
00031 self._compact_format = rospy.get_param('~compact_format', False)
00032
00033 if self._compact_format:
00034 self._print_debug_positive = False
00035 self._print_debug_negative = True
00036 self._mute_info = True
00037 self._mute_warn = True
00038 self._mute_error = False
00039
00040
00041 def run_test(self, name, config):
00042 if self._mute_info:
00043 rospy.loginfo = rospy.logdebug
00044 if self._mute_warn:
00045 rospy.logwarn = rospy.logdebug
00046 if self._mute_error:
00047 rospy.logerror = rospy.logdebug
00048
00049 if not self._compact_format: print ''
00050 self._counter += 1
00051
00052 import_only = config.get('import_only', False)
00053 print '\033[34;1m#%2d %s \033[0m\033[34m(%s%s)\033[0m' % (self._counter, name, config['class'], ' > %s' % config['outcome'] if not import_only else '')
00054 prefix = '>>>' if not self._compact_format else ' >'
00055
00056
00057 if not import_only and config.has_key('launch'):
00058 launchpath = None
00059 launchcontent = None
00060 if config['launch'].startswith('~') or config['launch'].startswith('/'):
00061 launchpath = os.path.expanduser(config['launch'])
00062 elif re.match(r'.+\.launch$', config['launch']):
00063 launchpath = os.path.join(self._rp.get_path(config['launch'].split('/')[0]), '/'.join(config['launch'].split('/')[1:]))
00064 else:
00065 launchcontent = config['launch']
00066 launchconfig = roslaunch.config.ROSLaunchConfig()
00067 loader = roslaunch.xmlloader.XmlLoader()
00068 if launchpath is not None:
00069 loader.load(launchpath, launchconfig, verbose = False)
00070 else:
00071 loader.load_string(launchcontent, launchconfig, verbose = False)
00072 launchrunner = roslaunch.launch.ROSLaunchRunner(self._run_id, launchconfig)
00073
00074
00075 run = launchrunner.launch()
00076 launchrunner.spin_once()
00077 if self._print_debug_positive: print '\033[0m\033[1m +\033[0m launchfile running'
00078
00079 if config.has_key('wait_cond'):
00080 try:
00081 check_running_rate = rospy.Rate(10)
00082 is_running = False
00083 while not is_running:
00084 is_running = eval(config['wait_cond'])
00085 check_running_rate.sleep()
00086 if self._print_debug_positive: print '\033[0m\033[1m +\033[0m waiting condition satisfied'
00087 except Exception as e:
00088 print '\033[31;1m%s\033[0m\033[31m unable to check waiting condition:\n\t%s\033[0m' % (prefix, str(e))
00089 self._evaluation_tests['test_%s_pass' % name.split('.')[0]] = self._test_pass(False)
00090 return 0
00091
00092
00093
00094 bag = None
00095 if config.has_key('data'):
00096 bagpath = ''
00097 if config['data'].startswith('~') or config['data'].startswith('/'):
00098 bagpath = os.path.expanduser(config['data'])
00099 else:
00100 try:
00101 bagpath = os.path.join(self._rp.get_path(config['data'].split('/')[0]), '/'.join(config['data'].split('/')[1:]))
00102 except Exception as e:
00103 print '\033[31;1m%s\033[0m\033[31m unable to get input bagfile %s:\n\t%s\033[0m' % (prefix, config['data'], str(e))
00104 self._evaluation_tests['test_%s_pass' % name.split('.')[0]] = self._test_pass(False)
00105 return 0
00106 bag = rosbag.Bag(bagpath)
00107 if self._print_debug_positive: print '\033[1m +\033[0m using data source: %s' % bagpath
00108
00109
00110 try:
00111 package = __import__(config['path'], fromlist=[config['path']])
00112 clsmembers = inspect.getmembers(package, lambda member: inspect.isclass(member) and member.__module__ == package.__name__)
00113 StateClass = next(c for n,c in clsmembers if n == config['class'])
00114 except Exception as e:
00115 print '\033[31;1m%s\033[0m\033[31m unable to import state %s (%s):\n\t%s\033[0m' % (prefix, config['class'], config['path'], str(e))
00116 traceback.print_exc()
00117 self._evaluation_tests['test_%s_pass' % name.split('.')[0]] = self._test_pass(False)
00118 return 0
00119 if self._print_debug_positive: print '\033[1m +\033[0m state imported'
00120
00121 if not import_only:
00122
00123 params = None
00124 if config.has_key('params'):
00125 for key, value in config['params'].iteritems():
00126 try:
00127 config['params'][key] = self._parse_data_value(value, bag)
00128 except Exception as e:
00129 if not self._compact_format:
00130 print '\033[33;1m >\033[0m\033[33m unable to get message from topic %s: ignoring replacement...\033[0m' % (str(value))
00131 params = config['params']
00132
00133
00134 state = None
00135 try:
00136 if params is None:
00137 state = StateClass()
00138 else:
00139 state = StateClass(**params)
00140 except Exception as e:
00141 print '\033[31;1m%s\033[0m\033[31m unable to instantiate state %s (%s) with params:\n\t%s\n\t%s\033[0m' % (prefix, config['class'], config['path'], str(params), str(e))
00142 traceback.print_exc()
00143 self._evaluation_tests['test_%s_pass' % name.split('.')[0]] = self._test_pass(False)
00144 return 0
00145 if self._print_debug_positive: print '\033[1m +\033[0m state instantiated'
00146
00147
00148 userdata = smach.UserData()
00149 if config.has_key('input'):
00150 for input_key, input_value in config['input'].iteritems():
00151 userdata[input_key] = self._parse_data_value(input_value, bag)
00152
00153
00154 expected = dict()
00155 if config.has_key('output'):
00156 for output_key, output_value in config['output'].iteritems():
00157 expected[output_key] = self._parse_data_value(output_value, bag)
00158
00159
00160 try:
00161 state.on_start()
00162 outcome = LoopbackState._loopback_name
00163 while outcome == LoopbackState._loopback_name and not rospy.is_shutdown():
00164 outcome = state.execute(userdata)
00165 state._rate.sleep()
00166 if config.has_key('launch'):
00167 launchrunner.spin_once()
00168 state.on_stop()
00169 except Exception as e:
00170 print '\033[31;1m%s\033[0m\033[31m failed to execute state %s (%s)\n\t%s\033[0m' % (prefix, config['class'], config['path'], str(e))
00171 traceback.print_exc()
00172 self._evaluation_tests['test_%s_pass' % name.split('.')[0]] = self._test_pass(False)
00173 return 0
00174
00175 if config.has_key('launch'):
00176
00177 launchrunner.stop()
00178 if self._print_debug_positive: print '\033[0m\033[1m +\033[0m launchfile stopped'
00179
00180
00181 output_ok = True
00182 for expected_key, expected_value in expected.iteritems():
00183 if expected_key in userdata.keys():
00184 equals = userdata[expected_key] == expected_value
00185 self._evaluation_tests['test_%s_output_%s' % (name.split('.')[0], expected_key)] = \
00186 self._test_output(userdata[expected_key], expected_value)
00187 if not equals:
00188 if self._print_debug_negative: print '\033[1m -\033[0m wrong result for %s: %s != %s' % (expected_key, userdata[expected_key], expected_value)
00189 output_ok = False
00190 else:
00191 if self._print_debug_negative: print '\033[1m -\033[0m no result for %s' % expected_key
00192 output_ok = False
00193 if len(expected) > 0 and output_ok:
00194 if self._print_debug_positive: print '\033[1m +\033[0m all result outputs match expected'
00195
00196
00197 outcome_ok = outcome == config['outcome']
00198 self._evaluation_tests['test_%s_outcome' % name.split('.')[0]] = self._test_outcome(outcome, config['outcome'])
00199 if outcome_ok:
00200 if self._print_debug_positive: print '\033[1m +\033[0m correctly returned outcome %s' % outcome
00201 else:
00202 if self._print_debug_negative: print '\033[1m -\033[0m wrong outcome: %s' % outcome
00203
00204
00205 if import_only or outcome_ok and output_ok:
00206 print '\033[32;1m%s\033[0m\033[32m %s completed!\033[0m' % (prefix, name)
00207 self._evaluation_tests['test_%s_pass' % name.split('.')[0]] = self._test_pass(True)
00208 return 1
00209 else:
00210 print '\033[31;1m%s\033[0m\033[31m %s failed!\033[0m' % (prefix, name)
00211 self._evaluation_tests['test_%s_pass' % name.split('.')[0]] = self._test_pass(False)
00212 return 0
00213
00214 def perform_rostest(self, test_pkg):
00215 TestCase = type(test_pkg + '_test_class', (unittest.TestCase,), self._evaluation_tests)
00216 rosunit.unitrun(test_pkg, test_pkg + '_flexbe_tests', TestCase)
00217
00218
00219 def _parse_data_value(self, data_value, bag):
00220
00221 if isinstance(data_value, basestring) and len(data_value) > 1 and data_value[0] == '/' and data_value[1] != '/' and bag is not None:
00222 (_, data_value, _) = list(bag.read_messages(topics=[data_value]))[0]
00223
00224
00225 elif isinstance(data_value, basestring) and data_value.startswith('lambda '):
00226 data_value = eval(data_value)
00227
00228
00229 elif data_value == 'None':
00230 data_value = None
00231
00232
00233 elif isinstance(data_value, basestring) and len(data_value) > 1 and data_value[0] == '/' and data_value[1] == '/':
00234 data_value = data_value[1:]
00235
00236 return data_value
00237
00238
00239
00240
00241 def _test_output(self, value, expected):
00242 def _test_call(test_self):
00243 test_self.assertEquals(value, expected, "Output value %s does not match expected %s" % (value, expected))
00244 return _test_call
00245
00246 def _test_outcome(self, outcome, expected):
00247 def _test_call(test_self):
00248 test_self.assertEquals(outcome, expected, "Outcome %s does not match expected %s" % (outcome, expected))
00249 return _test_call
00250
00251 def _test_pass(self, passed):
00252 def _test_call(test_self):
00253 test_self.assertTrue(passed, "State did not pass flexbe tests.")
00254 return _test_call