state_tester.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 import inspect
00003 import rospkg
00004 import rospy
00005 import os
00006 import re
00007 import rosbag
00008 import smach
00009 import roslaunch
00010 import unittest
00011 import rosunit
00012 import traceback
00013 
00014 from flexbe_core.core.loopback_state import LoopbackState
00015 
00016 
00017 class StateTester(object):
00018 
00019         def __init__(self):
00020                 self._counter = 0
00021                 self._rp = rospkg.RosPack()
00022                 self._evaluation_tests = dict()
00023 
00024                 self._run_id = rospy.get_param('/run_id')
00025 
00026                 self._print_debug_positive = rospy.get_param('~print_debug_positive', True)
00027                 self._print_debug_negative = rospy.get_param('~print_debug_negative', True)
00028                 self._mute_info = rospy.get_param('~mute_info', False)
00029                 self._mute_warn = rospy.get_param('~mute_warn', False)
00030                 self._mute_error = rospy.get_param('~mute_error', False)
00031                 self._compact_format = rospy.get_param('~compact_format', False)
00032 
00033                 if self._compact_format:
00034                         self._print_debug_positive = False
00035                         self._print_debug_negative = True
00036                         self._mute_info = True
00037                         self._mute_warn = True
00038                         self._mute_error = False
00039 
00040 
00041         def run_test(self, name, config):
00042                 if self._mute_info:
00043                         rospy.loginfo = rospy.logdebug
00044                 if self._mute_warn:
00045                         rospy.logwarn = rospy.logdebug
00046                 if self._mute_error:
00047                         rospy.logerror = rospy.logdebug
00048 
00049                 if not self._compact_format: print ''
00050                 self._counter += 1
00051 
00052                 import_only = config.get('import_only', False)
00053                 print '\033[34;1m#%2d %s \033[0m\033[34m(%s%s)\033[0m' % (self._counter, name, config['class'], ' > %s' % config['outcome'] if not import_only else '')
00054                 prefix = '>>>' if not self._compact_format else '  >'
00055 
00056                 # load and start launch file
00057                 if not import_only and config.has_key('launch'):
00058                         launchpath = None
00059                         launchcontent = None
00060                         if config['launch'].startswith('~') or config['launch'].startswith('/'):
00061                                 launchpath = os.path.expanduser(config['launch'])
00062                         elif re.match(r'.+\.launch$', config['launch']):
00063                                 launchpath = os.path.join(self._rp.get_path(config['launch'].split('/')[0]), '/'.join(config['launch'].split('/')[1:]))
00064                         else:
00065                                 launchcontent = config['launch']
00066                         launchconfig = roslaunch.config.ROSLaunchConfig()
00067                         loader = roslaunch.xmlloader.XmlLoader()
00068                         if launchpath is not None:
00069                                 loader.load(launchpath, launchconfig, verbose = False)
00070                         else:
00071                                 loader.load_string(launchcontent, launchconfig, verbose = False)
00072                         launchrunner = roslaunch.launch.ROSLaunchRunner(self._run_id, launchconfig)
00073 
00074                         #print '\033[35m'
00075                         run = launchrunner.launch()
00076                         launchrunner.spin_once()
00077                         if self._print_debug_positive: print '\033[0m\033[1m  +\033[0m launchfile running'
00078 
00079                         if config.has_key('wait_cond'):
00080                                 try:
00081                                         check_running_rate = rospy.Rate(10)
00082                                         is_running = False
00083                                         while not is_running:
00084                                                 is_running = eval(config['wait_cond'])
00085                                                 check_running_rate.sleep()
00086                                         if self._print_debug_positive: print '\033[0m\033[1m  +\033[0m waiting condition satisfied'
00087                                 except Exception as e:
00088                                         print '\033[31;1m%s\033[0m\033[31m unable to check waiting condition:\n\t%s\033[0m' % (prefix, str(e))
00089                                         self._evaluation_tests['test_%s_pass' % name.split('.')[0]] = self._test_pass(False)
00090                                         return 0
00091 
00092 
00093                 # prepare rosbag if available
00094                 bag = None
00095                 if config.has_key('data'):
00096                         bagpath = ''
00097                         if config['data'].startswith('~') or config['data'].startswith('/'):
00098                                 bagpath = os.path.expanduser(config['data'])
00099                         else:
00100                                 try:
00101                                         bagpath = os.path.join(self._rp.get_path(config['data'].split('/')[0]), '/'.join(config['data'].split('/')[1:]))
00102                                 except Exception as e:
00103                                         print '\033[31;1m%s\033[0m\033[31m unable to get input bagfile %s:\n\t%s\033[0m' % (prefix, config['data'], str(e))
00104                                         self._evaluation_tests['test_%s_pass' % name.split('.')[0]] = self._test_pass(False)
00105                                         return 0
00106                         bag = rosbag.Bag(bagpath)
00107                         if self._print_debug_positive: print '\033[1m  +\033[0m using data source: %s' % bagpath
00108 
00109                 # import state
00110                 try:
00111                         package = __import__(config['path'], fromlist=[config['path']])
00112                         clsmembers = inspect.getmembers(package, lambda member: inspect.isclass(member) and member.__module__ == package.__name__)
00113                         StateClass = next(c for n,c in clsmembers if n == config['class'])
00114                 except Exception as e:
00115                         print '\033[31;1m%s\033[0m\033[31m unable to import state %s (%s):\n\t%s\033[0m' % (prefix, config['class'], config['path'], str(e))
00116                         traceback.print_exc()
00117                         self._evaluation_tests['test_%s_pass' % name.split('.')[0]] = self._test_pass(False)
00118                         return 0
00119                 if self._print_debug_positive: print '\033[1m  +\033[0m state imported'
00120 
00121                 if not import_only:
00122                         # prepare parameters
00123                         params = None
00124                         if config.has_key('params'):
00125                                 for key, value in config['params'].iteritems():
00126                                         try:
00127                                                 config['params'][key] = self._parse_data_value(value, bag)
00128                                         except Exception as e:
00129                                                 if not self._compact_format:
00130                                                         print '\033[33;1m  >\033[0m\033[33m unable to get message from topic %s: ignoring replacement...\033[0m' % (str(value))
00131                                 params = config['params']
00132 
00133                         # instatiate state
00134                         state = None
00135                         try:
00136                                 if params is None:
00137                                         state = StateClass()
00138                                 else:
00139                                         state = StateClass(**params)
00140                         except Exception as e:
00141                                 print '\033[31;1m%s\033[0m\033[31m unable to instantiate state %s (%s) with params:\n\t%s\n\t%s\033[0m' % (prefix, config['class'], config['path'], str(params), str(e))
00142                                 traceback.print_exc()
00143                                 self._evaluation_tests['test_%s_pass' % name.split('.')[0]] = self._test_pass(False)
00144                                 return 0
00145                         if self._print_debug_positive: print '\033[1m  +\033[0m state instantiated'
00146 
00147                         # set input values
00148                         userdata = smach.UserData()
00149                         if config.has_key('input'):
00150                                 for input_key, input_value in config['input'].iteritems():
00151                                         userdata[input_key] = self._parse_data_value(input_value, bag)
00152 
00153                         # set output values
00154                         expected = dict()
00155                         if config.has_key('output'):
00156                                 for output_key, output_value in config['output'].iteritems():
00157                                         expected[output_key] = self._parse_data_value(output_value, bag)
00158 
00159                         # execute state
00160                         try:
00161                                 state.on_start()
00162                                 outcome = LoopbackState._loopback_name
00163                                 while outcome == LoopbackState._loopback_name and not rospy.is_shutdown():
00164                                         outcome = state.execute(userdata)
00165                                         state._rate.sleep()
00166                                         if config.has_key('launch'):
00167                                                 launchrunner.spin_once()
00168                                 state.on_stop()
00169                         except Exception as e:
00170                                 print '\033[31;1m%s\033[0m\033[31m failed to execute state %s (%s)\n\t%s\033[0m' % (prefix, config['class'], config['path'], str(e))
00171                                 traceback.print_exc()
00172                                 self._evaluation_tests['test_%s_pass' % name.split('.')[0]] = self._test_pass(False)
00173                                 return 0
00174 
00175                         if config.has_key('launch'):
00176                                 #print '\033[35m'
00177                                 launchrunner.stop()
00178                                 if self._print_debug_positive: print '\033[0m\033[1m  +\033[0m launchfile stopped'
00179 
00180                         # evaluate output
00181                         output_ok = True
00182                         for expected_key, expected_value in expected.iteritems():
00183                                 if expected_key in userdata.keys():
00184                                         equals = userdata[expected_key] == expected_value
00185                                         self._evaluation_tests['test_%s_output_%s' % (name.split('.')[0], expected_key)] = \
00186                                                 self._test_output(userdata[expected_key], expected_value)
00187                                         if not equals:
00188                                                 if self._print_debug_negative: print '\033[1m  -\033[0m wrong result for %s: %s != %s' % (expected_key, userdata[expected_key], expected_value)
00189                                                 output_ok = False
00190                                 else:
00191                                         if self._print_debug_negative: print '\033[1m  -\033[0m no result for %s' % expected_key
00192                                         output_ok = False
00193                         if len(expected) > 0 and output_ok:
00194                                 if self._print_debug_positive: print '\033[1m  +\033[0m all result outputs match expected'
00195 
00196                         # evaluate outcome
00197                         outcome_ok = outcome == config['outcome']
00198                         self._evaluation_tests['test_%s_outcome' % name.split('.')[0]] = self._test_outcome(outcome, config['outcome'])
00199                         if outcome_ok:
00200                                 if self._print_debug_positive: print '\033[1m  +\033[0m correctly returned outcome %s' % outcome
00201                         else:
00202                                 if self._print_debug_negative: print '\033[1m  -\033[0m wrong outcome: %s' % outcome
00203                         
00204                 # report result
00205                 if import_only or outcome_ok and output_ok:
00206                         print '\033[32;1m%s\033[0m\033[32m %s completed!\033[0m' % (prefix, name)
00207                         self._evaluation_tests['test_%s_pass' % name.split('.')[0]] = self._test_pass(True)
00208                         return 1
00209                 else:
00210                         print '\033[31;1m%s\033[0m\033[31m %s failed!\033[0m' % (prefix, name)
00211                         self._evaluation_tests['test_%s_pass' % name.split('.')[0]] = self._test_pass(False)
00212                         return 0
00213 
00214         def perform_rostest(self, test_pkg):
00215                 TestCase = type(test_pkg + '_test_class', (unittest.TestCase,), self._evaluation_tests)
00216                 rosunit.unitrun(test_pkg, test_pkg + '_flexbe_tests', TestCase)
00217 
00218 
00219         def _parse_data_value(self, data_value, bag):
00220                 # message data
00221                 if isinstance(data_value, basestring) and len(data_value) > 1 and data_value[0] == '/' and data_value[1] != '/' and bag is not None:
00222                         (_, data_value, _) = list(bag.read_messages(topics=[data_value]))[0]
00223 
00224                 # anonymous function
00225                 elif isinstance(data_value, basestring) and data_value.startswith('lambda '):
00226                         data_value = eval(data_value)
00227 
00228                 # None
00229                 elif data_value == 'None':
00230                         data_value = None
00231 
00232                 # escaped backslash at the beginning
00233                 elif isinstance(data_value, basestring) and len(data_value) > 1 and data_value[0] == '/' and data_value[1] == '/':
00234                         data_value = data_value[1:]
00235 
00236                 return data_value
00237 
00238 
00239         # ROSUNIT callbacks
00240 
00241         def _test_output(self, value, expected):
00242                 def _test_call(test_self):
00243                         test_self.assertEquals(value, expected, "Output value %s does not match expected %s" % (value, expected))
00244                 return _test_call
00245 
00246         def _test_outcome(self, outcome, expected):
00247                 def _test_call(test_self):
00248                         test_self.assertEquals(outcome, expected, "Outcome %s does not match expected %s" % (outcome, expected))
00249                 return _test_call
00250 
00251         def _test_pass(self, passed):
00252                 def _test_call(test_self):
00253                         test_self.assertTrue(passed, "State did not pass flexbe tests.")
00254                 return _test_call


flexbe_testing
Author(s): Philipp Schillinger
autogenerated on Thu Jun 6 2019 19:32:31