diagnostic_tool_widget.py
Go to the documentation of this file.
00001 import os
00002 import datetime
00003 from get_diagnostic_data import *
00004 from analysis_test import Diagnostic
00005 from yaml import load
00006 import matplotlib.pyplot as plt
00007 import numpy
00008 from scipy.stats import scoreatpercentile
00009 
00010 from python_qt_binding import loadUi
00011 from python_qt_binding.QtCore import qWarning, Qt
00012 from python_qt_binding.QtGui import QWidget, QPushButton, QFileDialog, QLabel
00013 
00014 class DiagnosticToolWidget(QWidget):
00015   
00016     def __init__(self):
00017         super(DiagnosticToolWidget, self).__init__()
00018         ui_file = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'pr2_diagnostic_widget.ui')  
00019         loadUi(ui_file, self)
00020         self.setObjectName('DiagnosticToolWidget')
00021         self.left_bool = False
00022         self.right_bool = False
00023         self.head_bool = False
00024         self.debug_info_bool = False
00025         self.bad_results_bool = False
00026         self.r_arm_actuators = ['r_wrist_r_motor','r_wrist_l_motor','r_forearm_roll_motor','r_upper_arm_roll_motor', 'r_elbow_flex_motor','r_shoulder_lift_motor','r_shoulder_pan_motor'] 
00027         self.l_arm_actuators = ['l_wrist_r_motor','l_wrist_l_motor','l_forearm_roll_motor','l_upper_arm_roll_motor', 'l_elbow_flex_motor','l_shoulder_lift_motor','l_shoulder_pan_motor'] 
00028         self.head_actuators = ['head_pan_motor','head_tilt_motor']
00029         self.joint_list = []
00030         self.filelist = []
00031         self.filenames = []
00032         self.jointnames = []
00033         self.plots = []
00034 
00035         #getDataBtn = QPushButton('Quit', self) 
00036         self.leftArm.stateChanged.connect(self.left_arm_selected)
00037         self.rightArm.stateChanged.connect(self.right_arm_selected)
00038         self.head.stateChanged.connect(self.head_selected)
00039         self.debugInfo.stateChanged.connect(self.debug_info_selected)
00040         self.badResults.stateChanged.connect(self.bad_results_selected)
00041         self.getData.clicked[bool].connect(self.get_data_pressed)
00042         self.resetJointlist.clicked[bool].connect(self.reset_jointlist)
00043         self.analyzeData.clicked[bool].connect(self.analyze_data)
00044         self.loadFile.clicked[bool].connect(self.load_file)
00045         self.loadDirectory.clicked[bool].connect(self.load_directory)
00046         self.resetFilelist.clicked[bool].connect(self.reset_filelist)
00047         templist = self.r_arm_actuators + self.l_arm_actuators + self.head_actuators
00048         for joint in templist:
00049             self.jointWidget.addItem(str(joint))        
00050         self.jointWidget.itemSelectionChanged.connect(self.joint_widget_changed) 
00051         
00052         rospy.Subscriber("joy", Joy, callback)
00053 
00054     def joint_widget_changed(self):
00055         self.jointnames = []
00056         for joint in self.jointWidget.selectedItems():
00057            if joint.text() not in self.jointnames:
00058                self.jointnames.append(joint.text())
00059         self.jointnames = [f.encode("ascii") for f in self.jointnames]
00060 
00061         self.jointLabel.setText('Joints: ' + str(self.jointnames))
00062         self.jointLabel.setWordWrap(True)
00063 
00064     def left_arm_selected(self, state):
00065         if state == Qt.Checked:
00066             self.left_bool = True
00067             self.jointnames.extend(self.l_arm_actuators)    
00068         else:
00069             self.left_bool = False 
00070             self.jointnames = [joint for joint in self.jointnames if joint not in self.l_arm_actuators]
00071             
00072         self.jointLabel.setText('Joints: ' + str(self.jointnames))
00073         self.jointLabel.setWordWrap(True)
00074 
00075     def right_arm_selected(self, state):
00076         if state == Qt.Checked:
00077             self.right_bool = True
00078             self.jointnames.extend(self.r_arm_actuators)    
00079         else:
00080             self.right_bool = False 
00081             self.jointnames = [joint for joint in self.jointnames if joint not in self.r_arm_actuators]
00082             
00083         self.jointLabel.setText('Joints: ' + str(self.jointnames))
00084         self.jointLabel.setWordWrap(True)
00085          
00086     def head_selected(self, state):
00087         if state == Qt.Checked:
00088             self.head_bool = True 
00089             self.jointnames.extend(self.head_actuators)    
00090         else:
00091             self.head_bool = False 
00092             self.jointnames = [joint for joint in self.jointnames if joint not in self.head_actuators]
00093             
00094         self.jointLabel.setText('Joints: ' + str(self.jointnames))
00095         self.jointLabel.setWordWrap(True)
00096     
00097     def debug_info_selected(self, state):
00098         if state == Qt.Checked:
00099             self.debug_info_bool = True
00100         else:
00101             self.debug_info_bool = False
00102 
00103     def bad_results_selected(self, state):
00104         if state == Qt.Checked:
00105             self.bad_results_bool = True
00106         else:
00107             self.bad_results_bool = False       
00108 
00109     def reset_jointlist(self):
00110         self.left_bool = False
00111         self.right_bool = False
00112         self.head_bool = False
00113         self.leftArm.setChecked(False)
00114         self.rightArm.setChecked(False)
00115         self.head.setChecked(False)
00116 
00117         self.joint_list = []
00118         self.jointnames = []
00119 
00120         self.jointLabel.setText('Joints: ' + str(self.jointnames))
00121         self.jointLabel.setWordWrap(True)
00122 
00123     def reset_filelist(self):
00124         self.filelist = []
00125         self.filenames = []
00126         self.fileLabel.setText('Files: ')
00127         for fig_name in self.plots: 
00128             plt.close()
00129         self.plots = []
00130 
00131     def get_data_pressed(self):
00132         self.joint_list = []
00133         self.jointLabel.setText('Joints: ' + str(self.jointnames))
00134         self.jointLabel.setWordWrap(True)
00135         self.joint_list.extend(self.jointnames)
00136         folder = str(datetime.datetime.now())
00137         folder = folder.split(" ")
00138         folder = "_".join(folder)
00139 
00140         os.system("mkdir %s" %(folder))
00141         os.chdir(os.getcwd() + '/' + folder)
00142 
00143         switch_controller([],['diagnostic_controller'],SwitchControllerRequest.STRICT)
00144         unload_controller('diagnostic_controller')
00145 
00146         for actuator_name in self.joint_list:
00147             print "Press X to start or for next joint"
00148             wait_for_X()
00149             start_diag_controller(actuator_name)
00150             print "start moving %s for about 5 seconds" %(actuator_name)
00151             rospy.sleep(5.0)
00152             print "press circle when your done"
00153             #rospy.loginfo("move %s for about 5 seconds and then press circle when done", joint_name)
00154             wait_for_circle()
00155             get_diag_data(actuator_name)
00156         print "Finshed getting data"
00157 
00158     def close_all(self):
00159         for fig_name in self.plots: 
00160             plt.close()
00161         self.plots = []
00162 
00163     def analyze_data(self):
00164         diagnostic = Diagnostic()
00165         debug_info = self.debug_info_bool
00166         bad_results = self.bad_results_bool
00167 
00168         for filename in self.filelist:
00169             actuator_name  = os.path.basename(filename) 
00170             if debug_info:
00171                 print "\n" + str(actuator_name)
00172             stream = file(filename, 'r')
00173             samples = load(stream)
00174             velocity = []
00175             encoder_position = []
00176             supply_voltage = []
00177             measured_motor_voltage = []
00178             executed_current = []
00179             measured_current = []
00180             timestamp = []
00181 
00182             for s in samples.sample_buffer:
00183                 velocity.append(s.velocity)
00184                 encoder_position.append(s.encoder_position)
00185                 supply_voltage.append(s.supply_voltage)
00186                 measured_motor_voltage.append(s.measured_motor_voltage)
00187                 executed_current.append(s.executed_current)
00188                 measured_current.append(s.measured_current)
00189                 timestamp.append(s.timestamp)
00190 
00191             velocity = numpy.array(velocity)
00192             encoder_position = numpy.array(encoder_position)
00193            
00194             supply_voltage = numpy.array(supply_voltage)
00195             measured_motor_voltage = numpy.array(measured_motor_voltage)
00196             executed_current = numpy.array(executed_current)
00197             measured_current = numpy.array(measured_current)
00198 
00199             acceleration = diagnostic.get_acceleration(velocity, timestamp)
00200 
00201             spikes = acceleration * (velocity[:-1])
00202 
00203             (result1, outlier_limit_neg, outlier_limit_pos) = diagnostic.check_for_spikes(spikes, actuator_name, debug_info)
00204             result2 = diagnostic.check_for_unplugged(velocity, measured_motor_voltage, actuator_name, debug_info)
00205             result3 = diagnostic.check_for_open(velocity, measured_motor_voltage, actuator_name, debug_info)
00206             result = result1 or result2 or result3
00207 
00208             if bad_results:
00209                 if result: 
00210                     param = (actuator_name,velocity,spikes,acceleration,outlier_limit_neg,outlier_limit_pos,supply_voltage,measured_motor_voltage,executed_current,measured_current, result1, result2, result3)
00211                     self.plots.append(actuator_name +'_1')
00212                     self.plots.append(actuator_name +'_2')
00213                     diagnostic.plot(param)
00214             else:
00215                 param = (actuator_name,velocity,spikes,acceleration,outlier_limit_neg,outlier_limit_pos,supply_voltage,measured_motor_voltage,executed_current,measured_current, result1, result2, result3)
00216                 self.plots.append(actuator_name +'_1')
00217                 self.plots.append(actuator_name +'_2')
00218                 diagnostic.plot(param)
00219 
00220         plt.show()    
00221 
00222     def load_file(self):
00223         filename = QFileDialog.getOpenFileName(self, self.tr("Open File"), "../", self.tr("Yaml (*.yaml)"))
00224         if os.path.basename(filename[0].encode("ascii")) == "":
00225             return
00226 
00227         self.filelist.append(filename[0])
00228         self.filenames.append(os.path.basename(filename[0].encode("ascii")))
00229         self.fileLabel.setText('Files: ' + str(self.filenames))
00230         self.fileLabel.setWordWrap(True)        
00231  
00232     def load_directory(self):
00233         directory = QFileDialog.getExistingDirectory(self, self.tr("Open Directory"), "../")
00234         if directory == '': 
00235             return 
00236 
00237         temp = [f.encode("ascii") for f in os.listdir(directory)]
00238         self.filenames.extend(temp)
00239         temp = [directory + '/' + filepath for filepath in os.listdir(directory)]
00240         self.filelist.extend(temp)
00241         self.fileLabel.setText('Files: ' + str(self.filenames))
00242         self.fileLabel.setWordWrap(True)        


pr2_motor_diagnostic_tool
Author(s): Rahul Udasi
autogenerated on Mon Sep 14 2015 14:39:22