00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 import os
00033 import tarfile
00034 import shutil
00035 import rospy
00036 import rosgraph
00037 import rospkg
00038 import time
00039
00040 from qt_gui.plugin import Plugin
00041 from python_qt_binding import loadUi
00042
00043 from sr_robot_msgs.srv import ManualSelfTest, ManualSelfTestResponse
00044 from diagnostic_msgs.srv import SelfTest
00045
00046 from QtGui import QWidget, QTreeWidgetItem, QColor, QPixmap, QMessageBox
00047 from QtGui import QInputDialog, QDialog, QSplitter, QLabel, QSizePolicy, QResizeEvent
00048 from QtCore import QThread, SIGNAL, QPoint
00049 from QtCore import Qt
00050
00051 from time import sleep
00052
00053 green = QColor(153, 231, 96)
00054 orange = QColor(247, 206, 134)
00055 red = QColor(236, 178, 178)
00056
00057
00058 class AsyncService(QThread):
00059
00060 def __init__(self, widget, node_name, index, path_to_data):
00061 """
00062 Calling the self test services asynchronously
00063 so that it doesn't "kill" the GUI while they run.
00064 (also runs all test services in parallel -> faster).
00065
00066 @widget: parent widget
00067 @node_name: name of the node for which we're running the self_test
00068 @index: index of this thread in the list of threads (to find out which thread finished in callback)
00069 """
00070 QThread.__init__(self, widget)
00071 self._widget = widget
00072 self.node_name = node_name
00073 self.service_name = node_name + "/self_test"
00074 self.index = index
00075
00076 self.manual_test_req_ = None
00077 self.manual_test_res_ = None
00078 self.resp = None
00079 self.path_to_data = path_to_data
00080
00081 self.waiting_for_manual_test_ = False
00082
00083 self.srv_manual_test_ = rospy.Service(
00084 node_name + "/manual_self_tests", ManualSelfTest, self.manual_test_srv_cb_)
00085
00086 def manual_test_srv_cb_(self, req):
00087 while self.waiting_for_manual_test_:
00088 sleep(0.1)
00089 self.waiting_for_manual_test_ = True
00090 self.manual_test_res_ = None
00091
00092 self.manual_test_req_ = req
00093
00094 self.emit(SIGNAL("manual_test(QPoint)"), QPoint(self.index, 0))
00095
00096 while self.manual_test_res_ is None:
00097 time.sleep(0.01)
00098
00099 self.waiting_for_manual_test_ = False
00100 return self.manual_test_res_
00101
00102 def run(self):
00103 """
00104 Calls the node/self_test service and emits a signal once it's finished running.
00105 """
00106 self_test_srv = rospy.ServiceProxy(self.service_name, SelfTest)
00107 try:
00108 self.resp = self_test_srv()
00109 except rospy.ServiceException, e:
00110 rospy.logerr(
00111 "Failed to called " + self.service_name + " %s" % str(e))
00112 if self.resp is None:
00113 rospy.logerr(
00114 "Failed to called " + self.service_name + " %s" % str(e))
00115 return
00116
00117 self.emit(SIGNAL("test_finished(QPoint)"), QPoint(self.index, 0))
00118
00119 def save(self):
00120 """
00121 Save the test results in a file at $HOME/.ros/log/self_tests/node/results.txt
00122 """
00123 if self.resp is not None:
00124 path = self.path_to_data + "self_tests/" + self.node_name
00125 if not os.path.exists(path):
00126 os.makedirs(path)
00127 f = open(path + "/results.txt", "w")
00128 f.write(str(self.resp))
00129 f.close()
00130 else:
00131 rospy.logerr(
00132 "Test for " + self.node_name + " can't be saved: no results found.")
00133
00134 def shutdown(self):
00135 self.manual_test_res_ = None
00136 self.manual_test_req_ = None
00137 self.srv_manual_test_.shutdown()
00138 self.srv_manual_test_ = None
00139
00140
00141 class ResizeableQPlot(QLabel):
00142
00143 def __init__(self, parent):
00144 QLabel.__init__(self, parent)
00145 self.plot_pixmap = None
00146
00147 def resizeEvent(self, event):
00148 if self.plot_pixmap is not None:
00149 pixmap = self.plot_pixmap
00150 size = event.size()
00151 self.setPixmap(
00152 pixmap.scaled(size, Qt.KeepAspectRatio, Qt.SmoothTransformation))
00153
00154
00155 class SrGuiSelfTest(Plugin):
00156
00157 """
00158 A rosgui plugin for running self diagnostics of a Shadow robot hand
00159 """
00160
00161 def __init__(self, context):
00162 """
00163 Detects which nodes are advertising a self_test service, makes it possible to run them,
00164 and display the results.
00165 """
00166 super(SrGuiSelfTest, self).__init__(context)
00167 self.setObjectName('SrGuiSelfTest')
00168
00169 self._publisher = None
00170 self._widget = QWidget()
00171
00172
00173 ui_path = os.path.join(
00174 rospkg.RosPack().get_path('sr_gui_self_test'), 'uis')
00175 ui_file = os.path.join(ui_path, 'SrSelfTest.ui')
00176 loadUi(ui_file, self._widget)
00177 self._widget.setObjectName('SrSelfTestUi')
00178 context.add_widget(self._widget)
00179
00180 self.test_widget_ = QWidget()
00181 ui_file = os.path.join(ui_path, 'self_test.ui')
00182 loadUi(ui_file, self.test_widget_)
00183 self.plot_widget_ = QWidget()
00184 ui_file = os.path.join(ui_path, 'test_plot.ui')
00185 loadUi(ui_file, self.plot_widget_)
00186
00187
00188 self.path_to_data = os.path.expanduser('~')
00189 if self.path_to_data == "":
00190 self.path_to_data = "/tmp/"
00191 else:
00192 self.path_to_data += "/.ros/log/"
00193
00194 self.splitter_ = QSplitter(Qt.Vertical, self._widget)
00195 self._widget.test_layout.addWidget(self.splitter_)
00196 self.splitter_.addWidget(self.test_widget_)
00197 self.splitter_.addWidget(self.plot_widget_)
00198 self.test_widget_.show()
00199 self.plot_widget_.show()
00200
00201 self.nodes = None
00202 self.selected_node_ = None
00203 self.test_threads = []
00204
00205 self.index_picture = 0
00206 self.list_of_pics = []
00207 self.list_of_pics_tests = []
00208
00209
00210 self.resizeable_plot = ResizeableQPlot(self.plot_widget_)
00211 self.resizeable_plot.setSizePolicy(
00212 QSizePolicy.Ignored, QSizePolicy.Ignored)
00213 self.resizeable_plot.setAlignment(Qt.AlignCenter)
00214 self.plot_widget_.plot_layout.addWidget(self.resizeable_plot)
00215
00216 self._widget.btn_test.setEnabled(False)
00217 self._widget.btn_save.setEnabled(False)
00218 self.plot_widget_.btn_prev.setEnabled(False)
00219 self.plot_widget_.btn_next.setEnabled(False)
00220
00221 self._widget.btn_refresh_nodes.pressed.connect(
00222 self.on_btn_refresh_nodes_clicked_)
00223 self._widget.btn_test.pressed.connect(self.on_btn_test_clicked_)
00224 self._widget.btn_save.pressed.connect(self.on_btn_save_clicked_)
00225
00226 self.plot_widget_.btn_next.pressed.connect(self.on_btn_next_clicked_)
00227 self.plot_widget_.btn_prev.pressed.connect(self.on_btn_prev_clicked_)
00228
00229 self._widget.nodes_combo.currentIndexChanged.connect(
00230 self.new_node_selected_)
00231
00232 self.on_btn_refresh_nodes_clicked_()
00233
00234 def on_btn_save_clicked_(self):
00235 """
00236 Save the tests in a tarball.
00237 """
00238 for test in self.test_threads:
00239 test.save()
00240
00241
00242 path = self.path_to_data + "self_tests.tar.gz"
00243 if os.path.isfile(path):
00244 shutil.copy(path, path + ".bk")
00245 os.remove(path)
00246
00247
00248 tarball = tarfile.open(path, "w:gz")
00249 tarball.add(self.path_to_data + "self_tests")
00250 tarball.close()
00251
00252 QMessageBox.warning(self._widget, "Information",
00253 "A tarball was saved in " + path + ", please email it to hand@shadowrobot.com.")
00254
00255 def on_btn_test_clicked_(self):
00256 """
00257 Run the tests in separate threads (in parallel)
00258 """
00259
00260 self.test_threads = []
00261
00262
00263 self._widget.btn_test.setEnabled(False)
00264 self._widget.btn_save.setEnabled(False)
00265 root_item = self.test_widget_.test_tree.invisibleRootItem()
00266 for i in range(root_item.childCount()):
00267 item = root_item.child(i)
00268 item.setExpanded(False)
00269
00270 self._widget.setCursor(Qt.WaitCursor)
00271
00272 self._widget.progress_bar.reset()
00273
00274
00275 self.test_threads = []
00276
00277 nodes_to_test = []
00278 if self.selected_node_ == "All":
00279 nodes_to_test = self.nodes[1:]
00280 else:
00281 nodes_to_test = [self.selected_node_]
00282
00283 for n in nodes_to_test:
00284 self.test_threads.append(
00285 AsyncService(self._widget, n, len(self.test_threads), self.path_to_data))
00286 self._widget.connect(self.test_threads[-1], SIGNAL(
00287 "test_finished(QPoint)"), self.on_test_finished_)
00288 self._widget.connect(self.test_threads[-1], SIGNAL(
00289 "manual_test(QPoint)"), self.on_manual_test_)
00290
00291 for thread in self.test_threads:
00292 thread.start()
00293
00294 def on_test_finished_(self, point):
00295 """
00296 Callback from test_finished signal. Display the results and update the progress
00297 """
00298 thread = self.test_threads[point.x()]
00299 resp = thread.resp
00300 node_item = None
00301 if resp.passed:
00302 node_item = QTreeWidgetItem(
00303 ["OK", thread.node_name + " [" + str(resp.id) + "]"])
00304 node_item.setBackgroundColor(0, QColor(green))
00305 else:
00306 node_item = QTreeWidgetItem(
00307 ["FAILED", thread.node_name + " [" + str(resp.id) + "]"])
00308 node_item.setBackgroundColor(0, QColor(red))
00309 self.test_widget_.test_tree.addTopLevelItem(node_item)
00310
00311
00312 for status in resp.status:
00313 display = ["", "", "", status.name, status.message]
00314 color = None
00315 if status.level == status.OK:
00316 display[2] = "OK"
00317 color = QColor(green)
00318 elif status.level == status.WARN:
00319 display[2] = "WARN"
00320 color = QColor(orange)
00321 else:
00322 display[2] = "ERROR"
00323 color = QColor(red)
00324 st_item = QTreeWidgetItem(node_item, display)
00325 st_item.setBackgroundColor(2, color)
00326 self.test_widget_.test_tree.addTopLevelItem(st_item)
00327 st_item.setExpanded(True)
00328 node_item.setExpanded(True)
00329
00330
00331 self.display_plots_(thread.node_name)
00332
00333 for col in range(0, self.test_widget_.test_tree.columnCount()):
00334 self.test_widget_.test_tree.resizeColumnToContents(col)
00335
00336
00337 nb_threads_finished = 0
00338 for thread in self.test_threads:
00339 if thread.resp is not None:
00340 nb_threads_finished += 1
00341 thread.shutdown()
00342
00343 percentage = 100.0 * nb_threads_finished / len(self.test_threads)
00344 self._widget.progress_bar.setValue(percentage)
00345
00346 if percentage == 100.0:
00347 QMessageBox.information(
00348 self._widget, "Information", "If the tests passed, the communications\n with the hand are fine.")
00349
00350
00351 self._widget.btn_test.setEnabled(True)
00352
00353 self._widget.setCursor(Qt.ArrowCursor)
00354 self._widget.btn_save.setEnabled(True)
00355
00356 def on_manual_test_(self, point):
00357 thread = self.test_threads[point.x()]
00358
00359 input_dialog = QInputDialog(self._widget)
00360 input_dialog.setOkButtonText("OK - Test successful.")
00361 input_dialog.setCancelButtonText(
00362 "NO - Test failed (please enter a comment to say why the test fail above).")
00363 input_dialog.setLabelText(thread.manual_test_req_.message)
00364 input_dialog.setWindowTitle(thread.node_name + " - Manual Test")
00365
00366 ok = (QDialog.Accepted == input_dialog.exec_())
00367
00368 thread.manual_test_res_ = ManualSelfTestResponse(
00369 ok, input_dialog.textValue())
00370
00371 def display_plots_(self, display_node):
00372 """
00373 Loads the plots available in $HOME/.ros/log/self_tests/node (place where the sr_self_test saves
00374 the plots for the fingers movements)
00375 """
00376 self.list_of_pics = []
00377 self.list_of_pics_tests = []
00378 for root, dirs, files in os.walk(self.path_to_data + "self_tests/"):
00379 for f in files:
00380 node_name = root.split("/")[-1]
00381 if node_name == display_node:
00382 if ".png" in f:
00383 self.list_of_pics.append(os.path.join(root, f))
00384 self.list_of_pics_tests.append(node_name)
00385
00386 self.list_of_pics.sort()
00387 self.index_picture = 0
00388 self.refresh_pic_()
00389
00390 def refresh_pic_(self):
00391 """
00392 Refresh the pic being displayed
00393 """
00394 if len(self.list_of_pics) > 0:
00395 self.plot_widget_.label_node.setText(self.list_of_pics_tests[self.index_picture] +
00396 " [" + str(self.index_picture + 1) + "/" +
00397 str(len(self.list_of_pics)) + "]")
00398
00399 self.resizeable_plot.plot_pixmap = QPixmap(
00400 self.list_of_pics[self.index_picture])
00401 self.plot_widget_.btn_prev.setEnabled(True)
00402 self.plot_widget_.btn_next.setEnabled(True)
00403 else:
00404 self.plot_widget_.btn_prev.setEnabled(False)
00405 self.plot_widget_.btn_next.setEnabled(False)
00406
00407
00408 self.resizeable_plot.resizeEvent(
00409 QResizeEvent(self.resizeable_plot.size(), self.resizeable_plot.size()))
00410
00411 def on_btn_next_clicked_(self):
00412 """
00413 Next pic
00414 """
00415 self.index_picture = min(
00416 self.index_picture + 1, len(self.list_of_pics) - 1)
00417 self.refresh_pic_()
00418
00419 def on_btn_prev_clicked_(self):
00420 """
00421 Prev pic
00422 """
00423 self.index_picture = max(self.index_picture - 1, 0)
00424 self.refresh_pic_()
00425
00426 def on_btn_refresh_nodes_clicked_(self):
00427 """
00428 Refresh the list of nodes (check which node is advertising a self_test service)
00429 """
00430 self.nodes = []
00431
00432
00433
00434 self.nodes = [x[0].split('/')[len(x[0].split('/')) - 2]
00435 for x in rosgraph.masterapi.Master("/").getSystemState()[2] if "self_test" in x[0]]
00436
00437
00438 self.nodes = list(set(self.nodes))
00439
00440 self.nodes.insert(0, "All")
00441
00442 self._widget.nodes_combo.clear()
00443 for node in self.nodes:
00444 self._widget.nodes_combo.addItem(node)
00445
00446 def new_node_selected_(self, index=None):
00447 """
00448 Callback for node selection dropdown
00449 """
00450 self.selected_node_ = self.nodes[index]
00451
00452 if index is not None:
00453 self._widget.btn_test.setEnabled(True)
00454 else:
00455 self._widget.btn_test.setEnabled(False)
00456
00457 def _unregisterPublisher(self):
00458 if self._publisher is not None:
00459 self._publisher.unregister()
00460 self._publisher = None
00461
00462 def shutdown_plugin(self):
00463 self._unregisterPublisher()
00464
00465 def save_settings(self, global_settings, perspective_settings):
00466 pass
00467
00468 def restore_settings(self, global_settings, perspective_settings):
00469 pass