2 import roslib; roslib.load_manifest(
'pr2_motor_diagnostic_tool')
7 import matplotlib.pyplot
as plt
9 from scipy.stats
import scoreatpercentile
# Motor names for the "r_"-prefixed arm, listed wrist first, shoulder last.
r_arm_actuators = [
    'r_wrist_r_motor',
    'r_wrist_l_motor',
    'r_forearm_roll_motor',
    'r_upper_arm_roll_motor',
    'r_elbow_flex_motor',
    'r_shoulder_lift_motor',
    'r_shoulder_pan_motor',
]
# Motor names for the "l_"-prefixed arm, listed wrist first, shoulder last.
l_arm_actuators = [
    'l_wrist_r_motor',
    'l_wrist_l_motor',
    'l_forearm_roll_motor',
    'l_upper_arm_roll_motor',
    'l_elbow_flex_motor',
    'l_shoulder_lift_motor',
    'l_shoulder_pan_motor',
]
"""Main diagnostic class with methods to get acceleration, check for spikes
check for unplugged, check for open circuit and plot graphs"""

def get_acceleration(self, velocity, timestamp):
    """Return the absolute acceleration, normalised to a peak of 1.

    Acceleration is estimated with finite differences: the change between
    consecutive velocity samples divided by the change between the matching
    timestamps. The result therefore has one element fewer than ``velocity``.

    Args:
        velocity: sequence of velocity samples.
        timestamp: sequence of sample times, same length as ``velocity``.

    Returns:
        numpy float array of ``len(velocity) - 1`` non-negative values,
        scaled so the largest one is 1. When there are fewer than two
        samples the array is empty; when every difference is zero the
        all-zero array is returned unscaled (no division by zero).

    NOTE(review): the body of the zero-peak branch was not visible in the
    reviewed excerpt; returning the all-zero array keeps the return type
    uniform for the callers that multiply and plot it -- confirm.
    """
    # Force float arrays so integer inputs are not floor-divided on Python 2.
    velocity = numpy.asarray(velocity, dtype=float)
    timestamp = numpy.asarray(timestamp, dtype=float)

    acceleration = numpy.abs(numpy.diff(velocity) / numpy.diff(timestamp))

    # An empty array has no max(); nothing to normalise.
    if acceleration.size == 0:
        return acceleration

    peak = acceleration.max()
    if peak == 0:
        return acceleration
    return acceleration / peak
def check_for_spikes(self, spikes, actuator_name, debug_info):
    """Flag an encoder whose spike signal contains too many extreme outliers.

    Negative and positive spike values are analysed separately. For each
    sign an outer fence is placed three inter-quartile ranges beyond the
    outer quartile (``Q1 - 3*IQR`` for negatives, ``Q3 + 3*IQR`` for
    positives). Values beyond a fence count as outliers; more than four
    outliers in total marks the encoder as suspect.

    Args:
        spikes: sequence of spike values (zeros are ignored).
        actuator_name: name printed in the warning message.
        debug_info: when true, print the means, fences and outliers.

    Returns:
        Tuple ``(suspect, outlier_limit_neg, outlier_limit_pos)``. A limit
        is ``0.0`` when there are no values of that sign.

    NOTE(review): the guards around the debug prints and the limits used
    for an empty sign were not visible in the reviewed excerpt; they are
    tied to ``debug_info`` and ``0.0`` here -- confirm.
    """
    fence_factor = 3
    max_outliers = 4

    values = numpy.asarray(spikes, dtype=float).ravel()
    neg = numpy.sort(values[values < 0], kind='mergesort')
    pos = numpy.sort(values[values > 0], kind='mergesort')

    outlier_limit_neg = 0.0
    outlier_limit_pos = 0.0

    # Percentiles must lie in 0..100. The previous -75/-25 arguments relied
    # on negative indexing in old SciPy; newer releases are expected to
    # reject them. The mirrored fence is written out explicitly instead.
    if neg.size:
        q1 = scoreatpercentile(neg, 25)
        q3 = scoreatpercentile(neg, 75)
        outlier_limit_neg = q1 - fence_factor * (q3 - q1)

    if pos.size:
        q1 = scoreatpercentile(pos, 25)
        q3 = scoreatpercentile(pos, 75)
        outlier_limit_pos = q3 + fence_factor * (q3 - q1)

    # Single-argument print(...) produces the same text on Python 2 and 3.
    if debug_info:
        if neg.size:
            print("%s %s" % ("neg mean", neg.mean()))
            print("%s %s" % ("neg outlier limit", outlier_limit_neg))
        if pos.size:
            print("%s %s" % ("pos mean", pos.mean()))
            print("%s %s" % ("pos outlier limit", outlier_limit_pos))

    outliers_neg = neg[neg < outlier_limit_neg].tolist()
    outliers_pos = pos[pos > outlier_limit_pos].tolist()

    if debug_info:
        print("%s %s %s" % ("outliers in filtered data",
                            outliers_neg, outliers_pos))

    suspect = (len(outliers_neg) + len(outliers_pos)) > max_outliers
    if suspect:
        print("%s %s" % ("Encoder could be spoilt for,", actuator_name))
    return (suspect, outlier_limit_neg, outlier_limit_pos)
def check_for_unplugged(self, velocity, measured_motor_voltage, actuator_name, debug_info):
    """Report whether the encoder looks unplugged.

    The encoder is suspected to be unplugged when most velocity samples are
    exactly zero while most motor-voltage samples are not.

    Args:
        velocity: sequence of velocity samples.
        measured_motor_voltage: sequence of measured motor voltages.
        actuator_name: name printed in the warning message.
        debug_info: when true, print the two zero fractions.

    Returns:
        True when the encoder is suspected to be unplugged, else False
        (also False when either sequence is empty).

    NOTE(review): the velocity fraction was previously compared against 2,
    which a value in [0, 1] can never exceed, so the check could never
    fire. 0.5 mirrors the voltage threshold -- confirm against real data.
    The ``debug_info`` guard and the return statements were not visible in
    the reviewed excerpt and are inferred from the caller.
    """
    zero_velocity_limit = 0.5
    zero_voltage_limit = 0.5

    velocity_count = len(velocity)
    voltage_count = len(measured_motor_voltage)
    # No samples means no evidence (and avoids dividing by zero below).
    if velocity_count == 0 or voltage_count == 0:
        return False

    zero_velocity = sum(1 for val in velocity if val == 0) / float(velocity_count)
    zero_voltage = sum(1 for val in measured_motor_voltage if val == 0) / float(voltage_count)

    # Single-argument print(...) produces the same text on Python 2 and 3.
    if debug_info:
        print("%s %s" % ("percentage of zero velocity is", zero_velocity))
        print("%s %s" % ("percentage of zero voltage is", zero_voltage))

    if zero_velocity > zero_velocity_limit and zero_voltage < zero_voltage_limit:
        print("%s %s" % ("Encoder could be unplugged for, ", actuator_name))
        return True
    return False
def check_for_open(self, velocity, measured_motor_voltage, actuator_name, debug_info):
    """Report whether the motor wiring looks open-circuit.

    An open circuit is suspected when the mean absolute motor voltage is
    very low (below 0.05) while the mean absolute velocity is still
    noticeable (above 0.3).

    Args:
        velocity: sequence of velocity samples.
        measured_motor_voltage: sequence of measured motor voltages.
        actuator_name: name printed in the warning message.
        debug_info: when true, print the two mean values.

    Returns:
        True when an open circuit is suspected, else False (also False
        when either sequence is empty).

    NOTE(review): the ``debug_info`` guard and the return statements were
    not visible in the reviewed excerpt; they are inferred from the caller,
    which ORs the results of the three checks -- confirm.
    """
    # asarray lets plain lists through; builtin abs() on a list would raise.
    measured_motor_voltage = numpy.abs(numpy.asarray(measured_motor_voltage, dtype=float))
    velocity = numpy.abs(numpy.asarray(velocity, dtype=float))

    # The mean of an empty array is NaN (with a warning); treat as no fault.
    if velocity.size == 0 or measured_motor_voltage.size == 0:
        return False

    mean_voltage = measured_motor_voltage.mean()
    mean_velocity = velocity.mean()

    # Single-argument print(...) produces the same text on Python 2 and 3.
    if debug_info:
        print("%s %s" % ("mean_voltage is ", mean_voltage))
        print("%s %s" % ("mean_velocity is ", mean_velocity))

    if mean_voltage < 0.05 and mean_velocity > 0.3:
        print("%s %s" % ("Motor wires could be cut causing open circuit, ", actuator_name))
        return True
    return False
# plot: unpack the `param` tuple and draw the diagnostic curves on two
# matplotlib figures (`plt`).
# NOTE(review): this unpack needs 13 items, but both `param` tuples built in
# the `__main__` section hold 9 (no supply_voltage, measured_motor_voltage,
# executed_current or measured_current), so the unpack would raise
# ValueError as written -- confirm against the full file.
99 (filename,velocity,spikes,acceleration,outlier_limit_neg,outlier_limit_pos,supply_voltage, measured_motor_voltage,executed_current, measured_current, r1, r2, r3) = param
# Figure 1 is keyed by the file name plus '_1'.
103 fig1 = plt.figure(filename +
'_1')
105 fig1.suptitle(
"The encoder might be spoilt", fontsize=14)
108 plt.plot(velocity,label=
'velocity')
# Repeat each scalar limit once per velocity sample so it can be drawn as a
# constant red line alongside the other series.
111 temp_neg = outlier_limit_neg
112 temp_pos = outlier_limit_pos
113 outlier_limit_neg = [temp_neg
for val
in range(0,len(velocity))]
114 outlier_limit_pos = [temp_pos
for val
in range(0,len(velocity))]
117 plt.plot(spikes,label=
'acceleration * velocity')
118 plt.plot(outlier_limit_neg,
'r') 119 plt.plot(outlier_limit_pos,'r') 123 plt.plot(acceleration,'g', label=
'acceleration')
# Figure 2 is keyed by the file name plus '_2' and shows the voltage and
# current series.
126 fig2 = plt.figure(filename +
'_2')
# NOTE(review): two titles are set on the same figure; presumably each call
# is guarded by a result flag (r2 / r3) that is not visible here. If both
# run, the second title replaces the first -- confirm.
128 fig2.suptitle(
"The encoder could be unplugged", fontsize=14)
130 fig2.suptitle(
"The motor wires might be cut", fontsize=14)
132 plt.plot(supply_voltage,
'b',label=
'supply_voltage')
133 plt.plot(measured_motor_voltage,
'g',label=
'measured_motor_voltage')
134 plt.plot(executed_current,
'*y',label=
'executed_current')
135 plt.plot(measured_current,
'm',label=
'measured_current')
# Command-line entry point: parse the arguments, build the list of files to
# analyse, then run the three diagnostic checks on each file.
139 if __name__ ==
'__main__':
140 parser = argparse.ArgumentParser(
"script to analyse diagnostic data for PR2")
141 parser.add_argument(
"files", help=
"Specify a file name or a folder with files to analyse")
142 parser.add_argument(
"-d",
"--display", help=
"display graphs for only bad results", action=
"store_true")
143 parser.add_argument(
"-v",
"--verbose",help=
"print all debug information",action=
"store_true")
144 args = parser.parse_args()
# Split on '.' so the last piece can be tested as a file extension.
146 positional_args = args.files.split(
".")
# A single .yaml file: re-join the pieces to recover the original name.
152 if (positional_args[-1] ==
'yaml'):
153 filelist.append(
".".join(positional_args))
# Otherwise the argument is taken as a directory under the current working
# directory.
# NOTE(review): only the text before the first '.' is used, so a directory
# name containing a dot would be truncated -- confirm.
155 dirpath = os.getcwd() +
'/' + positional_args[0]
156 filelist = os.listdir(dirpath)
# The --verbose flag is handed to the check methods as `debug_info`.
159 debug_info = args.verbose
164 for filename
in filelist:
# Drop the last 13 characters of the file name to get the actuator name
# (presumably a fixed-length suffix -- TODO confirm).
166 actuator_name = filename[:-13]
# NOTE(review): `file()` exists only in Python 2, and no close() is visible
# in this excerpt. `load` is presumably yaml's loader (import not visible);
# if so it should only be fed trusted files -- confirm.
171 stream = file(filename,
'r') 172 samples = load(stream) 174 encoder_position = [] 176 measured_motor_voltage = [] 177 executed_current = [] 178 measured_current = [] 181 for s
in samples.sample_buffer:
# Copy each sample's fields into one list per channel.
182 velocity.append(s.velocity)
183 encoder_position.append(s.encoder_position)
184 supply_voltage.append(s.supply_voltage)
185 measured_motor_voltage.append(s.measured_motor_voltage)
186 executed_current.append(s.executed_current)
187 measured_current.append(s.measured_current)
188 timestamp.append(s.timestamp)
# Convert the per-channel lists to numpy arrays for element-wise arithmetic.
190 velocity = numpy.array(velocity)
191 encoder_position = numpy.array(encoder_position)
193 supply_voltage = numpy.array(supply_voltage)
194 measured_motor_voltage = numpy.array(measured_motor_voltage)
195 executed_current = numpy.array(executed_current)
196 measured_current = numpy.array(measured_current)
198 acceleration = diagnostic.get_acceleration(velocity, timestamp)
# velocity[:-1] drops the last sample so both factors have the same length
# (the acceleration is built from differences of consecutive samples).
200 spikes = acceleration * (velocity[:-1])
202 (result1, outlier_limit_neg, outlier_limit_pos) = diagnostic.check_for_spikes(spikes, actuator_name, debug_info)
203 result2 = diagnostic.check_for_unplugged(velocity, measured_motor_voltage, actuator_name, debug_info)
204 result3 = diagnostic.check_for_open(velocity, measured_motor_voltage, actuator_name, debug_info)
# Overall verdict: true if any of the three checks reported a problem.
205 result = result1
or result2
or result3
# NOTE(review): `param` carries 9 values here, while `plot` unpacks 13
# (it also expects supply_voltage, measured_motor_voltage, executed_current
# and measured_current before the three results) -- confirm and pass them.
# The two identical calls below presumably sit in different branches that
# are not visible in this excerpt.
209 param = (filename,velocity,spikes,acceleration,outlier_limit_neg,outlier_limit_pos, result1, result2, result3)
210 diagnostic.plot(param)
212 param = (filename,velocity,spikes,acceleration,outlier_limit_neg,outlier_limit_pos, result1, result2, result3)
213 diagnostic.plot(param)
def check_for_spikes(self, spikes, actuator_name, debug_info)
def check_for_open(self, velocity, measured_motor_voltage, actuator_name, debug_info)
def get_acceleration(self, velocity, timestamp)
def check_for_unplugged(self, velocity, measured_motor_voltage, actuator_name, debug_info)