diagnostic_tool_widget.py
Go to the documentation of this file.
1 import os
2 import datetime
3 from get_diagnostic_data import *
4 from analysis_test import Diagnostic
5 from yaml import load
6 import matplotlib.pyplot as plt
7 import numpy
8 from scipy.stats import scoreatpercentile
9 
10 from python_qt_binding import loadUi
11 from python_qt_binding.QtCore import qWarning, Qt
12 from python_qt_binding.QtGui import QWidget, QPushButton, QFileDialog, QLabel
13 
14 class DiagnosticToolWidget(QWidget):
15 
16  def __init__(self):
17  super(DiagnosticToolWidget, self).__init__()
18  ui_file = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'pr2_diagnostic_widget.ui')
19  loadUi(ui_file, self)
20  self.setObjectName('DiagnosticToolWidget')
21  self.left_bool = False
22  self.right_bool = False
23  self.head_bool = False
24  self.debug_info_bool = False
25  self.bad_results_bool = False
26  self.r_arm_actuators = ['r_wrist_r_motor','r_wrist_l_motor','r_forearm_roll_motor','r_upper_arm_roll_motor', 'r_elbow_flex_motor','r_shoulder_lift_motor','r_shoulder_pan_motor']
27  self.l_arm_actuators = ['l_wrist_r_motor','l_wrist_l_motor','l_forearm_roll_motor','l_upper_arm_roll_motor', 'l_elbow_flex_motor','l_shoulder_lift_motor','l_shoulder_pan_motor']
28  self.head_actuators = ['head_pan_motor','head_tilt_motor']
29  self.joint_list = []
30  self.filelist = []
31  self.filenames = []
32  self.jointnames = []
33  self.plots = []
34 
35  #getDataBtn = QPushButton('Quit', self)
36  self.leftArm.stateChanged.connect(self.left_arm_selected)
37  self.rightArm.stateChanged.connect(self.right_arm_selected)
38  self.head.stateChanged.connect(self.head_selected)
39  self.debugInfo.stateChanged.connect(self.debug_info_selected)
40  self.badResults.stateChanged.connect(self.bad_results_selected)
41  self.getData.clicked[bool].connect(self.get_data_pressed)
42  self.resetJointlist.clicked[bool].connect(self.reset_jointlist)
43  self.analyzeData.clicked[bool].connect(self.analyze_data)
44  self.loadFile.clicked[bool].connect(self.load_file)
45  self.loadDirectory.clicked[bool].connect(self.load_directory)
46  self.resetFilelist.clicked[bool].connect(self.reset_filelist)
47  templist = self.r_arm_actuators + self.l_arm_actuators + self.head_actuators
48  for joint in templist:
49  self.jointWidget.addItem(str(joint))
50  self.jointWidget.itemSelectionChanged.connect(self.joint_widget_changed)
51 
52  rospy.Subscriber("joy", Joy, callback)
53 
55  self.jointnames = []
56  for joint in self.jointWidget.selectedItems():
57  if joint.text() not in self.jointnames:
58  self.jointnames.append(joint.text())
59  self.jointnames = [f.encode("ascii") for f in self.jointnames]
60 
61  self.jointLabel.setText('Joints: ' + str(self.jointnames))
62  self.jointLabel.setWordWrap(True)
63 
64  def left_arm_selected(self, state):
65  if state == Qt.Checked:
66  self.left_bool = True
67  self.jointnames.extend(self.l_arm_actuators)
68  else:
69  self.left_bool = False
70  self.jointnames = [joint for joint in self.jointnames if joint not in self.l_arm_actuators]
71 
72  self.jointLabel.setText('Joints: ' + str(self.jointnames))
73  self.jointLabel.setWordWrap(True)
74 
75  def right_arm_selected(self, state):
76  if state == Qt.Checked:
77  self.right_bool = True
78  self.jointnames.extend(self.r_arm_actuators)
79  else:
80  self.right_bool = False
81  self.jointnames = [joint for joint in self.jointnames if joint not in self.r_arm_actuators]
82 
83  self.jointLabel.setText('Joints: ' + str(self.jointnames))
84  self.jointLabel.setWordWrap(True)
85 
86  def head_selected(self, state):
87  if state == Qt.Checked:
88  self.head_bool = True
89  self.jointnames.extend(self.head_actuators)
90  else:
91  self.head_bool = False
92  self.jointnames = [joint for joint in self.jointnames if joint not in self.head_actuators]
93 
94  self.jointLabel.setText('Joints: ' + str(self.jointnames))
95  self.jointLabel.setWordWrap(True)
96 
97  def debug_info_selected(self, state):
98  if state == Qt.Checked:
99  self.debug_info_bool = True
100  else:
101  self.debug_info_bool = False
102 
103  def bad_results_selected(self, state):
104  if state == Qt.Checked:
105  self.bad_results_bool = True
106  else:
107  self.bad_results_bool = False
108 
109  def reset_jointlist(self):
110  self.left_bool = False
111  self.right_bool = False
112  self.head_bool = False
113  self.leftArm.setChecked(False)
114  self.rightArm.setChecked(False)
115  self.head.setChecked(False)
116 
117  self.joint_list = []
118  self.jointnames = []
119 
120  self.jointLabel.setText('Joints: ' + str(self.jointnames))
121  self.jointLabel.setWordWrap(True)
122 
123  def reset_filelist(self):
124  self.filelist = []
125  self.filenames = []
126  self.fileLabel.setText('Files: ')
127  for fig_name in self.plots:
128  plt.close()
129  self.plots = []
130 
131  def get_data_pressed(self):
132  self.joint_list = []
133  self.jointLabel.setText('Joints: ' + str(self.jointnames))
134  self.jointLabel.setWordWrap(True)
135  self.joint_list.extend(self.jointnames)
136  folder = str(datetime.datetime.now())
137  folder = folder.split(" ")
138  folder = "_".join(folder)
139 
140  os.system("mkdir %s" %(folder))
141  os.chdir(os.getcwd() + '/' + folder)
142 
143  switch_controller([],['diagnostic_controller'],SwitchControllerRequest.STRICT)
144  unload_controller('diagnostic_controller')
145 
146  for actuator_name in self.joint_list:
147  print "Press X to start or for next joint"
148  wait_for_X()
149  start_diag_controller(actuator_name)
150  print "start moving %s for about 5 seconds" %(actuator_name)
151  rospy.sleep(5.0)
152  print "press circle when your done"
153  #rospy.loginfo("move %s for about 5 seconds and then press circle when done", joint_name)
155  get_diag_data(actuator_name)
156  print "Finshed getting data"
157 
158  def close_all(self):
159  for fig_name in self.plots:
160  plt.close()
161  self.plots = []
162 
163  def analyze_data(self):
164  diagnostic = Diagnostic()
165  debug_info = self.debug_info_bool
166  bad_results = self.bad_results_bool
167 
168  for filename in self.filelist:
169  actuator_name = os.path.basename(filename)
170  if debug_info:
171  print "\n" + str(actuator_name)
172  stream = file(filename, 'r')
173  samples = load(stream)
174  velocity = []
175  encoder_position = []
176  supply_voltage = []
177  measured_motor_voltage = []
178  executed_current = []
179  measured_current = []
180  timestamp = []
181 
182  for s in samples.sample_buffer:
183  velocity.append(s.velocity)
184  encoder_position.append(s.encoder_position)
185  supply_voltage.append(s.supply_voltage)
186  measured_motor_voltage.append(s.measured_motor_voltage)
187  executed_current.append(s.executed_current)
188  measured_current.append(s.measured_current)
189  timestamp.append(s.timestamp)
190 
191  velocity = numpy.array(velocity)
192  encoder_position = numpy.array(encoder_position)
193 
194  supply_voltage = numpy.array(supply_voltage)
195  measured_motor_voltage = numpy.array(measured_motor_voltage)
196  executed_current = numpy.array(executed_current)
197  measured_current = numpy.array(measured_current)
198 
199  acceleration = diagnostic.get_acceleration(velocity, timestamp)
200 
201  spikes = acceleration * (velocity[:-1])
202 
203  (result1, outlier_limit_neg, outlier_limit_pos) = diagnostic.check_for_spikes(spikes, actuator_name, debug_info)
204  result2 = diagnostic.check_for_unplugged(velocity, measured_motor_voltage, actuator_name, debug_info)
205  result3 = diagnostic.check_for_open(velocity, measured_motor_voltage, actuator_name, debug_info)
206  result = result1 or result2 or result3
207 
208  if bad_results:
209  if result:
210  param = (actuator_name,velocity,spikes,acceleration,outlier_limit_neg,outlier_limit_pos,supply_voltage,measured_motor_voltage,executed_current,measured_current, result1, result2, result3)
211  self.plots.append(actuator_name +'_1')
212  self.plots.append(actuator_name +'_2')
213  diagnostic.plot(param)
214  else:
215  param = (actuator_name,velocity,spikes,acceleration,outlier_limit_neg,outlier_limit_pos,supply_voltage,measured_motor_voltage,executed_current,measured_current, result1, result2, result3)
216  self.plots.append(actuator_name +'_1')
217  self.plots.append(actuator_name +'_2')
218  diagnostic.plot(param)
219 
220  plt.show()
221 
222  def load_file(self):
223  filename = QFileDialog.getOpenFileName(self, self.tr("Open File"), "../", self.tr("Yaml (*.yaml)"))
224  if os.path.basename(filename[0].encode("ascii")) == "":
225  return
226 
227  self.filelist.append(filename[0])
228  self.filenames.append(os.path.basename(filename[0].encode("ascii")))
229  self.fileLabel.setText('Files: ' + str(self.filenames))
230  self.fileLabel.setWordWrap(True)
231 
232  def load_directory(self):
233  directory = QFileDialog.getExistingDirectory(self, self.tr("Open Directory"), "../")
234  if directory == '':
235  return
236 
237  temp = [f.encode("ascii") for f in os.listdir(directory)]
238  self.filenames.extend(temp)
239  temp = [directory + '/' + filepath for filepath in os.listdir(directory)]
240  self.filelist.extend(temp)
241  self.fileLabel.setText('Files: ' + str(self.filenames))
242  self.fileLabel.setWordWrap(True)
def start_diag_controller(actuator_name)
def get_diag_data(actuator_name)


pr2_motor_diagnostic_tool
Author(s): Rahul Udasi
autogenerated on Wed Jan 6 2021 03:39:21