self_test.py
Go to the documentation of this file.
00001 # Copyright (c) 2011, Dirk Thomas, TU Darmstadt
00002 # All rights reserved.
00003 #
00004 # Redistribution and use in source and binary forms, with or without
00005 # modification, are permitted provided that the following conditions
00006 # are met:
00007 #
00008 #   * Redistributions of source code must retain the above copyright
00009 #     notice, this list of conditions and the following disclaimer.
00010 #   * Redistributions in binary form must reproduce the above
00011 #     copyright notice, this list of conditions and the following
00012 #     disclaimer in the documentation and/or other materials provided
00013 #     with the distribution.
00014 #   * Neither the name of the TU Darmstadt nor the names of its
00015 #     contributors may be used to endorse or promote products derived
00016 #     from this software without specific prior written permission.
00017 #
00018 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00019 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00020 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00021 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00022 # COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00023 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00024 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00025 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00026 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00027 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00028 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00029 # POSSIBILITY OF SUCH DAMAGE.
00030 
00031 
00032 import os, tarfile, shutil, rospy, rosgraph, rospkg, time
00033 
00034 from qt_gui.plugin import Plugin
00035 from python_qt_binding import loadUi
00036 
00037 from sr_robot_msgs.srv import ManualSelfTest, ManualSelfTestResponse
00038 from diagnostic_msgs.srv import SelfTest
00039 
00040 from QtGui import QWidget, QTreeWidgetItem, QColor, QPixmap, QMessageBox, QInputDialog, QDialog, QSplitter, QLabel, QSizePolicy, QResizeEvent
00041 from QtCore import QThread, SIGNAL, QPoint
00042 from QtCore import Qt
00043 
00044 from time import sleep
00045 
# Background colours used to flag results in the test tree:
# green for OK / passed, orange for WARN, red for ERROR / FAILED.
green = QColor(153, 231, 96)
orange = QColor(247, 206, 134)
red = QColor(236, 178, 178)
00049 
class AsyncService(QThread):
    """
    Thread calling the <node_name>/self_test service of one node.

    It also advertises a <node_name>/manual_self_tests service; requests
    received on it are forwarded to the GUI through the "manual_test(QPoint)"
    signal and the service blocks until the GUI stored an answer in
    manual_test_res_.
    """
    def __init__(self, widget, node_name, index):
        """
        Calling the self test services asynchronously
        so that it doesn't "kill" the GUI while they run.
        (also runs all test services in parallel -> faster).

        @widget: parent widget
        @node_name: name of the node for which we're running the self_test
        @index: index of this thread in the list of threads (to find out which thread finished in callback)
        """
        QThread.__init__(self, widget)
        self._widget = widget
        self.node_name = node_name
        self.service_name = node_name+"/self_test"
        self.index = index

        # last manual test request received / answer provided by the GUI
        self.manual_test_req_ = None
        self.manual_test_res_ = None
        # response of the self_test service (None until the call succeeded)
        self.resp = None

        # True while a manual test request is waiting for its answer
        self.waiting_for_manual_test_ = False

        self.srv_manual_test_ = rospy.Service(node_name+"/manual_self_tests", ManualSelfTest, self.manual_test_srv_cb_)

    def manual_test_srv_cb_(self, req):
        """
        Callback of the manual_self_tests service: asks the GUI (through the
        manual_test signal) to handle the request and blocks until the GUI
        wrote the answer in manual_test_res_.

        @req: the ManualSelfTest request
        @return the response stored by the GUI in manual_test_res_
        """
        #only one manual test at a time: wait for the previous one to be answered
        while self.waiting_for_manual_test_:
            time.sleep(0.1)
        self.waiting_for_manual_test_ = True
        self.manual_test_res_ = None

        self.manual_test_req_ = req

        #the x coordinate of the point carries the index of this thread
        self.emit(SIGNAL("manual_test(QPoint)"), QPoint( self.index, 0))

        #poll until the GUI provided the answer
        while self.manual_test_res_ is None:
            time.sleep(0.01)

        self.waiting_for_manual_test_ = False
        return self.manual_test_res_

    def run(self):
        """
        Calls the node/self_test service and emits a signal once it's finished running.

        If the call fails, the error is logged and no signal is emitted
        (resp stays None).
        """
        self_test_srv = rospy.ServiceProxy(self.service_name, SelfTest)
        try:
            self.resp = self_test_srv()
        except rospy.ServiceException as e:
            rospy.logerr("Failed to called " + self.service_name+" %s"%str(e))
            return

        if self.resp is None:
            #no exception here, so there's no error to report alongside
            rospy.logerr("Failed to called " + self.service_name+" (no response received)")
            return

        #the x coordinate of the point carries the index of this thread
        self.emit(SIGNAL("test_finished(QPoint)"), QPoint( self.index, 0))

    def save(self):
        """
        Save the test results in a file at /tmp/self_tests/node/results.txt
        """
        if self.resp is None:
            rospy.logerr("Test for "+self.node_name+" can't be saved: no results found.")
            return

        path = "/tmp/self_tests/"+self.node_name
        if not os.path.exists(path):
            os.makedirs(path)
        #the with statement closes the file even if the write fails
        with open(path+"/results.txt", "w") as f:
            f.write( str(self.resp) )

    def shutdown(self):
        """
        Forget the pending manual test and stop advertising the manual test
        service. Safe to call more than once.
        """
        self.manual_test_res_ = None
        self.manual_test_req_ = None
        #the GUI calls shutdown on every finished thread each time a test
        #finishes: the service may already have been released.
        if self.srv_manual_test_ is not None:
            self.srv_manual_test_.shutdown()
            self.srv_manual_test_ = None
00125 
class ResizeableQPlot(QLabel):
    """
    A label displaying plot_pixmap, rescaled (keeping the aspect ratio)
    to the new size each time a resize event is received.
    """
    def __init__(self, parent):
        """
        @parent: parent widget
        """
        QLabel.__init__(self, parent)
        #source pixmap to display, None while no plot has been set
        self.plot_pixmap = None

    def resizeEvent(self, event):
        """
        Scales the plot to the size given by the event.
        Does nothing while no plot has been set.

        @event: the resize event (only its size() is used)
        """
        if self.plot_pixmap is None:
            return

        scaled_plot = self.plot_pixmap.scaled(event.size(), Qt.KeepAspectRatio, Qt.SmoothTransformation)
        self.setPixmap(scaled_plot)
00136 
00137 
class SrGuiSelfTest(Plugin):
    """
    A rosgui plugin for running self diagnostics of a Shadow robot hand
    """
    def __init__(self, context):
        """
        Detects which nodes are advertising a self_test service, makes it possible to run them,
        and display the results.

        @context: the plugin context, the main widget is added to it
        """
        super(SrGuiSelfTest, self).__init__(context)
        self.setObjectName('SrGuiSelfTest')

        self._publisher = None
        self._widget = QWidget()

        #the UI is split into 3 files
        ui_path = os.path.join(rospkg.RosPack().get_path('sr_gui_self_test'), 'uis')
        ui_file = os.path.join(ui_path, 'SrSelfTest.ui')
        loadUi(ui_file, self._widget)
        self._widget.setObjectName('SrSelfTestUi')
        context.add_widget(self._widget)

        self.test_widget_ = QWidget()
        ui_file = os.path.join(ui_path, 'self_test.ui')
        loadUi(ui_file, self.test_widget_)
        self.plot_widget_ = QWidget()
        ui_file = os.path.join(ui_path, 'test_plot.ui')
        loadUi(ui_file, self.plot_widget_)

        #the test and plot widgets are stacked vertically in a splitter
        self.splitter_ = QSplitter(Qt.Vertical, self._widget)
        self._widget.test_layout.addWidget(self.splitter_)
        self.splitter_.addWidget( self.test_widget_ )
        self.splitter_.addWidget( self.plot_widget_ )
        self.test_widget_.show()
        self.plot_widget_.show()

        #list of node names shown in the dropdown ("All" is always first)
        self.nodes = None
        self.selected_node_ = None
        #one AsyncService per node being tested
        self.test_threads = []

        #plots currently available and index of the one being displayed
        self.index_picture = 0
        self.list_of_pics = []
        self.list_of_pics_tests = []

        #add plot widget
        self.resizeable_plot = ResizeableQPlot(self.plot_widget_)
        self.resizeable_plot.setSizePolicy(QSizePolicy.Ignored, QSizePolicy.Ignored)
        self.resizeable_plot.setAlignment(Qt.AlignCenter)
        self.plot_widget_.plot_layout.addWidget(self.resizeable_plot)

        self._widget.btn_test.setEnabled(False)
        self._widget.btn_save.setEnabled(False)
        self.plot_widget_.btn_prev.setEnabled(False)
        self.plot_widget_.btn_next.setEnabled(False)

        self._widget.btn_refresh_nodes.pressed.connect(self.on_btn_refresh_nodes_clicked_)
        self._widget.btn_test.pressed.connect(self.on_btn_test_clicked_)
        self._widget.btn_save.pressed.connect(self.on_btn_save_clicked_)

        self.plot_widget_.btn_next.pressed.connect(self.on_btn_next_clicked_)
        self.plot_widget_.btn_prev.pressed.connect(self.on_btn_prev_clicked_)

        self._widget.nodes_combo.currentIndexChanged.connect(self.new_node_selected_)

        self.on_btn_refresh_nodes_clicked_()

    def release_test_services_(self):
        """
        Shuts down the manual test service of each test thread still holding one.
        """
        for test in self.test_threads:
            #a thread which was already shut down has no service left
            if test.srv_manual_test_ is not None:
                test.shutdown()

    def on_btn_save_clicked_(self):
        """
        Save the tests in a tarball.
        """
        for test in self.test_threads:
            test.save()

        results_dir = "/tmp/self_tests"
        if not os.path.isdir(results_dir):
            #nothing was written (no result available): adding the folder to the tarball would fail
            QMessageBox.warning(self._widget, "Warning", "No test results found in "+results_dir+", nothing was saved.")
            return

        #backup previous test results if they exist
        path = "/tmp/self_tests.tar.gz"
        if os.path.isfile(path):
            shutil.copy(path, path+".bk")
            os.remove(path)

        #create the tarball and save everything in it.
        tarball = tarfile.open(path, "w:gz")
        try:
            tarball.add(results_dir)
        finally:
            #always close the archive, even if adding the files failed
            tarball.close()

        QMessageBox.warning(self._widget, "Information", "A tarball was saved in "+path+", please email it to hand@shadowrobot.com.")

    def on_btn_test_clicked_(self):
        """
        Run the tests in separate threads (in parallel)
        """
        if self.selected_node_ == "All":
            nodes_to_test = self.nodes[1:]
        elif self.selected_node_ is not None:
            nodes_to_test = [self.selected_node_]
        else:
            nodes_to_test = []

        if not nodes_to_test:
            #no test would ever finish: leave the GUI untouched instead of
            #blocking it with a disabled button and a wait cursor.
            return

        #delete previous results; the services of the previous run are
        #released first so that their names can be advertised again.
        self.release_test_services_()
        self.test_threads = []

        #disable btn, fold previous tests and reset progress bar
        self._widget.btn_test.setEnabled(False)
        self._widget.btn_save.setEnabled(False)
        root_item = self.test_widget_.test_tree.invisibleRootItem()
        for i in range( root_item.childCount() ):
            root_item.child(i).setExpanded(False)
        #also change cursor to "wait"
        self._widget.setCursor(Qt.WaitCursor)

        self._widget.progress_bar.reset()

        for index, node in enumerate(nodes_to_test):
            test = AsyncService(self._widget, node, index)
            self._widget.connect(test, SIGNAL("test_finished(QPoint)"), self.on_test_finished_)
            self._widget.connect(test, SIGNAL("manual_test(QPoint)"), self.on_manual_test_)
            self.test_threads.append(test)

        #start the threads only once they're all created and connected
        for test in self.test_threads:
            test.start()

    def on_test_finished_(self, point):
        """
        Callback from test_finished signal. Display the results and update the progress

        @point: point.x() is the index of the finished thread in test_threads
        """
        finished = self.test_threads[point.x()]
        resp = finished.resp
        if resp.passed:
            node_item = QTreeWidgetItem(["OK", finished.node_name + " ["+str(resp.id)+"]"])
            node_item.setBackgroundColor(0, QColor(green))
        else:
            node_item = QTreeWidgetItem(["FAILED", finished.node_name + " ["+str(resp.id)+"]"])
            node_item.setBackgroundColor(0, QColor(red))
        self.test_widget_.test_tree.addTopLevelItem(node_item)

        #also display statuses
        for status in resp.status:
            if status.level == status.OK:
                level, color = "OK", QColor(green)
            elif status.level == status.WARN:
                level, color = "WARN", QColor(orange)
            else:
                level, color = "ERROR", QColor(red)
            #the item is created as a child of node_item: no need to add it to the tree again
            st_item = QTreeWidgetItem(node_item, ["", "", level, status.name, status.message])
            st_item.setBackgroundColor(2, color)
            st_item.setExpanded(True)
        node_item.setExpanded(True)

        #display the plots if available
        self.display_plots_(finished.node_name)

        for col in range(0, self.test_widget_.test_tree.columnCount()):
            self.test_widget_.test_tree.resizeColumnToContents(col)

        #display progress advancement
        nb_threads_finished = 0
        for test in self.test_threads:
            if test.resp is not None:
                nb_threads_finished += 1
        #finished threads don't need their manual test service anymore
        #(only those still holding one are shut down)
        for test in self.test_threads:
            if test.resp is not None and test.srv_manual_test_ is not None:
                test.shutdown()

        percentage = 100.0 * nb_threads_finished / len(self.test_threads)
        #the progress bar expects an integer value
        self._widget.progress_bar.setValue( int(percentage) )

        if nb_threads_finished == len(self.test_threads):
            QMessageBox.information(self._widget, "Information", "If the tests passed, the communications\n with the hand are fine.")

            #all tests were run, reenable button
            self._widget.btn_test.setEnabled(True)
            #also change cursor to standard arrow
            self._widget.setCursor(Qt.ArrowCursor)
            self._widget.btn_save.setEnabled(True)

    def on_manual_test_(self, point):
        """
        Callback from manual_test signal. Asks the user whether the manual test
        was successful and stores the answer (with the comment entered) in the
        thread which received the request.

        @point: point.x() is the index of the requesting thread in test_threads
        """
        thread = self.test_threads[point.x()]

        input_dialog = QInputDialog(self._widget)
        input_dialog.setOkButtonText("OK - Test successful." )
        input_dialog.setCancelButtonText("NO - Test failed (please enter a comment to say why the test fail above).")
        input_dialog.setLabelText(thread.manual_test_req_.message)
        input_dialog.setWindowTitle( thread.node_name + " - Manual Test")

        ok = (QDialog.Accepted == input_dialog.exec_())

        #setting the response unblocks the service callback waiting in the thread
        thread.manual_test_res_ = ManualSelfTestResponse(ok, input_dialog.textValue())

    def display_plots_(self, display_node):
        """
        Loads the plots available in /tmp/self_tests/node (place where the sr_self_test saves
        the plots for the fingers movements)

        @display_node: name of the node for which the plots are displayed
        """
        self.list_of_pics = []
        self.list_of_pics_tests = []
        for root, dirs, files in os.walk("/tmp/self_tests/"):
            #the name of the folder is the name of the node
            node_name = root.split("/")[-1]
            if node_name != display_node:
                continue
            for f in files:
                if ".png" in f:
                    self.list_of_pics.append(os.path.join(root,f))
                    self.list_of_pics_tests.append(node_name)

        self.list_of_pics.sort()
        self.index_picture = 0
        self.refresh_pic_()

    def refresh_pic_(self):
        """
        Refresh the pic being displayed
        """
        has_pics = len(self.list_of_pics) > 0
        if has_pics:
            self.plot_widget_.label_node.setText(self.list_of_pics_tests[self.index_picture] + " ["+str(self.index_picture+1)+"/"+str(len(self.list_of_pics))+"]")

            self.resizeable_plot.plot_pixmap = QPixmap(self.list_of_pics[self.index_picture])

        #browsing is only possible when there are plots
        self.plot_widget_.btn_prev.setEnabled(has_pics)
        self.plot_widget_.btn_next.setEnabled(has_pics)

        #dummy resize event to get the plot to be drawn
        self.resizeable_plot.resizeEvent(QResizeEvent(self.resizeable_plot.size(), self.resizeable_plot.size()))

    def on_btn_next_clicked_(self):
        """
        Next pic
        """
        self.index_picture = min(self.index_picture + 1, len(self.list_of_pics) - 1)
        self.refresh_pic_()

    def on_btn_prev_clicked_(self):
        """
        Prev pic
        """
        self.index_picture = max(self.index_picture - 1, 0)
        self.refresh_pic_()

    def on_btn_refresh_nodes_clicked_(self):
        """
        Refresh the list of nodes (check which node is advertising a self_test service)
        """
        #gets all the list of services and only keep the nodes which have a self_test service
        services = rosgraph.masterapi.Master("/").getSystemState()[2]
        node_names = set()
        for service in services:
            service_name = service[0]
            if "self_test" not in service_name:
                continue
            #the node name is the element just before the service name
            parts = service_name.split('/')
            node_names.add(parts[-2] if len(parts) > 1 else parts[-1])

        #the set uniquifies the names, sorting keeps the dropdown order stable
        self.nodes = sorted(node_names)

        self.nodes.insert(0, "All")

        self._widget.nodes_combo.clear()
        for node in self.nodes:
            self._widget.nodes_combo.addItem(node)

    def new_node_selected_(self, index=None):
        """
        Callback for node selection dropdown

        @index: index of the selected node in the dropdown (None or -1 when
                nothing is selected, e.g. while the dropdown is cleared)
        """
        if index is None or not self.nodes or not (0 <= index < len(self.nodes)):
            #nothing valid selected: nothing to test
            self.selected_node_ = None
            self._widget.btn_test.setEnabled(False)
            return

        self.selected_node_ = self.nodes[index]
        self._widget.btn_test.setEnabled(True)

    def _unregisterPublisher(self):
        """
        Unregisters the publisher if there is one.
        """
        if self._publisher is not None:
            self._publisher.unregister()
            self._publisher = None

    def shutdown_plugin(self):
        """
        Releases the publisher and the manual test services still advertised.
        """
        self._unregisterPublisher()
        self.release_test_services_()

    def save_settings(self, global_settings, perspective_settings):
        """
        No settings to save.
        """
        pass

    def restore_settings(self, global_settings, perspective_settings):
        """
        No settings to restore.
        """
        pass


sr_gui_self_test
Author(s): Ugo Cupcic
autogenerated on Fri Aug 28 2015 13:16:52