analysis_test.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 import roslib; roslib.load_manifest('pr2_motor_diagnostic_tool')
00003 from pr2_motor_diagnostic_tool.msg import *
00004 import os
00005 from yaml import load
00006 import argparse
00007 import matplotlib.pyplot as plt 
00008 import numpy
00009 from scipy.stats import scoreatpercentile
00010 import sys
00011 
00012 #dictionary of actuators and their torque constants
00013 r_arm_actuators = ['r_wrist_r_motor','r_wrist_l_motor','r_forearm_roll_motor','r_upper_arm_roll_motor', 'r_elbow_flex_motor','r_shoulder_lift_motor','r_shoulder_pan_motor']
00014 l_arm_actuators = ['l_wrist_r_motor','l_wrist_l_motor','l_forearm_roll_motor','l_upper_arm_roll_motor', 'l_elbow_flex_motor','l_shoulder_lift_motor','l_shoulder_pan_motor']
00015 
00016 class Diagnostic():
00017   """Main diagnostic class with methods to get acceleration, check for spikes
00018      check for unplugged, check for open circuit and plot graphs"""
00019   
00020   def show(self):
00021     print "gello"
00022 
00023   def get_acceleration(self, velocity, timestamp):
00024     acceleration = (numpy.array(velocity[1:]) - numpy.array(velocity[:-1])) / (numpy.array(timestamp[1:]) - numpy.array(timestamp[:-1]))
00025     acceleration = abs(acceleration)
00026     if acceleration.max() == 0:
00027        return 0 
00028     acceleration = acceleration / acceleration.max()
00029     return acceleration
00030 
00031   def check_for_spikes(self, spikes, actuator_name, debug_info):
00032     outlier_limit_neg = 0
00033     outlier_limit_pos = 0
00034     neg = [val for val in spikes if val < 0]
00035     pos = [val for val in spikes if val > 0]
00036 
00037     neg = numpy.sort(neg,kind='mergesort')
00038     pos = numpy.sort(pos,kind='mergesort')
00039     
00040     if (len(neg) > 1):
00041       iq_range_neg = scoreatpercentile(neg,-75) - scoreatpercentile(neg,-25)
00042       outlier_limit_neg = iq_range_neg * 3 + scoreatpercentile(neg,-75)
00043 
00044     if (len(pos) > 1):
00045       iq_range_pos = scoreatpercentile(pos,75) - scoreatpercentile(pos,25)
00046       outlier_limit_pos = iq_range_pos * 3 + scoreatpercentile(pos,75)
00047 
00048     if debug_info:  
00049       if (len(neg) > 1):
00050         print "neg mean",neg.mean()
00051         print "neg outlier limit",outlier_limit_neg
00052       
00053       if (len(pos) > 1):
00054         print "pos mean",pos.mean()
00055         print "pos outlier limit",outlier_limit_pos
00056 
00057     outliers_neg = [val for val in neg if val < outlier_limit_neg]
00058     outliers_pos = [val for val in pos if val > outlier_limit_pos]
00059 
00060     if debug_info: 
00061       print "outliers in filtered data", outliers_neg, outliers_pos
00062 
00063     if (len(outliers_neg) + len(outliers_pos)) > 4:
00064       print "Encoder could be spoilt for,", actuator_name
00065       return (True, outlier_limit_neg, outlier_limit_pos)
00066 
00067     return (False, outlier_limit_neg, outlier_limit_pos)
00068 
00069   def check_for_unplugged(self, velocity, measured_motor_voltage, actuator_name, debug_info):
00070     zero_velocity = [val for val in velocity if val == 0]
00071     zero_voltage = [val for val in measured_motor_voltage if val == 0]
00072     zero_velocity = len(zero_velocity) / (len(velocity) + 0.0)
00073     zero_voltage = len(zero_voltage) / (len(measured_motor_voltage) + 0.0)
00074     if debug_info:
00075       print "percentage of zero velocity is", zero_velocity
00076       print "percentage of zero voltage is", zero_voltage
00077 
00078     if zero_velocity > 2 and zero_voltage < 0.5:
00079       print "Encoder could be unplugged for, ", actuator_name
00080       return True
00081     return False
00082 
00083   def check_for_open(self, velocity, measured_motor_voltage, actuator_name, debug_info):
00084     measured_motor_voltage = abs(measured_motor_voltage)
00085     velocity = abs(velocity)
00086     mean_voltage = measured_motor_voltage.mean()
00087     mean_velocity = velocity.mean()
00088     if debug_info:
00089       print "mean_voltage is ",mean_voltage
00090       print "mean_velocity is ",mean_velocity
00091 
00092     if mean_voltage < 0.05:
00093       if mean_velocity > 0.3:
00094         print "Motor wires could be cut causing open circuit, ", actuator_name
00095         return True
00096     return False
00097 
00098   def plot(self, param):
00099     (filename,velocity,spikes,acceleration,outlier_limit_neg,outlier_limit_pos,supply_voltage, measured_motor_voltage,executed_current, measured_current, r1, r2, r3) = param
00100     global plot_enabled
00101     plot_enabled = True
00102      
00103     fig1 = plt.figure(filename + '_1')
00104     if r1:
00105         fig1.suptitle("The encoder might be spoilt", fontsize=14)
00106 
00107     plt.subplot(311)
00108     plt.plot(velocity,label='velocity')
00109     plt.legend()
00110 
00111     temp_neg = outlier_limit_neg
00112     temp_pos = outlier_limit_pos
00113     outlier_limit_neg = [temp_neg for val in range(0,len(velocity))]
00114     outlier_limit_pos = [temp_pos for val in range(0,len(velocity))]
00115 
00116     plt.subplot(312)
00117     plt.plot(spikes,label='acceleration * velocity')
00118     plt.plot(outlier_limit_neg,'r')
00119     plt.plot(outlier_limit_pos,'r')
00120     plt.legend()
00121 
00122     plt.subplot(313)
00123     plt.plot(acceleration,'g', label='acceleration')
00124     plt.legend()
00125 
00126     fig2 = plt.figure(filename + '_2')
00127     if r2:
00128         fig2.suptitle("The encoder could be unplugged", fontsize=14)
00129     if r3:
00130         fig2.suptitle("The motor wires might be cut", fontsize=14)
00131 
00132     plt.plot(supply_voltage,'b',label='supply_voltage')
00133     plt.plot(measured_motor_voltage,'g',label='measured_motor_voltage')
00134     plt.plot(executed_current,'*y',label='executed_current')
00135     plt.plot(measured_current,'m',label='measured_current')
00136     plt.legend() 
00137 
00138     
00139 if __name__ == '__main__':
00140   parser = argparse.ArgumentParser("script to analyse diagnostic data for PR2")
00141   parser.add_argument("files", help="Specify a file name or a folder with files to analyse")
00142   parser.add_argument("-d","--display", help="display graphs for only bad results", action="store_true")
00143   parser.add_argument("-v","--verbose",help="print all debug information",action="store_true")
00144   args = parser.parse_args()
00145   
00146   positional_args = args.files.split(".")
00147 
00148   filelist = [] 
00149   plot_enabled = False
00150   diagnostic = Diagnostic()
00151 
00152   if (positional_args[-1] == 'yaml'):
00153     filelist.append(".".join(positional_args)) 
00154   else:
00155     dirpath = os.getcwd() + '/' + positional_args[0]  
00156     filelist = os.listdir(dirpath)
00157     os.chdir(dirpath)
00158 
00159   debug_info = args.verbose
00160   print filelist
00161   ppr = 1200.0
00162   delay = 1 
00163 
00164   for filename in filelist:
00165     print "\n"
00166     actuator_name = filename[:-13]
00167 
00168     if debug_info:
00169       print actuator_name
00170 
00171     stream = file(filename, 'r')
00172     samples = load(stream)
00173     velocity = []
00174     encoder_position = []
00175     supply_voltage = []
00176     measured_motor_voltage = []
00177     executed_current = []
00178     measured_current = []
00179     timestamp = []
00180 
00181     for s in samples.sample_buffer:
00182       velocity.append(s.velocity)
00183       encoder_position.append(s.encoder_position)
00184       supply_voltage.append(s.supply_voltage)
00185       measured_motor_voltage.append(s.measured_motor_voltage)
00186       executed_current.append(s.executed_current)
00187       measured_current.append(s.measured_current)
00188       timestamp.append(s.timestamp)
00189 
00190     velocity = numpy.array(velocity)
00191     encoder_position = numpy.array(encoder_position)
00192    
00193     supply_voltage = numpy.array(supply_voltage)
00194     measured_motor_voltage = numpy.array(measured_motor_voltage)
00195     executed_current = numpy.array(executed_current)
00196     measured_current = numpy.array(measured_current)
00197 
00198     acceleration = diagnostic.get_acceleration(velocity, timestamp)
00199 
00200     spikes = acceleration * (velocity[:-1])
00201 
00202     (result1, outlier_limit_neg, outlier_limit_pos) = diagnostic.check_for_spikes(spikes, actuator_name, debug_info) 
00203     result2 = diagnostic.check_for_unplugged(velocity, measured_motor_voltage, actuator_name, debug_info)
00204     result3 = diagnostic.check_for_open(velocity, measured_motor_voltage, actuator_name, debug_info)
00205     result = result1 or  result2 or result3
00206 
00207     if args.display:
00208       if result:
00209         param = (filename,velocity,spikes,acceleration,outlier_limit_neg,outlier_limit_pos, result1, result2, result3)
00210         diagnostic.plot(param)
00211     else:
00212       param = (filename,velocity,spikes,acceleration,outlier_limit_neg,outlier_limit_pos, result1, result2, result3)
00213       diagnostic.plot(param)
00214   
00215   if plot_enabled:  
00216     plt.show()


pr2_motor_diagnostic_tool
Author(s): Rahul Udasi
autogenerated on Sat Apr 27 2019 03:10:57