counterbalance_analysis.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #
00003 # Software License Agreement (BSD License)
00004 #
00005 # Copyright (c) 2010, Willow Garage, Inc.
00006 # All rights reserved.
00007 #
00008 # Redistribution and use in source and binary forms, with or without
00009 # modification, are permitted provided that the following conditions
00010 # are met:
00011 #
00012 #  * Redistributions of source code must retain the above copyright
00013 #    notice, this list of conditions and the following disclaimer.
00014 #  * Redistributions in binary form must reproduce the above
00015 #    copyright notice, this list of conditions and the following
00016 #    disclaimer in the documentation and/or other materials provided
00017 #    with the distribution.
00018 #  * Neither the name of the Willow Garage nor the names of its
00019 #    contributors may be used to endorse or promote products derived
00020 #    from this software without specific prior written permission.
00021 #
00022 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00023 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00024 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00025 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00026 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00027 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00028 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00029 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00030 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00031 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00032 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00033 # POSSIBILITY OF SUCH DAMAGE.
00034 
00035 ##\author Kevin Watts
00036 ##\brief Analyzes counterbalance data
00037 
00038 PKG = 'pr2_counterbalance_check'
00039 import roslib; roslib.load_manifest(PKG)
00040 
00041 import numpy
00042 import math
00043 import sys
00044 
00045 
00046 import matplotlib
00047 matplotlib.use('Agg')
00048 import matplotlib.pyplot as plt
00049 from StringIO import StringIO
00050 
00051 from pr2_self_test_msgs.msg import Plot, TestValue, TestParam
00052 
00053 ok_dict = { False: 'FAIL', True: 'OK' }
00054 
00055 class JointPositionAnalysisData(object):
00056     def __init__(self, msg):
00057         self.time     = numpy.array(msg.time)
00058         self.position = numpy.array(msg.position)
00059         self.velocity = numpy.array(msg.velocity)
00060         self.effort   = numpy.array(msg.effort)
00061 
00062         self.position_avg = numpy.average(self.position)
00063         self.position_sd  = numpy.std(self.position)
00064         self.effort_avg   = numpy.average(self.effort)
00065         self.effort_sd    = numpy.std(self.effort)
00066 
00067 class CBPositionAnalysisData(object):
00068     def __init__(self, msg):
00069         self.flex_position = msg.flex_position
00070         self.lift_hold = JointPositionAnalysisData(msg.lift_hold)
00071         self.flex_hold = JointPositionAnalysisData(msg.flex_hold)
00072 
00073 class CBRunAnalysisData(object):
00074     def __init__(self, msg):
00075         self.lift_position = msg.lift_position
00076         self.flex_data = []
00077         for fd in msg.flex_data:
00078             self.flex_data.append(CBPositionAnalysisData(fd))
00079 
00080 class CounterbalanceAnalysisData(object):
00081     ##\param msg CounterbalanceTestData : Message from controller
00082     def __init__(self, msg):
00083         self.lift_data = []
00084         for ld in msg.lift_data:
00085             self.lift_data.append(CBRunAnalysisData(ld))
00086 
00087 ##\brief Stores parameters from CB analysis test
00088 class CounterbalanceAnalysisParams(object):
00089     def __init__(self, msg):
00090         self.lift_dither  = msg.lift_amplitude
00091         self.flex_dither  = msg.flex_amplitude
00092         
00093         self.lift_joint   = msg.lift_joint
00094         self.flex_joint   = msg.flex_joint
00095         
00096         self.timeout_hit  = msg.timeout_hit
00097         self.flex_test    = msg.flex_test
00098 
00099         self.lift_mse     = msg.arg_value[9]
00100         self.lift_avg_abs = msg.arg_value[10]
00101         self.lift_avg_eff = msg.arg_value[11]
00102         self.flex_mse     = msg.arg_value[12]
00103         self.flex_avg_abs = msg.arg_value[13]
00104         self.flex_avg_eff = msg.arg_value[14]
00105 
00106         self.lift_p       = msg.arg_value[15]
00107         self.lift_i       = msg.arg_value[16]
00108         self.lift_d       = msg.arg_value[17]
00109         self.lift_i_clamp = msg.arg_value[18]
00110 
00111         self.flex_p       = msg.arg_value[19]
00112         self.flex_i       = msg.arg_value[20]
00113         self.flex_d       = msg.arg_value[21]
00114         self.flex_i_clamp = msg.arg_value[22]
00115 
00116         if len(msg.arg_value) > 24:
00117             self.screw_tol = msg.arg_value[23]
00118             self.bar_tol   = msg.arg_value[24]
00119         else:
00120             # For backwards compatibility
00121             self.screw_tol = 2.0
00122             self.bar_tol   = 0.8
00123 
00124         self.num_flexes = len(msg.lift_data[0].flex_data)
00125         self.num_lifts  = len(msg.lift_data)
00126         
00127         self.min_lift = msg.lift_data[0].lift_position
00128         self.max_lift = msg.lift_data[-1].lift_position
00129 
00130         self.min_flex = msg.lift_data[0].flex_data[0].flex_position
00131         self.max_flex = msg.lift_data[0].flex_data[-1].flex_position
00132 
00133         self.named_params = {}
00134         for i in range(0, 9):
00135             self.named_params[msg.arg_name[i]] = msg.arg_value[i]
00136 
00137     def get_test_params(self):
00138         params = []
00139         params.append(TestParam(key='Lift Dither', value=str(self.lift_dither)))
00140         params.append(TestParam(key='Flex Dither', value=str(self.flex_dither)))
00141         params.append(TestParam(key='Lift Joint', value=self.lift_joint))
00142         params.append(TestParam(key='Flex Joint', value=self.flex_joint))
00143         params.append(TestParam(key='Timeout Hit', value=ok_dict[not self.timeout_hit]))
00144         params.append(TestParam(key='Flex Tested', value=str(self.flex_test)))
00145 
00146         params.append(TestParam(key='Lift MSE', value=str(self.lift_mse)))
00147         params.append(TestParam(key='Lift Avg Abs', value=str(self.lift_avg_abs)))
00148         params.append(TestParam(key='Lift Avg Effort', value=str(self.lift_avg_eff)))
00149 
00150 
00151         params.append(TestParam(key='Lift P Gain', value=str(self.lift_p)))
00152         params.append(TestParam(key='Lift I Gain', value=str(self.lift_i)))
00153         params.append(TestParam(key='Lift D Gain', value=str(self.lift_d)))
00154         params.append(TestParam(key='Lift I Clamp', value=str(self.lift_i_clamp)))
00155 
00156         params.append(TestParam(key='Num Lifts', value=str(self.num_lifts)))
00157         params.append(TestParam(key='Min Lift', value="%.2f" % self.min_lift))
00158         params.append(TestParam(key='Max Lift', value="%.2f" % self.max_lift))
00159 
00160         if self.flex_test:
00161             params.append(TestParam(key='Flex MSE', value=str(self.flex_mse)))
00162             params.append(TestParam(key='Flex Avg Abs', value=str(self.flex_avg_abs)))
00163             params.append(TestParam(key='Flex Avg Effort', value=str(self.flex_avg_eff)))
00164             params.append(TestParam(key='Flex P Gain', value=str(self.flex_p)))
00165             params.append(TestParam(key='Flex I Gain', value=str(self.flex_i)))
00166             params.append(TestParam(key='Flex D Gain', value=str(self.flex_d)))
00167             params.append(TestParam(key='Flex I Clamp', value=str(self.flex_i_clamp)))
00168             params.append(TestParam(key='Num Flexes', value=str(self.num_flexes)))
00169             params.append(TestParam(key='Min Flex', value="%.2f" % self.min_flex))
00170             params.append(TestParam(key='Max Flex', value="%.2f" % self.max_flex))
00171 
00172         for key, val in self.named_params.iteritems():
00173             params.append(TestParam(key=key, value=str(val)))
00174             
00175 
00176         return params
00177 
00178 class CounterbalanceAnalysisResult:
00179     __slots__ = ['html', 'summary', 'result', 'values']
00180     def __init__(self):
00181         self.html = ''
00182         self.summary = ''
00183         self.result = False
00184         self.values = []
00185 
00186 ##\brief Get average efforts for CB test as a list
00187 ##
00188 ##\param lift_calc bool : Lift or flex efforts
00189 def get_efforts(data, lift_calc):
00190     avg_effort_list = []
00191     for ld in data.lift_data:
00192         for fd in ld.flex_data:
00193             if lift_calc:
00194                 avg_effort_list.append(fd.lift_hold.effort_avg)
00195             else:
00196                 avg_effort_list.append(fd.flex_hold.effort_avg)
00197 
00198     return avg_effort_list
00199 
00200 def _get_mean_sq_effort(avg_effort_array):
00201     sq_array = avg_effort_array * avg_effort_array
00202     return numpy.average(sq_array)
00203 
00204 def _get_mean_abs_effort(avg_effort_array):
00205     abs_array = abs(avg_effort_array)
00206     return numpy.average(abs_array)
00207 
00208 def _get_mean_effort(avg_effort_array):
00209     return numpy.average(avg_effort_array)
00210 
00211 ##\brief Returns a list of lift positions, efforts for a given flex position (vary by lift)
00212 def _get_const_flex_effort(data, flex_index = 0, lift_calc = True):
00213     effort_list = []
00214     lift_list = []
00215     for ld in data.lift_data:
00216         fd = ld.flex_data[flex_index]
00217         lift_list.append(ld.lift_position)
00218         if lift_calc:
00219             effort_list.append(fd.lift_hold.effort_avg)
00220         else:
00221             effort_list.append(fd.flex_hold.effort_avg)
00222 
00223     return lift_list, effort_list
00224     
00225 def _get_const_lift_effort(data, lift_index = 0, lift_calc = True):
00226     ld = data.lift_data[lift_index]
00227         
00228     effort_list = []
00229     flex_list = []
00230 
00231     for fd in ld.flex_data:
00232         flex_list.append(fd.flex_position)
00233         if lift_calc:
00234             effort_list.append(fd.lift_hold.effort_avg)
00235         else:
00236             effort_list.append(fd.flex_hold.effort_avg)
00237 
00238     return flex_list, effort_list
00239 
00240 
00241 
00242 def _get_flex_positions(data):
00243     flex_list = []
00244     for fd in data.lift_data[0].flex_data:
00245         flex_list.append(fd.flex_position)
00246 
00247     return flex_list
00248 
00249 def _get_lift_positions(data):
00250     lifts = []
00251     for ld in data.lift_data:
00252         lifts.append(ld.lift_position)
00253     return lifts
00254 
00255 
00256 ##\brief Gives effort contour plot of efforts by lift, flex position
00257 ##
00258 ##\param params CounterbalanceAnalysisParams : Input params
00259 ##\param data CounterbalanceAnalysisData : Test Data
00260 ##\return qualification.msg.Plot : Plot message with contour
00261 def plot_effort_contour(params, data, lift_calc = True):
00262     effort_list = []
00263     for i in range(0, params.num_lifts):
00264         flexes, efforts = _get_const_lift_effort(data, i, lift_calc)
00265         effort_list.append(efforts)
00266     flexes = _get_flex_positions(data)
00267     lifts = _get_lift_positions(data)
00268 
00269     flex_grid, lift_grid = numpy.meshgrid(numpy.array(flexes), numpy.array(lifts))
00270     effort_grid = numpy.array(effort_list)
00271 
00272     CS = plt.contour(flex_grid, lift_grid, effort_grid)
00273     plt.clabel(CS, inline=0, fontsize=10)
00274     
00275     plt.xlabel('Flex')
00276     plt.ylabel('Lift')
00277         
00278     stream = StringIO()
00279     plt.savefig(stream, format = 'png')
00280     image = stream.getvalue()
00281     p = Plot()
00282     if lift_calc:
00283         p.title = 'lift_effort_contour'
00284     else:
00285         p.title = 'flex_effort_contour'
00286     p.image = image
00287     p.image_format = 'png'
00288     
00289     plt.close()
00290 
00291     return p
00292 
00293 ##\brief Plots CB efforts against shoulder lift position
00294 ##
00295 ##\param flex_index int : Index of flex data to plot against
00296 ##\param lift_calc bool : Lift efforts or flex efforts
00297 def plot_efforts_by_lift_position(params, data, flex_index = -1, lift_calc = True):
00298     lift_position, effort = _get_const_flex_effort(data, flex_index, lift_calc)
00299     
00300     flex_position = data.lift_data[0].flex_data[flex_index].flex_position
00301 
00302     plt.plot(numpy.array(lift_position), numpy.array(effort))
00303     if lift_calc:
00304         plt.title('Shoulder Lift Effort at Flex Position %.2f' % (flex_position))
00305     else:
00306         plt.title('Shoulder Flex Effort at Flex Position %.2f' % (flex_position))
00307     plt.axes()
00308     plt.xlabel('Lift Position')
00309     plt.ylabel('Effort')
00310     plt.axhline(y = 0, color = 'r', label='_nolegend_')
00311 
00312     stream = StringIO()
00313     plt.savefig(stream, format = 'png')
00314     image = stream.getvalue()
00315     p = Plot()
00316     if lift_calc:
00317         p.title = 'lift_effort_const_flex_%d' % flex_index
00318     else:
00319         p.title = 'flex_effort_const_flex_%d' % flex_index
00320     p.image = image
00321     p.image_format = 'png'
00322     
00323     plt.close()
00324 
00325     return p
00326 
00327 ##\brief Checks shoulder lift efforts against params
00328 ##
00329 ##\return CounterbalanceAnalysisResult
00330 def analyze_lift_efforts(params, data):
00331     result = CounterbalanceAnalysisResult()
00332     
00333     avg_efforts = numpy.array(get_efforts(data, True))
00334     mse = _get_mean_sq_effort(avg_efforts)
00335     avg_abs = _get_mean_abs_effort(avg_efforts)
00336     avg_eff = _get_mean_effort(avg_efforts)
00337 
00338     mse_ok = mse < params.lift_mse
00339     avg_abs_ok = avg_abs < params.lift_avg_abs
00340     avg_eff_ok = abs(avg_eff) < abs(params.lift_avg_eff)
00341 
00342     if mse_ok and avg_abs_ok:
00343         result.summary = 'Lift efforts OK'
00344     elif avg_eff < 0:
00345         result.summary = 'Counterbalance is too weak. Requires adjustment'
00346     else:
00347         result.summary = 'Counterbalance is too strong. Requires adjustment'
00348 
00349     html = ['<p>%s</p>' % result.summary]
00350     
00351     html.append('<table border="1" cellpadding="2" cellspacing="0">')
00352     html.append('<tr><td><b>Parameter</b></td><td><b>Value</b></td><td><b>Maximum</b></td><td><b>Status</b></td></tr>')
00353     html.append('<tr><td><b>Mean Sq. Effort</b></td><td>%.2f</td><td>%.2f</td><td>%s</td></tr>' % (mse, params.lift_mse, ok_dict[mse_ok]))
00354     html.append('<tr><td><b>Average Abs. Effort</b></td><td>%.2f</td><td>%.2f</td><td>%s</td></tr>' % (avg_abs, params.lift_avg_abs, ok_dict[avg_abs_ok]))
00355     html.append('<tr><td><b>Average Effort</b></td><td>%.2f</td><td>%.2f</td><td>%s</td></tr>' % (avg_eff, params.lift_avg_eff, ok_dict[avg_eff_ok]))
00356     html.append('</table>')
00357 
00358     result.html = '\n'.join(html)
00359 
00360     result.result = mse_ok and avg_abs_ok
00361     
00362     result.values = []
00363     result.values.append(TestValue('Lift MSE', str(mse), '', str(params.lift_mse)))
00364     result.values.append(TestValue('Lift Avg. Abs. Effort', str(avg_abs), '', str(params.lift_avg_abs)))
00365     result.values.append(TestValue('Lift Avg.  Effort', str(avg_eff), '', str(params.lift_avg_eff)))
00366 
00367     return result
00368     
00369 ##\brief Checks shoulder flex efforts against params
00370 ##
00371 ##\return CounterbalanceAnalysisResult
00372 def analyze_flex_efforts(params, data):
00373     result = CounterbalanceAnalysisResult()
00374     
00375     avg_efforts = numpy.array(get_efforts(data, False))
00376     mse = _get_mean_sq_effort(avg_efforts)
00377     avg_abs = _get_mean_abs_effort(avg_efforts)
00378     avg_eff = _get_mean_effort(avg_efforts)
00379 
00380     mse_ok = mse < params.flex_mse
00381     avg_abs_ok = avg_abs < params.flex_avg_abs
00382     avg_eff_ok = abs(avg_eff) < abs(params.flex_avg_eff)
00383 
00384     if mse_ok and avg_abs_ok:
00385         result.summary = 'Flex efforts OK'
00386     else:
00387         result.summary = 'Flex MSE/Avg. Absolute effort too high'
00388 
00389     html = ['<p>%s</p>' % result.summary]
00390     
00391     html.append('<table border="1" cellpadding="2" cellspacing="0">')
00392     html.append('<tr><td><b>Parameter</b></td><td><b>Value</b></td><td><b>Maximum</b></td><td><b>Status</b></td></tr>')
00393     html.append('<tr><td><b>Mean Sq. Effort</b></td><td>%.2f</td><td>%.2f</td><td>%s</td></tr>' % (mse, params.flex_mse, ok_dict[mse_ok]))
00394     html.append('<tr><td><b>Average Abs. Effort</b></td><td>%.2f</td><td>%.2f</td><td>%s</td></tr>' % (avg_abs, params.flex_avg_abs, ok_dict[avg_abs_ok]))
00395     html.append('<tr><td><b>Average Effort</b></td><td>%.2f</td><td>%.2f</td><td>%s</td></tr>' % (avg_eff, params.flex_avg_eff, ok_dict[avg_eff_ok]))
00396     html.append('</table>')
00397 
00398     result.html = '\n'.join(html)
00399 
00400     result.result = mse_ok and avg_abs_ok
00401     
00402     result.values = []
00403     result.values.append(TestValue('Flex MSE', str(mse), '', str(params.flex_mse)))
00404     result.values.append(TestValue('Flex Avg. Abs. Effort', str(avg_abs), '', str(params.flex_avg_abs)))
00405     result.values.append(TestValue('Flex Avg.  Effort', str(avg_eff), '', str(params.flex_avg_eff)))
00406 
00407     return result
00408 
00409 ##\brief Calculates CB adjustment 
00410 ##
00411 ##\return (secondary, arm_gimbal) : Turns CW
00412 def calc_cb_adjust(data, model_file):
00413     try:
00414         # Use this to tune to last known "good" position
00415         # A = numpy.load(model_file).transpose()
00416         
00417         # This uses minimum of total torque
00418         A = numpy.load(model_file)[:-1].transpose() 
00419         B = numpy.array(get_efforts(data, True) + get_efforts(data, False))
00420         X = numpy.linalg.lstsq(A,B)
00421     except:
00422         print >> sys.stderr, "Unable to calculate CB adjustment. May have incorrect model data"
00423         import traceback
00424         traceback.print_exc()
00425         return (100, 100)
00426 
00427     # Recommended adjustments
00428     secondary = -X[0][0]
00429     cb_bar = X[0][1] # CCW increases force
00430 
00431     return (secondary, cb_bar)
00432 
00433 ##\brief Return CB adjustments to minimize total torque
00434 ##
00435 ## Uses CB adjustment/torque value from the model file. 
00436 ## Model file is generated by 'test_pr2_self_test/counterbalance_training.py'
00437 ##\param params CounterbalanceAnalysisParams
00438 ##\param data CounterbalanceAnalysisData
00439 ##\param str : Filename of model file
00440 def check_cb_adjustment(params, data, model_file):
00441     result = CounterbalanceAnalysisResult()
00442 
00443     (secondary, cb_bar) = calc_cb_adjust(data, model_file)
00444 
00445     if (abs(secondary) > 25 or abs(cb_bar) > 25):
00446         result.result = False
00447         result.summary = 'Unable to calculate CB adjustment. Data may be corrupt or CB may be too far off'
00448         result.html = '<p>Unable to calculate CB adjustment.</p>' 
00449         return result
00450 
00451     secondary_dir = 'CW' if secondary > 0 else 'CCW' 
00452     cb_bar_dir = 'CW' if cb_bar > 0 else 'CCW'
00453     
00454     adjust_msg = '<table border="1" cellpadding="2" cellspacing="0">'
00455     adjust_msg += '<tr><td><b>Adjustment</b></td><td><b>Turns</b></td><td><b>Direction</b></td><td><b>Allowable Tolerance</b></td></tr>\n'
00456     adjust_msg += '<tr><td>Secondary Spring</td><td>%.1f</td><td>%s</td><td>%.1f</td></tr>\n' % (abs(secondary), secondary_dir, params.screw_tol)
00457     adjust_msg += '<tr><td>Arm Gimbal Shaft</td><td>%.1f</td><td>%s</td><td>%.1f</td></tr>\n' % (abs(cb_bar), cb_bar_dir, params.bar_tol)
00458     adjust_msg += '</table>\n'
00459 
00460     
00461     if abs(secondary) > params.screw_tol or abs(cb_bar) > params.bar_tol:
00462         result.result = False
00463         result.summary = 'CB adjustment recommended. Follow instructions below to tune CB. '
00464         result.html = '<p>CB adjustment recommended. Adjusting the counterbalance will increase performance of the arm. (Note: CW = "Clockwise")</p>\n<p>%s</p>\n' % adjust_msg
00465     else:
00466         result.result = True
00467         result.summary = ''
00468         result.html = '<p>No CB adjustment recommended. You may choose to adjust the CB using the following instructions, but this is within tolerance.</p>\n<p>%s</p>' % adjust_msg
00469 
00470 
00471     result.values = [TestValue('Secondary Spring Adjustment', str(secondary), '', str(params.screw_tol)),
00472                      TestValue('CB Bar Adjustment', str(cb_bar), '', str(params.bar_tol))]
00473 
00474     return result


pr2_counterbalance_check
Author(s): Kevin Watts
autogenerated on Sat Dec 28 2013 17:30:27