counterbalance_analysis.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #
00003 # Software License Agreement (BSD License)
00004 #
00005 # Copyright (c) 2010, Willow Garage, Inc.
00006 # All rights reserved.
00007 #
00008 # Redistribution and use in source and binary forms, with or without
00009 # modification, are permitted provided that the following conditions
00010 # are met:
00011 #
00012 #  * Redistributions of source code must retain the above copyright
00013 #    notice, this list of conditions and the following disclaimer.
00014 #  * Redistributions in binary form must reproduce the above
00015 #    copyright notice, this list of conditions and the following
00016 #    disclaimer in the documentation and/or other materials provided
00017 #    with the distribution.
00018 #  * Neither the name of the Willow Garage nor the names of its
00019 #    contributors may be used to endorse or promote products derived
00020 #    from this software without specific prior written permission.
00021 #
00022 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00023 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00024 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00025 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00026 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00027 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00028 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00029 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00030 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00031 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00032 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00033 # POSSIBILITY OF SUCH DAMAGE.
00034 
00035 ##\author Kevin Watts
00036 ##\brief Analyzes counterbalance data
00037 
00038 PKG = 'pr2_counterbalance_check'
00039 import roslib
00040 #import roslib; roslib.load_manifest(PKG)
00041 
00042 import numpy
00043 import math
00044 import sys
00045 
00046 
00047 import matplotlib
00048 matplotlib.use('Agg')
00049 import matplotlib.pyplot as plt
00050 from StringIO import StringIO
00051 
00052 from pr2_self_test_msgs.msg import Plot, TestValue, TestParam
00053 
# Maps a boolean pass/fail flag to the label printed in reports.
ok_dict = {
    False: 'FAIL',
    True: 'OK',
}
00055 
def str_to_bytes(s):
    """Convert raw string/byte data into a list of signed 8-bit integers.

    Each input element is mapped to its ordinal value; ordinals of 128 and
    above are shifted down by 256 so they land in the signed range.

    \\param s str, bytes or bytearray : Raw data to convert
    \\return list of int : One entry per input element

    A real list is always returned (the original ``map`` call is lazy on
    Python 3), and byte buffers are accepted as well as text strings.
    """
    if isinstance(s, (bytes, bytearray)):
        # bytearray() yields integers on both Python 2 (where bytes is str)
        # and Python 3, so no ord() call is needed for byte buffers.
        codes = bytearray(s)
    else:
        codes = [ord(ch) for ch in s]
    return [code if code < 128 else code - 256 for code in codes]
00058 
class JointPositionAnalysisData(object):
    """Samples recorded for one joint during a hold, plus summary statistics.

    The time, position, velocity and effort sequences of the message are
    copied into numpy arrays; the mean and standard deviation of position
    and effort are computed once at construction.
    """

    def __init__(self, msg):
        # Copy each sample sequence of the message into a numpy array.
        for field in ('time', 'position', 'velocity', 'effort'):
            setattr(self, field, numpy.array(getattr(msg, field)))

        self.position_avg = numpy.mean(self.position)
        self.position_sd  = self.position.std()
        self.effort_avg   = numpy.mean(self.effort)
        self.effort_sd    = self.effort.std()
00070 
class CBPositionAnalysisData(object):
    """Hold data for the lift and flex joints at one flex position."""

    def __init__(self, msg):
        # Wrap both hold records of the message in analysis objects.
        self.lift_hold, self.flex_hold = [
            JointPositionAnalysisData(hold)
            for hold in (msg.lift_hold, msg.flex_hold)
        ]
        self.flex_position = msg.flex_position
00076 
class CBRunAnalysisData(object):
    """All flex-position records taken at one lift position."""

    def __init__(self, msg):
        self.lift_position = msg.lift_position
        # One analysis record per flex entry, in message order.
        self.flex_data = [CBPositionAnalysisData(entry) for entry in msg.flex_data]
00083 
class CounterbalanceAnalysisData(object):
    """Analysis-ready copy of the whole counterbalance test data set."""

    ##\param msg CounterbalanceTestData : Message from controller
    def __init__(self, msg):
        # One run record per lift entry, in message order.
        self.lift_data = [CBRunAnalysisData(run) for run in msg.lift_data]
00090 
##\brief Stores parameters from CB analysis test
##
## Values are read from fixed slots of msg.arg_value: slots 0-8 are kept
## under their msg.arg_name names, 9-14 are the effort limits, 15-22 the
## lift/flex controller gains and 23-24 (when present) the adjustment
## tolerances.
class CounterbalanceAnalysisParams(object):
    ##\param msg : Test data message; must contain at least one lift entry
    ##            holding at least one flex entry, and at least 23 arg values
    def __init__(self, msg):
        self.lift_dither  = msg.lift_amplitude
        self.flex_dither  = msg.flex_amplitude

        self.lift_joint   = msg.lift_joint
        self.flex_joint   = msg.flex_joint

        self.timeout_hit  = msg.timeout_hit
        self.flex_test    = msg.flex_test

        # Limits that the measured efforts are compared against
        self.lift_mse     = msg.arg_value[9]
        self.lift_avg_abs = msg.arg_value[10]
        self.lift_avg_eff = msg.arg_value[11]
        self.flex_mse     = msg.arg_value[12]
        self.flex_avg_abs = msg.arg_value[13]
        self.flex_avg_eff = msg.arg_value[14]

        # Lift controller gains
        self.lift_p       = msg.arg_value[15]
        self.lift_i       = msg.arg_value[16]
        self.lift_d       = msg.arg_value[17]
        self.lift_i_clamp = msg.arg_value[18]

        # Flex controller gains
        self.flex_p       = msg.arg_value[19]
        self.flex_i       = msg.arg_value[20]
        self.flex_d       = msg.arg_value[21]
        self.flex_i_clamp = msg.arg_value[22]

        if len(msg.arg_value) > 24:
            self.screw_tol = msg.arg_value[23]
            self.bar_tol   = msg.arg_value[24]
        else:
            # For backwards compatibility
            self.screw_tol = 2.0
            self.bar_tol   = 0.8

        # The flex count and range are taken from the first lift entry
        self.num_flexes = len(msg.lift_data[0].flex_data)
        self.num_lifts  = len(msg.lift_data)

        self.min_lift = msg.lift_data[0].lift_position
        self.max_lift = msg.lift_data[-1].lift_position

        self.min_flex = msg.lift_data[0].flex_data[0].flex_position
        self.max_flex = msg.lift_data[0].flex_data[-1].flex_position

        # The first nine arguments are stored under their own names
        self.named_params = {}
        for i in range(0, 9):
            self.named_params[msg.arg_name[i]] = msg.arg_value[i]

    ##\brief Lists the stored parameters as TestParam messages
    ##
    ## Flex entries are only included when the flex test was run.
    ##\return list of TestParam
    def get_test_params(self):
        pairs = [
            ('Lift Dither', str(self.lift_dither)),
            ('Flex Dither', str(self.flex_dither)),
            ('Lift Joint', self.lift_joint),
            ('Flex Joint', self.flex_joint),
            ('Timeout Hit', ok_dict[not self.timeout_hit]),
            ('Flex Tested', str(self.flex_test)),

            ('Lift MSE', str(self.lift_mse)),
            ('Lift Avg Abs', str(self.lift_avg_abs)),
            ('Lift Avg Effort', str(self.lift_avg_eff)),

            ('Lift P Gain', str(self.lift_p)),
            ('Lift I Gain', str(self.lift_i)),
            ('Lift D Gain', str(self.lift_d)),
            ('Lift I Clamp', str(self.lift_i_clamp)),

            ('Num Lifts', str(self.num_lifts)),
            ('Min Lift', "%.2f" % self.min_lift),
            ('Max Lift', "%.2f" % self.max_lift),
        ]

        if self.flex_test:
            pairs.extend([
                ('Flex MSE', str(self.flex_mse)),
                ('Flex Avg Abs', str(self.flex_avg_abs)),
                ('Flex Avg Effort', str(self.flex_avg_eff)),
                ('Flex P Gain', str(self.flex_p)),
                ('Flex I Gain', str(self.flex_i)),
                ('Flex D Gain', str(self.flex_d)),
                ('Flex I Clamp', str(self.flex_i_clamp)),
                ('Num Flexes', str(self.num_flexes)),
                ('Min Flex', "%.2f" % self.min_flex),
                ('Max Flex', "%.2f" % self.max_flex),
            ])

        # items() instead of iteritems(): the latter does not exist on
        # Python 3 dictionaries, while items() works on both major versions.
        for key, val in self.named_params.items():
            pairs.append((key, str(val)))

        return [TestParam(key=key, value=value) for key, value in pairs]
00181 
class CounterbalanceAnalysisResult:
    """Outcome of one analysis step.

    html    : HTML fragment describing the outcome
    summary : one-line summary text
    result  : pass/fail flag, False until an analysis sets it
    values  : list of recorded measurement values
    """
    __slots__ = ['html', 'summary', 'result', 'values']

    def __init__(self):
        # Start as an empty, failed result; the analysis fills it in.
        self.result = False
        self.values = []
        self.html = self.summary = ''
00189 
##\brief Get average efforts for CB test as a list
##
## Efforts are listed lift entry by lift entry, and within each lift entry
## in flex order.
##\param data : Test data exposing lift_data[].flex_data[]
##\param lift_calc bool : Lift efforts if true, flex efforts otherwise
##\return list : Average hold effort of the chosen joint at every position
def get_efforts(data, lift_calc):
    hold_name = 'lift_hold' if lift_calc else 'flex_hold'
    return [getattr(flex_entry, hold_name).effort_avg
            for lift_entry in data.lift_data
            for flex_entry in lift_entry.flex_data]
00203 
00204 def _get_mean_sq_effort(avg_effort_array):
00205     sq_array = avg_effort_array * avg_effort_array
00206     return numpy.average(sq_array)
00207 
00208 def _get_mean_abs_effort(avg_effort_array):
00209     abs_array = abs(avg_effort_array)
00210     return numpy.average(abs_array)
00211 
00212 def _get_mean_effort(avg_effort_array):
00213     return numpy.average(avg_effort_array)
00214 
00215 ##\brief Returns a list of lift positions, efforts for a given flex position (vary by lift)
00216 def _get_const_flex_effort(data, flex_index = 0, lift_calc = True):
00217     effort_list = []
00218     lift_list = []
00219     for ld in data.lift_data:
00220         fd = ld.flex_data[flex_index]
00221         lift_list.append(ld.lift_position)
00222         if lift_calc:
00223             effort_list.append(fd.lift_hold.effort_avg)
00224         else:
00225             effort_list.append(fd.flex_hold.effort_avg)
00226 
00227     return lift_list, effort_list
00228     
00229 def _get_const_lift_effort(data, lift_index = 0, lift_calc = True):
00230     ld = data.lift_data[lift_index]
00231         
00232     effort_list = []
00233     flex_list = []
00234 
00235     for fd in ld.flex_data:
00236         flex_list.append(fd.flex_position)
00237         if lift_calc:
00238             effort_list.append(fd.lift_hold.effort_avg)
00239         else:
00240             effort_list.append(fd.flex_hold.effort_avg)
00241 
00242     return flex_list, effort_list
00243 
00244 
00245 
00246 def _get_flex_positions(data):
00247     flex_list = []
00248     for fd in data.lift_data[0].flex_data:
00249         flex_list.append(fd.flex_position)
00250 
00251     return flex_list
00252 
00253 def _get_lift_positions(data):
00254     lifts = []
00255     for ld in data.lift_data:
00256         lifts.append(ld.lift_position)
00257     return lifts
00258 
00259 
##\brief Gives effort contour plot of efforts by lift, flex position
##
## The plot is rendered to an in-memory PNG. The figure is closed even when
## drawing or saving fails, so a failed plot does not leak into later ones.
##\param params CounterbalanceAnalysisParams : Input params
##\param data CounterbalanceAnalysisData : Test Data
##\param lift_calc bool : Lift efforts if true, flex efforts otherwise
##\return pr2_self_test_msgs.msg.Plot : Plot message with contour
def plot_effort_contour(params, data, lift_calc = True):
    # One row of efforts per lift entry; only the effort half of each
    # (flex positions, efforts) pair is needed here.
    effort_list = [_get_const_lift_effort(data, i, lift_calc)[1]
                   for i in range(0, params.num_lifts)]
    flexes = _get_flex_positions(data)
    lifts = _get_lift_positions(data)

    flex_grid, lift_grid = numpy.meshgrid(numpy.array(flexes), numpy.array(lifts))
    effort_grid = numpy.array(effort_list)

    try:
        CS = plt.contour(flex_grid, lift_grid, effort_grid)
        plt.clabel(CS, inline=0, fontsize=10)

        plt.xlabel('Flex')
        plt.ylabel('Lift')

        stream = StringIO()
        plt.savefig(stream, format = 'png')
        image = stream.getvalue()
    finally:
        # Always release the pyplot figure, including on failure.
        plt.close()

    p = Plot()
    if lift_calc:
        p.title = 'lift_effort_contour'
    else:
        p.title = 'flex_effort_contour'
    p.image = str_to_bytes(image)
    p.image_format = 'png'

    return p
00296 
##\brief Plots CB efforts against shoulder lift position
##
## The plot is rendered to an in-memory PNG. The figure is closed even when
## drawing or saving fails.
##\param params CounterbalanceAnalysisParams : Input params (not used here)
##\param data CounterbalanceAnalysisData : Test Data
##\param flex_index int : Index of flex data to plot against
##\param lift_calc bool : Lift efforts or flex efforts
##\return pr2_self_test_msgs.msg.Plot : Plot message with the effort curve
def plot_efforts_by_lift_position(params, data, flex_index = -1, lift_calc = True):
    lift_position, effort = _get_const_flex_effort(data, flex_index, lift_calc)

    flex_position = data.lift_data[0].flex_data[flex_index].flex_position

    try:
        plt.plot(numpy.array(lift_position), numpy.array(effort))
        if lift_calc:
            plt.title('Shoulder Lift Effort at Flex Position %.2f' % (flex_position))
        else:
            plt.title('Shoulder Flex Effort at Flex Position %.2f' % (flex_position))
        # No bare plt.axes() call here: on current matplotlib it adds a new,
        # empty axes on top of the one just plotted. The labelling calls
        # below already act on the current axes.
        plt.xlabel('Lift Position')
        plt.ylabel('Effort')
        plt.axhline(y = 0, color = 'r', label='_nolegend_')

        stream = StringIO()
        plt.savefig(stream, format = 'png')
        image = stream.getvalue()
    finally:
        # Always release the pyplot figure, including on failure.
        plt.close()

    p = Plot()
    if lift_calc:
        p.title = 'lift_effort_const_flex_%d' % flex_index
    else:
        p.title = 'flex_effort_const_flex_%d' % flex_index
    p.image = str_to_bytes(image)
    p.image_format = 'png'

    return p
00330 
##\brief Checks shoulder lift efforts against params
##
## The pass/fail result depends on the mean square and mean absolute effort
## limits only; the signed average is reported and selects the "too weak" /
## "too strong" wording on failure.
##\param params CounterbalanceAnalysisParams : Effort limits
##\param data CounterbalanceAnalysisData : Test Data
##\return CounterbalanceAnalysisResult
def analyze_lift_efforts(params, data):
    efforts = numpy.array(get_efforts(data, True))
    mse = _get_mean_sq_effort(efforts)
    avg_abs = _get_mean_abs_effort(efforts)
    avg_eff = _get_mean_effort(efforts)

    mse_ok = mse < params.lift_mse
    avg_abs_ok = avg_abs < params.lift_avg_abs
    avg_eff_ok = abs(avg_eff) < abs(params.lift_avg_eff)
    passed = mse_ok and avg_abs_ok

    result = CounterbalanceAnalysisResult()
    if passed:
        result.summary = 'Lift efforts OK'
    elif avg_eff < 0:
        result.summary = 'Counterbalance is too weak. Requires adjustment'
    else:
        result.summary = 'Counterbalance is too strong. Requires adjustment'

    # One table row per checked quantity: label, value, limit, status flag.
    row_fmt = '<tr><td><b>%s</b></td><td>%.2f</td><td>%.2f</td><td>%s</td></tr>'
    checks = (
        ('Mean Sq. Effort', mse, params.lift_mse, mse_ok),
        ('Average Abs. Effort', avg_abs, params.lift_avg_abs, avg_abs_ok),
        ('Average Effort', avg_eff, params.lift_avg_eff, avg_eff_ok),
    )

    html = [
        '<p>%s</p>' % result.summary,
        '<table border="1" cellpadding="2" cellspacing="0">',
        '<tr><td><b>Parameter</b></td><td><b>Value</b></td><td><b>Maximum</b></td><td><b>Status</b></td></tr>',
    ]
    for label, value, limit, ok in checks:
        html.append(row_fmt % (label, value, limit, ok_dict[ok]))
    html.append('</table>')

    result.html = '\n'.join(html)
    result.result = passed
    result.values = [
        TestValue('Lift MSE', str(mse), '', str(params.lift_mse)),
        TestValue('Lift Avg. Abs. Effort', str(avg_abs), '', str(params.lift_avg_abs)),
        TestValue('Lift Avg.  Effort', str(avg_eff), '', str(params.lift_avg_eff)),
    ]

    return result
00372     
##\brief Checks shoulder flex efforts against params
##
## The pass/fail result depends on the mean square and mean absolute effort
## limits only; the signed average is reported alongside them.
##\param params CounterbalanceAnalysisParams : Effort limits
##\param data CounterbalanceAnalysisData : Test Data
##\return CounterbalanceAnalysisResult
def analyze_flex_efforts(params, data):
    efforts = numpy.array(get_efforts(data, False))
    mse = _get_mean_sq_effort(efforts)
    avg_abs = _get_mean_abs_effort(efforts)
    avg_eff = _get_mean_effort(efforts)

    mse_ok = mse < params.flex_mse
    avg_abs_ok = avg_abs < params.flex_avg_abs
    avg_eff_ok = abs(avg_eff) < abs(params.flex_avg_eff)
    passed = mse_ok and avg_abs_ok

    result = CounterbalanceAnalysisResult()
    if passed:
        result.summary = 'Flex efforts OK'
    else:
        result.summary = 'Flex MSE/Avg. Absolute effort too high'

    # One table row per checked quantity: label, value, limit, status flag.
    row_fmt = '<tr><td><b>%s</b></td><td>%.2f</td><td>%.2f</td><td>%s</td></tr>'
    checks = (
        ('Mean Sq. Effort', mse, params.flex_mse, mse_ok),
        ('Average Abs. Effort', avg_abs, params.flex_avg_abs, avg_abs_ok),
        ('Average Effort', avg_eff, params.flex_avg_eff, avg_eff_ok),
    )

    html = [
        '<p>%s</p>' % result.summary,
        '<table border="1" cellpadding="2" cellspacing="0">',
        '<tr><td><b>Parameter</b></td><td><b>Value</b></td><td><b>Maximum</b></td><td><b>Status</b></td></tr>',
    ]
    for label, value, limit, ok in checks:
        html.append(row_fmt % (label, value, limit, ok_dict[ok]))
    html.append('</table>')

    result.html = '\n'.join(html)
    result.result = passed
    result.values = [
        TestValue('Flex MSE', str(mse), '', str(params.flex_mse)),
        TestValue('Flex Avg. Abs. Effort', str(avg_abs), '', str(params.flex_avg_abs)),
        TestValue('Flex Avg.  Effort', str(avg_eff), '', str(params.flex_avg_eff)),
    ]

    return result
00412 
##\brief Calculates CB adjustment
##
## Solves a least-squares problem between the model loaded from model_file
## (last row dropped, then transposed) and the measured lift efforts followed
## by the measured flex efforts.
##
## If anything goes wrong while loading the model or solving, the problem is
## reported on stderr and (100, 100) is returned as an out-of-range marker.
##\param data CounterbalanceAnalysisData : Test Data
##\param model_file str : Filename of model file
##\return (secondary, arm_gimbal) : Turns CW
def calc_cb_adjust(data, model_file):
    try:
        # To tune to the last known "good" position instead, the full model
        # could be used: numpy.load(model_file).transpose()

        # This uses minimum of total torque
        A = numpy.load(model_file)[:-1].transpose()
        B = numpy.array(get_efforts(data, True) + get_efforts(data, False))
        X = numpy.linalg.lstsq(A,B)
    except Exception:
        # "except Exception" lets KeyboardInterrupt/SystemExit propagate.
        # sys.stderr.write is used because the Python 2 "print >>" statement
        # raises TypeError on Python 3 and would hide the real failure.
        import traceback
        sys.stderr.write("Unable to calculate CB adjustment. May have incorrect model data\n")
        traceback.print_exc()
        return (100, 100)

    # Recommended adjustments
    solution = X[0]
    secondary = -solution[0]
    cb_bar = solution[1] # CCW increases force

    return (secondary, cb_bar)
00436 
##\brief Return CB adjustments to minimize total torque
##
## Uses CB adjustment/torque value from the model file.
## Model file is generated by 'test_pr2_self_test/counterbalance_training.py'
##
## A recommendation of more than 25 turns on either adjustment is reported
## as a failed calculation; otherwise the result passes when both
## adjustments are within the tolerances stored in params.
##\param params CounterbalanceAnalysisParams
##\param data CounterbalanceAnalysisData
##\param model_file str : Filename of model file
##\return CounterbalanceAnalysisResult
def check_cb_adjustment(params, data, model_file):
    result = CounterbalanceAnalysisResult()

    secondary, cb_bar = calc_cb_adjust(data, model_file)
    secondary_turns = abs(secondary)
    cb_bar_turns = abs(cb_bar)

    # Implausibly large recommendations are treated as a failed calculation.
    if secondary_turns > 25 or cb_bar_turns > 25:
        result.result = False
        result.summary = 'Unable to calculate CB adjustment. Data may be corrupt or CB may be too far off'
        result.html = '<p>Unable to calculate CB adjustment.</p>'
        return result

    # Positive values are reported as clockwise turns.
    if secondary > 0:
        secondary_dir = 'CW'
    else:
        secondary_dir = 'CCW'
    if cb_bar > 0:
        cb_bar_dir = 'CW'
    else:
        cb_bar_dir = 'CCW'

    row_fmt = '<tr><td>%s</td><td>%.1f</td><td>%s</td><td>%.1f</td></tr>\n'
    adjust_msg = ''.join([
        '<table border="1" cellpadding="2" cellspacing="0">',
        '<tr><td><b>Adjustment</b></td><td><b>Turns</b></td><td><b>Direction</b></td><td><b>Allowable Tolerance</b></td></tr>\n',
        row_fmt % ('Secondary Spring', secondary_turns, secondary_dir, params.screw_tol),
        row_fmt % ('Arm Gimbal Shaft', cb_bar_turns, cb_bar_dir, params.bar_tol),
        '</table>\n',
    ])

    needs_adjustment = secondary_turns > params.screw_tol or cb_bar_turns > params.bar_tol
    if needs_adjustment:
        result.result = False
        result.summary = 'CB adjustment recommended. Follow instructions below to tune CB. '
        result.html = '<p>CB adjustment recommended. Adjusting the counterbalance will increase performance of the arm. (Note: CW = "Clockwise")</p>\n<p>%s</p>\n' % adjust_msg
    else:
        result.result = True
        result.summary = ''
        result.html = '<p>No CB adjustment recommended. You may choose to adjust the CB using the following instructions, but this is within tolerance.</p>\n<p>%s</p>' % adjust_msg

    result.values = [
        TestValue('Secondary Spring Adjustment', str(secondary), '', str(params.screw_tol)),
        TestValue('CB Bar Adjustment', str(cb_bar), '', str(params.bar_tol)),
    ]

    return result


pr2_counterbalance_check
Author(s): Kevin Watts
autogenerated on Sat Apr 27 2019 03:11:00