00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 import os, tarfile, shutil, rospy, rosgraph, rospkg, time
00033
00034 from qt_gui.plugin import Plugin
00035 from python_qt_binding import loadUi
00036
00037 from sr_robot_msgs.srv import ManualSelfTest, ManualSelfTestResponse
00038 from diagnostic_msgs.srv import SelfTest
00039
00040 from QtGui import QWidget, QTreeWidgetItem, QColor, QPixmap, QMessageBox, QInputDialog, QDialog, QSplitter, QLabel, QSizePolicy, QResizeEvent
00041 from QtCore import QThread, SIGNAL, QPoint
00042 from QtCore import Qt
00043
00044 from time import sleep
00045
00046 green = QColor(153, 231, 96)
00047 orange = QColor(247, 206, 134)
00048 red = QColor(236, 178, 178)
00049
00050 class AsyncService(QThread):
00051 def __init__(self, widget, node_name, index):
00052 """
00053 Calling the self test services asynchronously
00054 so that it doesn't "kill" the GUI while they run.
00055 (also runs all test services in parallel -> faster).
00056
00057 @widget: parent widget
00058 @node_name: name of the node for which we're running the self_test
00059 @index: index of this thread in the list of threads (to find out which thread finished in callback)
00060 """
00061 QThread.__init__(self, widget)
00062 self._widget = widget
00063 self.node_name = node_name
00064 self.service_name = node_name+"/self_test"
00065 self.index = index
00066
00067 self.manual_test_req_ = None
00068 self.manual_test_res_ = None
00069 self.resp = None
00070
00071 self.waiting_for_manual_test_ = False
00072
00073 self.srv_manual_test_ = rospy.Service(node_name+"/manual_self_tests", ManualSelfTest, self.manual_test_srv_cb_)
00074
00075 def manual_test_srv_cb_(self, req):
00076 while self.waiting_for_manual_test_:
00077 sleep(0.1)
00078 self.waiting_for_manual_test_ = True
00079 self.manual_test_res_ = None
00080
00081 self.manual_test_req_ = req
00082
00083 self.emit(SIGNAL("manual_test(QPoint)"), QPoint( self.index, 0))
00084
00085 while self.manual_test_res_ == None:
00086 time.sleep(0.01)
00087
00088 self.waiting_for_manual_test_ = False
00089 return self.manual_test_res_
00090
00091 def run(self):
00092 """
00093 Calls the node/self_test service and emits a signal once it's finished running.
00094 """
00095 self_test_srv = rospy.ServiceProxy(self.service_name, SelfTest)
00096 try:
00097 self.resp = self_test_srv()
00098 except rospy.ServiceException, e:
00099 rospy.logerr("Failed to called " + self.service_name+" %s"%str(e))
00100 if self.resp == None:
00101 rospy.logerr("Failed to called " + self.service_name+" %s"%str(e))
00102 return
00103
00104 self.emit(SIGNAL("test_finished(QPoint)"), QPoint( self.index, 0))
00105
00106 def save(self):
00107 """
00108 Save the test results in a file at /tmp/self_tests/node/results.txt
00109 """
00110 if self.resp != None:
00111 path = "/tmp/self_tests/"+self.node_name
00112 if not os.path.exists(path):
00113 os.makedirs(path)
00114 f = open(path+"/results.txt", "w")
00115 f.write( str(self.resp) )
00116 f.close()
00117 else:
00118 rospy.logerr("Test for "+self.node_name+" can't be saved: no results found.")
00119
00120 def shutdown(self):
00121 self.manual_test_res_ = None
00122 self.manual_test_req_ = None
00123 self.srv_manual_test_.shutdown()
00124 self.srv_manual_test_ = None
00125
00126 class ResizeableQPlot(QLabel):
00127 def __init__(self, parent):
00128 QLabel.__init__(self, parent)
00129 self.plot_pixmap = None
00130
00131 def resizeEvent(self, event):
00132 if self.plot_pixmap != None:
00133 pixmap = self.plot_pixmap
00134 size = event.size()
00135 self.setPixmap(pixmap.scaled(size, Qt.KeepAspectRatio, Qt.SmoothTransformation))
00136
00137
00138 class SrGuiSelfTest(Plugin):
00139 """
00140 A rosgui plugin for running self diagnostics of a Shadow robot hand
00141 """
00142 def __init__(self, context):
00143 """
00144 Detects which nodes are advertising a self_test service, makes it possible to run them,
00145 and display the results.
00146 """
00147 super(SrGuiSelfTest, self).__init__(context)
00148 self.setObjectName('SrGuiSelfTest')
00149
00150 self._publisher = None
00151 self._widget = QWidget()
00152
00153
00154 ui_path = os.path.join(rospkg.RosPack().get_path('sr_gui_self_test'), 'uis')
00155 ui_file = os.path.join(ui_path, 'SrSelfTest.ui')
00156 loadUi(ui_file, self._widget)
00157 self._widget.setObjectName('SrSelfTestUi')
00158 context.add_widget(self._widget)
00159
00160 self.test_widget_ = QWidget()
00161 ui_file = os.path.join(ui_path, 'self_test.ui')
00162 loadUi(ui_file, self.test_widget_)
00163 self.plot_widget_ = QWidget()
00164 ui_file = os.path.join(ui_path, 'test_plot.ui')
00165 loadUi(ui_file, self.plot_widget_)
00166
00167
00168 self.splitter_ = QSplitter(Qt.Vertical, self._widget)
00169 self._widget.test_layout.addWidget(self.splitter_)
00170 self.splitter_.addWidget( self.test_widget_ )
00171 self.splitter_.addWidget( self.plot_widget_ )
00172 self.test_widget_.show()
00173 self.plot_widget_.show()
00174
00175 self.nodes = None
00176 self.selected_node_ = None
00177 self.test_threads = []
00178
00179 self.index_picture = 0
00180 self.list_of_pics = []
00181 self.list_of_pics_tests = []
00182
00183
00184 self.resizeable_plot = ResizeableQPlot(self.plot_widget_)
00185 self.resizeable_plot.setSizePolicy(QSizePolicy.Ignored, QSizePolicy.Ignored)
00186 self.resizeable_plot.setAlignment(Qt.AlignCenter)
00187 self.plot_widget_.plot_layout.addWidget(self.resizeable_plot)
00188
00189 self._widget.btn_test.setEnabled(False)
00190 self._widget.btn_save.setEnabled(False)
00191 self.plot_widget_.btn_prev.setEnabled(False)
00192 self.plot_widget_.btn_next.setEnabled(False)
00193
00194 self._widget.btn_refresh_nodes.pressed.connect(self.on_btn_refresh_nodes_clicked_)
00195 self._widget.btn_test.pressed.connect(self.on_btn_test_clicked_)
00196 self._widget.btn_save.pressed.connect(self.on_btn_save_clicked_)
00197
00198 self.plot_widget_.btn_next.pressed.connect(self.on_btn_next_clicked_)
00199 self.plot_widget_.btn_prev.pressed.connect(self.on_btn_prev_clicked_)
00200
00201 self._widget.nodes_combo.currentIndexChanged.connect(self.new_node_selected_)
00202
00203 self.on_btn_refresh_nodes_clicked_()
00204
00205
00206 def on_btn_save_clicked_(self):
00207 """
00208 Save the tests in a tarball.
00209 """
00210 for test in self.test_threads:
00211 test.save()
00212
00213
00214 path = "/tmp/self_tests.tar.gz"
00215 if os.path.isfile(path):
00216 shutil.copy(path, path+".bk")
00217 os.remove(path)
00218
00219
00220 tarball = tarfile.open(path, "w:gz")
00221 tarball.add("/tmp/self_tests")
00222 tarball.close()
00223
00224 QMessageBox.warning(self._widget, "Information", "A tarball was saved in "+path+", please email it to hand@shadowrobot.com.")
00225
00226 def on_btn_test_clicked_(self):
00227 """
00228 Run the tests in separate threads (in parallel)
00229 """
00230
00231 self.test_threads = []
00232
00233
00234 self._widget.btn_test.setEnabled(False)
00235 self._widget.btn_save.setEnabled(False)
00236 root_item = self.test_widget_.test_tree.invisibleRootItem()
00237 for i in range( root_item.childCount() ):
00238 item = root_item.child(i)
00239 item.setExpanded(False)
00240
00241 self._widget.setCursor(Qt.WaitCursor)
00242
00243 self._widget.progress_bar.reset()
00244
00245
00246 self.test_threads = []
00247
00248 nodes_to_test = []
00249 if self.selected_node_ == "All":
00250 nodes_to_test = self.nodes[1:]
00251 else:
00252 nodes_to_test = [self.selected_node_]
00253
00254 for n in nodes_to_test:
00255 self.test_threads.append(AsyncService(self._widget, n, len(self.test_threads)))
00256 self._widget.connect(self.test_threads[-1], SIGNAL("test_finished(QPoint)"), self.on_test_finished_)
00257 self._widget.connect(self.test_threads[-1], SIGNAL("manual_test(QPoint)"), self.on_manual_test_)
00258
00259 for thread in self.test_threads:
00260 thread.start()
00261
00262 def on_test_finished_(self, point):
00263 """
00264 Callback from test_finished signal. Display the results and update the progress
00265 """
00266 thread = self.test_threads[point.x()]
00267 resp = thread.resp
00268 node_item = None
00269 if resp.passed:
00270 node_item = QTreeWidgetItem(["OK", thread.node_name + " ["+str(resp.id)+"]"])
00271 node_item.setBackgroundColor(0, QColor(green))
00272 else:
00273 node_item = QTreeWidgetItem(["FAILED", thread.node_name + " ["+str(resp.id)+"]"])
00274 node_item.setBackgroundColor(0, QColor(red))
00275 self.test_widget_.test_tree.addTopLevelItem(node_item)
00276
00277
00278 for status in resp.status:
00279 display = ["", "", "", status.name, status.message]
00280 color = None
00281 if status.level == status.OK:
00282 display[2] = "OK"
00283 color = QColor(green)
00284 elif status.level == status.WARN:
00285 display[2] = "WARN"
00286 color = QColor(orange)
00287 else:
00288 display[2] = "ERROR"
00289 color = QColor(red)
00290 st_item = QTreeWidgetItem(node_item, display)
00291 st_item.setBackgroundColor(2, color)
00292 self.test_widget_.test_tree.addTopLevelItem(st_item)
00293 st_item.setExpanded(True)
00294 node_item.setExpanded(True)
00295
00296
00297 self.display_plots_(thread.node_name)
00298
00299 for col in range(0, self.test_widget_.test_tree.columnCount()):
00300 self.test_widget_.test_tree.resizeColumnToContents(col)
00301
00302
00303 nb_threads_finished = 0
00304 for thread in self.test_threads:
00305 if thread.resp is not None:
00306 nb_threads_finished += 1
00307 thread.shutdown()
00308
00309 percentage = 100.0 * nb_threads_finished / len(self.test_threads)
00310 self._widget.progress_bar.setValue( percentage )
00311
00312 if percentage == 100.0:
00313 QMessageBox.information(self._widget, "Information", "If the tests passed, the communications\n with the hand are fine.")
00314
00315
00316 self._widget.btn_test.setEnabled(True)
00317
00318 self._widget.setCursor(Qt.ArrowCursor)
00319 self._widget.btn_save.setEnabled(True)
00320
00321 def on_manual_test_(self, point):
00322 thread = self.test_threads[point.x()]
00323
00324 input_dialog = QInputDialog(self._widget)
00325 input_dialog.setOkButtonText("OK - Test successful." )
00326 input_dialog.setCancelButtonText("NO - Test failed (please enter a comment to say why the test fail above).")
00327 input_dialog.setLabelText(thread.manual_test_req_.message)
00328 input_dialog.setWindowTitle( thread.node_name + " - Manual Test")
00329
00330 ok = (QDialog.Accepted == input_dialog.exec_())
00331
00332 thread.manual_test_res_ = ManualSelfTestResponse(ok, input_dialog.textValue())
00333
00334 def display_plots_(self, display_node):
00335 """
00336 Loads the plots available in /tmp/self_tests/node (place where the sr_self_test saves
00337 the plots for the fingers movements)
00338 """
00339 self.list_of_pics = []
00340 self.list_of_pics_tests = []
00341 for root, dirs, files in os.walk("/tmp/self_tests/"):
00342 for f in files:
00343 node_name = root.split("/")[-1]
00344 if node_name == display_node:
00345 if ".png" in f:
00346 self.list_of_pics.append(os.path.join(root,f))
00347 self.list_of_pics_tests.append(node_name)
00348
00349 self.list_of_pics.sort()
00350 self.index_picture = 0
00351 self.refresh_pic_()
00352
00353 def refresh_pic_(self):
00354 """
00355 Refresh the pic being displayed
00356 """
00357 if len(self.list_of_pics) > 0:
00358 self.plot_widget_.label_node.setText(self.list_of_pics_tests[self.index_picture] + " ["+str(self.index_picture+1)+"/"+str(len(self.list_of_pics))+"]")
00359
00360 self.resizeable_plot.plot_pixmap = QPixmap(self.list_of_pics[self.index_picture])
00361 self.plot_widget_.btn_prev.setEnabled(True)
00362 self.plot_widget_.btn_next.setEnabled(True)
00363 else:
00364 self.plot_widget_.btn_prev.setEnabled(False)
00365 self.plot_widget_.btn_next.setEnabled(False)
00366
00367
00368 self.resizeable_plot.resizeEvent(QResizeEvent(self.resizeable_plot.size(), self.resizeable_plot.size()))
00369
00370 def on_btn_next_clicked_(self):
00371 """
00372 Next pic
00373 """
00374 self.index_picture = min(self.index_picture + 1, len(self.list_of_pics) - 1)
00375 self.refresh_pic_()
00376
00377 def on_btn_prev_clicked_(self):
00378 """
00379 Prev pic
00380 """
00381 self.index_picture = max(self.index_picture - 1, 0)
00382 self.refresh_pic_()
00383
00384 def on_btn_refresh_nodes_clicked_(self):
00385 """
00386 Refresh the list of nodes (check which node is advertising a self_test service)
00387 """
00388 self.nodes = []
00389
00390
00391 self.nodes = [x[0].split('/')[len(x[0].split('/'))-2] for x in rosgraph.masterapi.Master("/").getSystemState()[2] if "self_test" in x[0]]
00392
00393
00394 self.nodes = list(set(self.nodes))
00395
00396 self.nodes.insert(0, "All")
00397
00398 self._widget.nodes_combo.clear()
00399 for node in self.nodes:
00400 self._widget.nodes_combo.addItem(node)
00401
00402 def new_node_selected_(self, index=None):
00403 """
00404 Callback for node selection dropdown
00405 """
00406 self.selected_node_ = self.nodes[index]
00407
00408 if index != None:
00409 self._widget.btn_test.setEnabled(True)
00410 else:
00411 self._widget.btn_test.setEnabled(False)
00412
00413 def _unregisterPublisher(self):
00414 if self._publisher is not None:
00415 self._publisher.unregister()
00416 self._publisher = None
00417
00418 def shutdown_plugin(self):
00419 self._unregisterPublisher()
00420
00421 def save_settings(self, global_settings, perspective_settings):
00422 pass
00423
00424 def restore_settings(self, global_settings, perspective_settings):
00425 pass