self_test.py
Go to the documentation of this file.
00001 # Copyright (c) 2011, Dirk Thomas, TU Darmstadt
00002 # All rights reserved.
00003 #
00004 # Redistribution and use in source and binary forms, with or without
00005 # modification, are permitted provided that the following conditions
00006 # are met:
00007 #
00008 #   * Redistributions of source code must retain the above copyright
00009 #     notice, this list of conditions and the following disclaimer.
00010 #   * Redistributions in binary form must reproduce the above
00011 #     copyright notice, this list of conditions and the following
00012 #     disclaimer in the documentation and/or other materials provided
00013 #     with the distribution.
00014 #   * Neither the name of the TU Darmstadt nor the names of its
00015 #     contributors may be used to endorse or promote products derived
00016 #     from this software without specific prior written permission.
00017 #
00018 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00019 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00020 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00021 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00022 # COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00023 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00024 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00025 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00026 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00027 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00028 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00029 # POSSIBILITY OF SUCH DAMAGE.
00030 
00031 
00032 import os
00033 import tarfile
00034 import shutil
00035 import rospy
00036 import rosgraph
00037 import rospkg
00038 import time
00039 
00040 from qt_gui.plugin import Plugin
00041 from python_qt_binding import loadUi
00042 
00043 from sr_robot_msgs.srv import ManualSelfTest, ManualSelfTestResponse
00044 from diagnostic_msgs.srv import SelfTest
00045 
00046 from QtGui import QWidget, QTreeWidgetItem, QColor, QPixmap, QMessageBox
00047 from QtGui import QInputDialog, QDialog, QSplitter, QLabel, QSizePolicy, QResizeEvent
00048 from QtCore import QThread, SIGNAL, QPoint
00049 from QtCore import Qt
00050 
00051 from time import sleep
00052 
00053 green = QColor(153, 231, 96)
00054 orange = QColor(247, 206, 134)
00055 red = QColor(236, 178, 178)
00056 
00057 
00058 class AsyncService(QThread):
00059 
00060     def __init__(self, widget, node_name, index, path_to_data):
00061         """
00062         Calling the self test services asynchronously
00063         so that it doesn't "kill" the GUI while they run.
00064         (also runs all test services in parallel -> faster).
00065 
00066         @widget: parent widget
00067         @node_name: name of the node for which we're running the self_test
00068         @index: index of this thread in the list of threads (to find out which thread finished in callback)
00069         """
00070         QThread.__init__(self, widget)
00071         self._widget = widget
00072         self.node_name = node_name
00073         self.service_name = node_name + "/self_test"
00074         self.index = index
00075 
00076         self.manual_test_req_ = None
00077         self.manual_test_res_ = None
00078         self.resp = None
00079         self.path_to_data = path_to_data
00080 
00081         self.waiting_for_manual_test_ = False
00082 
00083         self.srv_manual_test_ = rospy.Service(
00084             node_name + "/manual_self_tests", ManualSelfTest, self.manual_test_srv_cb_)
00085 
00086     def manual_test_srv_cb_(self, req):
00087         while self.waiting_for_manual_test_:
00088             sleep(0.1)
00089         self.waiting_for_manual_test_ = True
00090         self.manual_test_res_ = None
00091 
00092         self.manual_test_req_ = req
00093 
00094         self.emit(SIGNAL("manual_test(QPoint)"), QPoint(self.index, 0))
00095 
00096         while self.manual_test_res_ is None:
00097             time.sleep(0.01)
00098 
00099         self.waiting_for_manual_test_ = False
00100         return self.manual_test_res_
00101 
00102     def run(self):
00103         """
00104         Calls the node/self_test service and emits a signal once it's finished running.
00105         """
00106         self_test_srv = rospy.ServiceProxy(self.service_name, SelfTest)
00107         try:
00108             self.resp = self_test_srv()
00109         except rospy.ServiceException, e:
00110             rospy.logerr(
00111                 "Failed to called " + self.service_name + " %s" % str(e))
00112         if self.resp is None:
00113             rospy.logerr(
00114                 "Failed to called " + self.service_name + " %s" % str(e))
00115             return
00116 
00117         self.emit(SIGNAL("test_finished(QPoint)"), QPoint(self.index, 0))
00118 
00119     def save(self):
00120         """
00121         Save the test results in a file at $HOME/.ros/log/self_tests/node/results.txt
00122         """
00123         if self.resp is not None:
00124             path = self.path_to_data + "self_tests/" + self.node_name
00125             if not os.path.exists(path):
00126                 os.makedirs(path)
00127             f = open(path + "/results.txt", "w")
00128             f.write(str(self.resp))
00129             f.close()
00130         else:
00131             rospy.logerr(
00132                 "Test for " + self.node_name + " can't be saved: no results found.")
00133 
00134     def shutdown(self):
00135         self.manual_test_res_ = None
00136         self.manual_test_req_ = None
00137         self.srv_manual_test_.shutdown()
00138         self.srv_manual_test_ = None
00139 
00140 
00141 class ResizeableQPlot(QLabel):
00142 
00143     def __init__(self, parent):
00144         QLabel.__init__(self, parent)
00145         self.plot_pixmap = None
00146 
00147     def resizeEvent(self, event):
00148         if self.plot_pixmap is not None:
00149             pixmap = self.plot_pixmap
00150             size = event.size()
00151             self.setPixmap(
00152                 pixmap.scaled(size, Qt.KeepAspectRatio, Qt.SmoothTransformation))
00153 
00154 
00155 class SrGuiSelfTest(Plugin):
00156 
00157     """
00158     A rosgui plugin for running self diagnostics of a Shadow robot hand
00159     """
00160 
00161     def __init__(self, context):
00162         """
00163         Detects which nodes are advertising a self_test service, makes it possible to run them,
00164         and display the results.
00165         """
00166         super(SrGuiSelfTest, self).__init__(context)
00167         self.setObjectName('SrGuiSelfTest')
00168 
00169         self._publisher = None
00170         self._widget = QWidget()
00171 
00172         # the UI is split into 3 files
00173         ui_path = os.path.join(
00174             rospkg.RosPack().get_path('sr_gui_self_test'), 'uis')
00175         ui_file = os.path.join(ui_path, 'SrSelfTest.ui')
00176         loadUi(ui_file, self._widget)
00177         self._widget.setObjectName('SrSelfTestUi')
00178         context.add_widget(self._widget)
00179 
00180         self.test_widget_ = QWidget()
00181         ui_file = os.path.join(ui_path, 'self_test.ui')
00182         loadUi(ui_file, self.test_widget_)
00183         self.plot_widget_ = QWidget()
00184         ui_file = os.path.join(ui_path, 'test_plot.ui')
00185         loadUi(ui_file, self.plot_widget_)
00186 
00187         # set the path where data are saved
00188         self.path_to_data = os.path.expanduser('~')
00189         if self.path_to_data == "":
00190             self.path_to_data = "/tmp/"
00191         else:
00192             self.path_to_data += "/.ros/log/"
00193         # we load both the test and plot widget in the mdi area
00194         self.splitter_ = QSplitter(Qt.Vertical, self._widget)
00195         self._widget.test_layout.addWidget(self.splitter_)
00196         self.splitter_.addWidget(self.test_widget_)
00197         self.splitter_.addWidget(self.plot_widget_)
00198         self.test_widget_.show()
00199         self.plot_widget_.show()
00200 
00201         self.nodes = None
00202         self.selected_node_ = None
00203         self.test_threads = []
00204 
00205         self.index_picture = 0
00206         self.list_of_pics = []
00207         self.list_of_pics_tests = []
00208 
00209         # add plot widget
00210         self.resizeable_plot = ResizeableQPlot(self.plot_widget_)
00211         self.resizeable_plot.setSizePolicy(
00212             QSizePolicy.Ignored, QSizePolicy.Ignored)
00213         self.resizeable_plot.setAlignment(Qt.AlignCenter)
00214         self.plot_widget_.plot_layout.addWidget(self.resizeable_plot)
00215 
00216         self._widget.btn_test.setEnabled(False)
00217         self._widget.btn_save.setEnabled(False)
00218         self.plot_widget_.btn_prev.setEnabled(False)
00219         self.plot_widget_.btn_next.setEnabled(False)
00220 
00221         self._widget.btn_refresh_nodes.pressed.connect(
00222             self.on_btn_refresh_nodes_clicked_)
00223         self._widget.btn_test.pressed.connect(self.on_btn_test_clicked_)
00224         self._widget.btn_save.pressed.connect(self.on_btn_save_clicked_)
00225 
00226         self.plot_widget_.btn_next.pressed.connect(self.on_btn_next_clicked_)
00227         self.plot_widget_.btn_prev.pressed.connect(self.on_btn_prev_clicked_)
00228 
00229         self._widget.nodes_combo.currentIndexChanged.connect(
00230             self.new_node_selected_)
00231 
00232         self.on_btn_refresh_nodes_clicked_()
00233 
00234     def on_btn_save_clicked_(self):
00235         """
00236         Save the tests in a tarball.
00237         """
00238         for test in self.test_threads:
00239             test.save()
00240 
00241         # backup previous test results if they exist
00242         path = self.path_to_data + "self_tests.tar.gz"
00243         if os.path.isfile(path):
00244             shutil.copy(path, path + ".bk")
00245             os.remove(path)
00246 
00247         # create the tarball and save everything in it.
00248         tarball = tarfile.open(path, "w:gz")
00249         tarball.add(self.path_to_data + "self_tests")
00250         tarball.close()
00251 
00252         QMessageBox.warning(self._widget, "Information",
00253                             "A tarball was saved in " + path + ", please email it to hand@shadowrobot.com.")
00254 
00255     def on_btn_test_clicked_(self):
00256         """
00257         Run the tests in separate threads (in parallel)
00258         """
00259         # empty thread list
00260         self.test_threads = []
00261 
00262         # disable btn, fold previous tests and reset progress bar
00263         self._widget.btn_test.setEnabled(False)
00264         self._widget.btn_save.setEnabled(False)
00265         root_item = self.test_widget_.test_tree.invisibleRootItem()
00266         for i in range(root_item.childCount()):
00267             item = root_item.child(i)
00268             item.setExpanded(False)
00269         # also change cursor to "wait"
00270         self._widget.setCursor(Qt.WaitCursor)
00271 
00272         self._widget.progress_bar.reset()
00273 
00274         # delete previous results
00275         self.test_threads = []
00276 
00277         nodes_to_test = []
00278         if self.selected_node_ == "All":
00279             nodes_to_test = self.nodes[1:]
00280         else:
00281             nodes_to_test = [self.selected_node_]
00282 
00283         for n in nodes_to_test:
00284             self.test_threads.append(
00285                 AsyncService(self._widget, n, len(self.test_threads), self.path_to_data))
00286             self._widget.connect(self.test_threads[-1], SIGNAL(
00287                 "test_finished(QPoint)"), self.on_test_finished_)
00288             self._widget.connect(self.test_threads[-1], SIGNAL(
00289                 "manual_test(QPoint)"), self.on_manual_test_)
00290 
00291         for thread in self.test_threads:
00292             thread.start()
00293 
00294     def on_test_finished_(self, point):
00295         """
00296         Callback from test_finished signal. Display the results and update the progress
00297         """
00298         thread = self.test_threads[point.x()]
00299         resp = thread.resp
00300         node_item = None
00301         if resp.passed:
00302             node_item = QTreeWidgetItem(
00303                 ["OK", thread.node_name + " [" + str(resp.id) + "]"])
00304             node_item.setBackgroundColor(0, QColor(green))
00305         else:
00306             node_item = QTreeWidgetItem(
00307                 ["FAILED", thread.node_name + " [" + str(resp.id) + "]"])
00308             node_item.setBackgroundColor(0, QColor(red))
00309         self.test_widget_.test_tree.addTopLevelItem(node_item)
00310 
00311         # also display statuses
00312         for status in resp.status:
00313             display = ["", "", "", status.name, status.message]
00314             color = None
00315             if status.level == status.OK:
00316                 display[2] = "OK"
00317                 color = QColor(green)
00318             elif status.level == status.WARN:
00319                 display[2] = "WARN"
00320                 color = QColor(orange)
00321             else:
00322                 display[2] = "ERROR"
00323                 color = QColor(red)
00324             st_item = QTreeWidgetItem(node_item, display)
00325             st_item.setBackgroundColor(2, color)
00326             self.test_widget_.test_tree.addTopLevelItem(st_item)
00327             st_item.setExpanded(True)
00328         node_item.setExpanded(True)
00329 
00330         # display the plots if available
00331         self.display_plots_(thread.node_name)
00332 
00333         for col in range(0, self.test_widget_.test_tree.columnCount()):
00334             self.test_widget_.test_tree.resizeColumnToContents(col)
00335 
00336         # display progress advancement
00337         nb_threads_finished = 0
00338         for thread in self.test_threads:
00339             if thread.resp is not None:
00340                 nb_threads_finished += 1
00341                 thread.shutdown()
00342 
00343         percentage = 100.0 * nb_threads_finished / len(self.test_threads)
00344         self._widget.progress_bar.setValue(percentage)
00345 
00346         if percentage == 100.0:
00347             QMessageBox.information(
00348                 self._widget, "Information", "If the tests passed, the communications\n with the hand are fine.")
00349 
00350             # all tests were run, reenable button
00351             self._widget.btn_test.setEnabled(True)
00352             # also change cursor to standard arrow
00353             self._widget.setCursor(Qt.ArrowCursor)
00354             self._widget.btn_save.setEnabled(True)
00355 
00356     def on_manual_test_(self, point):
00357         thread = self.test_threads[point.x()]
00358 
00359         input_dialog = QInputDialog(self._widget)
00360         input_dialog.setOkButtonText("OK - Test successful.")
00361         input_dialog.setCancelButtonText(
00362             "NO - Test failed (please enter a comment to say why the test fail above).")
00363         input_dialog.setLabelText(thread.manual_test_req_.message)
00364         input_dialog.setWindowTitle(thread.node_name + " - Manual Test")
00365 
00366         ok = (QDialog.Accepted == input_dialog.exec_())
00367 
00368         thread.manual_test_res_ = ManualSelfTestResponse(
00369             ok, input_dialog.textValue())
00370 
00371     def display_plots_(self, display_node):
00372         """
00373         Loads the plots available in $HOME/.ros/log/self_tests/node (place where the sr_self_test saves
00374         the plots for the fingers movements)
00375         """
00376         self.list_of_pics = []
00377         self.list_of_pics_tests = []
00378         for root, dirs, files in os.walk(self.path_to_data + "self_tests/"):
00379             for f in files:
00380                 node_name = root.split("/")[-1]
00381                 if node_name == display_node:
00382                     if ".png" in f:
00383                         self.list_of_pics.append(os.path.join(root, f))
00384                         self.list_of_pics_tests.append(node_name)
00385 
00386         self.list_of_pics.sort()
00387         self.index_picture = 0
00388         self.refresh_pic_()
00389 
00390     def refresh_pic_(self):
00391         """
00392         Refresh the pic being displayed
00393         """
00394         if len(self.list_of_pics) > 0:
00395             self.plot_widget_.label_node.setText(self.list_of_pics_tests[self.index_picture] +
00396                                                  " [" + str(self.index_picture + 1) + "/" +
00397                                                  str(len(self.list_of_pics)) + "]")
00398 
00399             self.resizeable_plot.plot_pixmap = QPixmap(
00400                 self.list_of_pics[self.index_picture])
00401             self.plot_widget_.btn_prev.setEnabled(True)
00402             self.plot_widget_.btn_next.setEnabled(True)
00403         else:
00404             self.plot_widget_.btn_prev.setEnabled(False)
00405             self.plot_widget_.btn_next.setEnabled(False)
00406 
00407         # dummy resize event to get the plot to be drawn
00408         self.resizeable_plot.resizeEvent(
00409             QResizeEvent(self.resizeable_plot.size(), self.resizeable_plot.size()))
00410 
00411     def on_btn_next_clicked_(self):
00412         """
00413         Next pic
00414         """
00415         self.index_picture = min(
00416             self.index_picture + 1, len(self.list_of_pics) - 1)
00417         self.refresh_pic_()
00418 
00419     def on_btn_prev_clicked_(self):
00420         """
00421         Prev pic
00422         """
00423         self.index_picture = max(self.index_picture - 1, 0)
00424         self.refresh_pic_()
00425 
00426     def on_btn_refresh_nodes_clicked_(self):
00427         """
00428         Refresh the list of nodes (check which node is advertising a self_test service)
00429         """
00430         self.nodes = []
00431 
00432         # gets all the list of services and only keep the nodes which have a
00433         # self_test service
00434         self.nodes = [x[0].split('/')[len(x[0].split('/')) - 2]
00435                       for x in rosgraph.masterapi.Master("/").getSystemState()[2] if "self_test" in x[0]]
00436 
00437         # uniquify items in the list
00438         self.nodes = list(set(self.nodes))
00439 
00440         self.nodes.insert(0, "All")
00441 
00442         self._widget.nodes_combo.clear()
00443         for node in self.nodes:
00444             self._widget.nodes_combo.addItem(node)
00445 
00446     def new_node_selected_(self, index=None):
00447         """
00448         Callback for node selection dropdown
00449         """
00450         self.selected_node_ = self.nodes[index]
00451 
00452         if index is not None:
00453             self._widget.btn_test.setEnabled(True)
00454         else:
00455             self._widget.btn_test.setEnabled(False)
00456 
00457     def _unregisterPublisher(self):
00458         if self._publisher is not None:
00459             self._publisher.unregister()
00460             self._publisher = None
00461 
00462     def shutdown_plugin(self):
00463         self._unregisterPublisher()
00464 
00465     def save_settings(self, global_settings, perspective_settings):
00466         pass
00467 
00468     def restore_settings(self, global_settings, perspective_settings):
00469         pass


sr_gui_self_test
Author(s): Ugo Cupcic
autogenerated on Thu Jun 6 2019 21:14:00