analysis_test.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import roslib; roslib.load_manifest('pr2_motor_diagnostic_tool')
4 import os
5 from yaml import load
6 import argparse
7 import matplotlib.pyplot as plt
8 import numpy
9 from scipy.stats import scoreatpercentile
10 import sys
11 
# Names of the arm actuators, one list per arm. These are plain lists of
# actuator names; no torque constants are stored here.
r_arm_actuators = [
    'r_wrist_r_motor',
    'r_wrist_l_motor',
    'r_forearm_roll_motor',
    'r_upper_arm_roll_motor',
    'r_elbow_flex_motor',
    'r_shoulder_lift_motor',
    'r_shoulder_pan_motor',
]
l_arm_actuators = [
    'l_wrist_r_motor',
    'l_wrist_l_motor',
    'l_forearm_roll_motor',
    'l_upper_arm_roll_motor',
    'l_elbow_flex_motor',
    'l_shoulder_lift_motor',
    'l_shoulder_pan_motor',
]
15 
class Diagnostic():
    """Main diagnostic class with methods to get acceleration, check for
    spikes, check for an unplugged encoder, check for an open circuit and
    plot graphs."""

    def show(self):
        """Print a fixed marker string."""
        print("gello")

    def get_acceleration(self, velocity, timestamp):
        """Return the normalised absolute acceleration between samples.

        The acceleration is the finite difference of ``velocity`` over
        ``timestamp``, made absolute and divided by its own maximum so the
        largest value is 1.

        velocity  -- sequence of numeric velocity samples
        timestamp -- sequence of numeric sample times, same length

        Returns a numpy array with ``len(velocity) - 1`` entries. It is all
        zeros when the velocity never changes and empty when fewer than two
        samples are given.
        """
        velocity = numpy.asarray(velocity, dtype=float)
        timestamp = numpy.asarray(timestamp, dtype=float)

        # Fewer than two samples: there is no interval to differentiate over.
        if velocity.size < 2:
            return numpy.zeros(0)

        acceleration = numpy.abs(numpy.diff(velocity) / numpy.diff(timestamp))
        peak = acceleration.max()
        if peak == 0:
            # Always hand back an array so callers can treat the result
            # uniformly (multiply it, plot it, take its length).
            return numpy.zeros_like(acceleration)
        return acceleration / peak

    def check_for_spikes(self, spikes, actuator_name, debug_info):
        """Look for extreme outliers in the spike signal.

        Negative and positive values are analysed separately. For each side
        with more than one value an outer fence is placed three
        inter-quartile ranges beyond the outer quartile; values past the
        fence are outliers. A side with at most one value keeps a limit of 0.

        spikes        -- sequence of numeric values
        actuator_name -- name used in the printed warning
        debug_info    -- when true, print intermediate statistics

        Returns ``(flag, outlier_limit_neg, outlier_limit_pos)`` where flag
        is True when more than four outliers were found in total.
        """
        spikes = numpy.asarray(spikes, dtype=float).ravel()
        neg = numpy.sort(spikes[spikes < 0], kind='mergesort')
        pos = numpy.sort(spikes[spikes > 0], kind='mergesort')

        outlier_limit_neg = 0
        outlier_limit_pos = 0

        if len(neg) > 1:
            # scoreatpercentile only accepts percentiles in [0, 100]; the
            # lower fence is Q1 - 3 * IQR.
            q1_neg = scoreatpercentile(neg, 25)
            q3_neg = scoreatpercentile(neg, 75)
            outlier_limit_neg = q1_neg - 3 * (q3_neg - q1_neg)

        if len(pos) > 1:
            # Upper fence is Q3 + 3 * IQR.
            q1_pos = scoreatpercentile(pos, 25)
            q3_pos = scoreatpercentile(pos, 75)
            outlier_limit_pos = q3_pos + 3 * (q3_pos - q1_pos)

        if debug_info:
            if len(neg) > 1:
                print("neg mean %s" % neg.mean())
                print("neg outlier limit %s" % outlier_limit_neg)

            if len(pos) > 1:
                print("pos mean %s" % pos.mean())
                print("pos outlier limit %s" % outlier_limit_pos)

        outliers_neg = neg[neg < outlier_limit_neg].tolist()
        outliers_pos = pos[pos > outlier_limit_pos].tolist()

        if debug_info:
            print("outliers in filtered data %s %s" % (outliers_neg, outliers_pos))

        if (len(outliers_neg) + len(outliers_pos)) > 4:
            print("Encoder could be spoilt for, %s" % actuator_name)
            return (True, outlier_limit_neg, outlier_limit_pos)

        return (False, outlier_limit_neg, outlier_limit_pos)

    def check_for_unplugged(self, velocity, measured_motor_voltage, actuator_name, debug_info,
                            zero_velocity_threshold=0.2, zero_voltage_threshold=0.5):
        """Check whether velocity reads zero while the voltage does not.

        velocity               -- sequence of numeric velocity samples
        measured_motor_voltage -- sequence of numeric voltage samples
        actuator_name          -- name used in the printed warning
        debug_info             -- when true, print the computed fractions
        zero_velocity_threshold -- fraction of exactly-zero velocity samples
                                   that must be exceeded
        zero_voltage_threshold  -- fraction of exactly-zero voltage samples
                                   that must not be reached

        Returns True when the zero-velocity fraction is above its threshold
        and the zero-voltage fraction is below its threshold. Returns False
        for empty input.
        """
        # NOTE(review): the velocity threshold used to be the literal 2, which
        # a fraction in [0, 1] can never exceed, so this check never fired.
        # 0.2 is an assumed correction -- confirm against known-bad traces.
        velocity = numpy.asarray(velocity).ravel()
        measured_motor_voltage = numpy.asarray(measured_motor_voltage).ravel()

        # No samples means there is nothing to judge (and nothing to divide by).
        if velocity.size == 0 or measured_motor_voltage.size == 0:
            return False

        zero_velocity = numpy.count_nonzero(velocity == 0) / float(velocity.size)
        zero_voltage = numpy.count_nonzero(measured_motor_voltage == 0) / float(measured_motor_voltage.size)

        if debug_info:
            print("percentage of zero velocity is %s" % zero_velocity)
            print("percentage of zero voltage is %s" % zero_voltage)

        if zero_velocity > zero_velocity_threshold and zero_voltage < zero_voltage_threshold:
            print("Encoder could be unplugged for,  %s" % actuator_name)
            return True
        return False

    def check_for_open(self, velocity, measured_motor_voltage, actuator_name, debug_info):
        """Check for motion with almost no measured motor voltage.

        velocity               -- sequence of numeric velocity samples
        measured_motor_voltage -- sequence of numeric voltage samples
        actuator_name          -- name used in the printed warning
        debug_info             -- when true, print the computed means

        Returns True when the mean absolute voltage is below 0.05 while the
        mean absolute velocity is above 0.3. Returns False for empty input.
        """
        # asarray lets plain lists be passed as well as numpy arrays.
        measured_motor_voltage = numpy.abs(numpy.asarray(measured_motor_voltage, dtype=float))
        velocity = numpy.abs(numpy.asarray(velocity, dtype=float))

        # The mean of an empty array is undefined; treat it as "no fault".
        if velocity.size == 0 or measured_motor_voltage.size == 0:
            return False

        mean_voltage = measured_motor_voltage.mean()
        mean_velocity = velocity.mean()
        if debug_info:
            print("mean_voltage is  %s" % mean_voltage)
            print("mean_velocity is  %s" % mean_velocity)

        if mean_voltage < 0.05 and mean_velocity > 0.3:
            print("Motor wires could be cut causing open circuit,  %s" % actuator_name)
            return True
        return False

    def plot(self, param):
        """Create two matplotlib figures for one trace file.

        ``param`` is a 13-tuple: (filename, velocity, spikes, acceleration,
        outlier_limit_neg, outlier_limit_pos, supply_voltage,
        measured_motor_voltage, executed_current, measured_current,
        r1, r2, r3). r1, r2 and r3 are truthy flags that select the figure
        titles.

        Figure ``<filename>_1`` shows velocity, the spike signal with both
        outlier limits, and acceleration. Figure ``<filename>_2`` shows the
        voltages and currents. The figures are created but not shown; the
        module-level ``plot_enabled`` flag is set to True so the caller
        knows there is something to show.
        """
        (filename, velocity, spikes, acceleration,
         outlier_limit_neg, outlier_limit_pos,
         supply_voltage, measured_motor_voltage,
         executed_current, measured_current,
         r1, r2, r3) = param
        global plot_enabled
        plot_enabled = True

        fig1 = plt.figure(filename + '_1')
        if r1:
            fig1.suptitle("The encoder might be spoilt", fontsize=14)

        plt.subplot(311)
        plt.plot(velocity, label='velocity')
        plt.legend()

        # Stretch each scalar limit into a flat line as long as the trace.
        limit_line_neg = [outlier_limit_neg] * len(velocity)
        limit_line_pos = [outlier_limit_pos] * len(velocity)

        plt.subplot(312)
        plt.plot(spikes, label='acceleration * velocity')
        plt.plot(limit_line_neg, 'r')
        plt.plot(limit_line_pos, 'r')
        plt.legend()

        plt.subplot(313)
        plt.plot(acceleration, 'g', label='acceleration')
        plt.legend()

        fig2 = plt.figure(filename + '_2')
        if r2:
            fig2.suptitle("The encoder could be unplugged", fontsize=14)
        if r3:
            # Set last, so this title wins when both r2 and r3 are set.
            fig2.suptitle("The motor wires might be cut", fontsize=14)

        plt.plot(supply_voltage, 'b', label='supply_voltage')
        plt.plot(measured_motor_voltage, 'g', label='measured_motor_voltage')
        plt.plot(executed_current, '*y', label='executed_current')
        plt.plot(measured_current, 'm', label='measured_current')
        plt.legend()
137 
138 
if __name__ == '__main__':
    # Command-line entry point: analyse one trace file or every ".yaml"
    # file in a folder, print warnings, and optionally plot the results.
    parser = argparse.ArgumentParser("script to analyse diagnostic data for PR2")
    parser.add_argument("files", help="Specify a file name or a folder with files to analyse")
    parser.add_argument("-d", "--display", help="display graphs for only bad results", action="store_true")
    parser.add_argument("-v", "--verbose", help="print all debug information", action="store_true")
    args = parser.parse_args()

    # Diagnostic.plot() flips this module-level flag once a figure exists.
    plot_enabled = False
    diagnostic = Diagnostic()

    # Decide by the ".yaml" suffix rather than by splitting on ".", so
    # folder names containing dots and absolute paths both work.
    if args.files.endswith('.yaml'):
        dirpath, single_name = os.path.split(os.path.abspath(args.files))
        filelist = [single_name]
    else:
        dirpath = os.path.abspath(args.files)
        filelist = sorted(name for name in os.listdir(dirpath) if name.endswith('.yaml'))

    debug_info = args.verbose
    print(filelist)

    for filename in filelist:
        print("\n")
        # Drops a fixed 13-character suffix from the file name; presumably
        # "_results.yaml" -- confirm against the tool that writes the files.
        actuator_name = filename[:-13]

        if debug_info:
            print(actuator_name)

        # NOTE(security): yaml.load can construct arbitrary Python objects,
        # so only run this on trusted trace files. The samples are read by
        # attribute below, so the files presumably rely on Python object
        # tags -- confirm before switching to a safe loader.
        with open(os.path.join(dirpath, filename), 'r') as stream:
            samples = load(stream)

        sample_buffer = samples.sample_buffer
        velocity = numpy.array([s.velocity for s in sample_buffer])
        supply_voltage = numpy.array([s.supply_voltage for s in sample_buffer])
        measured_motor_voltage = numpy.array([s.measured_motor_voltage for s in sample_buffer])
        executed_current = numpy.array([s.executed_current for s in sample_buffer])
        measured_current = numpy.array([s.measured_current for s in sample_buffer])
        timestamp = [s.timestamp for s in sample_buffer]

        acceleration = diagnostic.get_acceleration(velocity, timestamp)

        # The acceleration has one entry fewer than the velocity trace.
        spikes = acceleration * (velocity[:-1])

        (result1, outlier_limit_neg, outlier_limit_pos) = diagnostic.check_for_spikes(spikes, actuator_name, debug_info)
        result2 = diagnostic.check_for_unplugged(velocity, measured_motor_voltage, actuator_name, debug_info)
        result3 = diagnostic.check_for_open(velocity, measured_motor_voltage, actuator_name, debug_info)
        result = result1 or result2 or result3

        # With --display only failing traces are plotted; without it every
        # trace is plotted.
        if result or not args.display:
            # Diagnostic.plot() unpacks exactly these 13 values, in this order.
            param = (filename, velocity, spikes, acceleration,
                     outlier_limit_neg, outlier_limit_pos,
                     supply_voltage, measured_motor_voltage,
                     executed_current, measured_current,
                     result1, result2, result3)
            diagnostic.plot(param)

    # Show all collected figures once, after every file has been analysed.
    if plot_enabled:
        plt.show()
def check_for_spikes(self, spikes, actuator_name, debug_info)
def check_for_open(self, velocity, measured_motor_voltage, actuator_name, debug_info)
def get_acceleration(self, velocity, timestamp)
def check_for_unplugged(self, velocity, measured_motor_voltage, actuator_name, debug_info)


pr2_motor_diagnostic_tool
Author(s): Rahul Udasi
autogenerated on Wed Jan 6 2021 03:39:21