drop_test.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2009, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of the Willow Garage nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 
00034 ##\author Kevin Watts
00035 ##\brief Monitors dropped packets for given motors, and prompts user when drop test done
00036 
00037 from __future__ import with_statement
00038 PKG='qualification'
00039 
00040 import roslib; roslib.load_manifest(PKG)
00041 
00042 import wx
00043 from wx import xrc
00044 from wx import html
00045 
00046 
00047 from pr2_self_test_msgs.srv import TestResult, TestResultRequest
00048 
00049 import rospy
00050 
00051 from diagnostic_msgs.msg import DiagnosticArray
00052 
00053 import threading
00054 
00055 from optparse import OptionParser
00056 
00057 import os
00058 import signal
00059 
00060 ##\todo Handle SIGINT so it'll die nicely
00061 
00062 def bool_to_msg(val):
00063     if val:
00064         return 'PASS'
00065     return 'FAIL'
00066 
00067 ##\brief Contains params, data for drops
00068 class Drop:
00069     ##\param name str : Name of drop (ex: 'Side Drop')
00070     ##\param file str : Instructions file of drop
00071     ##\param count int : Number of times to complete drop
00072     def __init__(self, name, file, count):
00073         self.count = count
00074         self.file = file
00075         self.name = name
00076 
00077         self.packets = []
00078         self.halted = []
00079         for i in range(0, self.count):
00080             self.packets.append('N/A')
00081             self.halted.append('N/A')
00082 
00083         self._current = 0
00084     
00085     ##\brief Stores data from each drop
00086     def update(self, packets, halted):
00087         self.packets[self._current] = bool_to_msg(packets)
00088         self.halted[self._current] = bool_to_msg(halted)
00089 
00090         self._current += 1
00091 
00092 ##\brief Displays instructions for user to run test, makes sure part is working
00093 class DropTestFrame(wx.Frame):
00094     def __init__(self, parent, test_name, pre, post, drops):
00095         wx.Frame.__init__(self, parent, wx.ID_ANY, 'Drop Test')
00096         
00097         self._mutex = threading.Lock()
00098 
00099         rospy.init_node('drop_tester')
00100 
00101         self._diag_sub = rospy.Subscriber('/diagnostics', DiagnosticArray, self._diag_cb)
00102         self.result_service = rospy.ServiceProxy('test_result', TestResult)
00103         
00104         self._msgs = []
00105         
00106         self._eth_master_ok = False
00107         self._drop_packets = -1
00108 
00109         xrc_path = os.path.join(roslib.packages.get_pkg_dir('qualification'), 'xrc/gui.xrc')
00110         xrc_res = xrc.XmlResource(xrc_path)
00111         self._panel = xrc_res.LoadPanel(self, 'instructions_panel')
00112         self._sizer = wx.BoxSizer(wx.VERTICAL)
00113         self._sizer.Add(self._panel)
00114         self.Layout()
00115 
00116         self._html_window = xrc.XRCCTRL(self._panel, 'html_window')
00117 
00118         self._cancel_button = xrc.XRCCTRL(self._panel, 'cancel_button')
00119         self._cancel_button.Bind(wx.EVT_BUTTON, self.on_cancel)
00120 
00121         self._continue_button = xrc.XRCCTRL(self._panel, 'continue_button')
00122         self._continue_button.Bind(wx.EVT_BUTTON, self.on_continue)
00123         self._continue_button.SetFocus()
00124         
00125         self.data_sent = False
00126 
00127         self.predrop = pre
00128         self.postdrop = post
00129         self.drops = drops
00130         self.test_name = test_name
00131 
00132         self.complete = 0
00133         self.total = 0
00134         for drop in self.drops:
00135             self.total += drop.count
00136 
00137         self._current_drop = 0
00138         self._current_drop_count = 0
00139         
00140         self._state = -1 # -1: PRE, 0: DROP, 1: POST
00141 
00142         self._canceled = False
00143 
00144         self.start_test()
00145         
00146     ##\brief Proceed to next drop, or pass if done
00147     def on_continue(self, event):
00148         if self._state == -1:
00149             self._state += 1
00150             self.display_drop_structs()
00151         elif self._state == 0:
00152             self.check_drop()
00153         else: # PASS
00154             r = TestResultRequest()
00155             r.result = TestResultRequest.RESULT_PASS
00156             r.plots = []
00157             r.html_result = self._write_result()
00158             r.text_summary = self._write_summary()
00159             self.send_results(r)
00160 
00161     ##\brief Cancel test, report failure
00162     def on_cancel(self, event):
00163         self._canceled = True
00164 
00165         wx.MessageBox('Canceling drop test. Press OK to terminate.', 'Canceling drop test', wx.OK|wx.ICON_ERROR, self)
00166         r = TestResultRequest()
00167         r.result = TestResultRequest.RESULT_FAIL
00168         r.plots = []
00169         r.html_result = self._write_result('User pressed cancel.')
00170         r.text_summary = self._write_summary('Canceled.')
00171         self.send_results(r)
00172  
00173     ##\brief Error, send failure data to manager
00174     def test_error_service_call(self, except_str = ''):
00175         rospy.logerr(except_str)
00176         r = TestResultRequest()
00177         r.html_result = except_str
00178         r.text_summary = 'Caught exception, automated test failure.'
00179         r.plots = []
00180         r.result = TestResultRequest.RESULT_FAIL
00181         self.send_results(r)
00182 
00183     ##\brief Send results to qualification manager
00184     def send_results(self, test_result):
00185         if not self.data_sent:
00186             rospy.wait_for_service('test_result', 10)
00187             self.result_service.call(test_result)
00188             self.data_sent = True
00189         self.Destroy()
00190 
00191     ##\brief Callback for diagnostics msgs
00192     def _diag_cb(self, msg):
00193         with self._mutex:
00194             # Record that we have data
00195             if self._drop_packets == -1:
00196                 self._eth_master_ok = True
00197                 self._drop_packets = 0
00198 
00199             for stat in msg.status:
00200                 self._msgs.append(stat)
00201 
00202         wx.CallAfter(self._check_msgs)
00203 
00204     ##\brief Checks diagnostics for EtherCAT Master, makes sure OK
00205     def _check_msgs(self):
00206         with self._mutex:
00207             dropped_pkts = 0
00208             late_pkts = 0
00209 
00210             for msg in self._msgs:
00211                 if msg.name == 'EtherCAT Master':
00212                     if msg.level != 0:
00213                         self._eth_master_ok = False
00214                     for kv in msg.values:
00215                         if kv.key == 'Dropped Packets':
00216                             if int(kv.value) != 0:
00217                                 dropped_pkts = int(kv.value)
00218                         if kv.key == 'RX Late Packet':
00219                             if int(kv.value) != 0:
00220                                 late_pkts = int(kv.value)
00221 
00222             # Subtract off late packets from drops, #4824
00223             self._drop_packets = dropped_pkts - late_pkts
00224             self._msgs = []
00225 
00226     ##\brief OK if no dropped packets and motors running
00227     def is_ok(self):
00228         return self._eth_master_ok and self._drop_packets == 0
00229 
00230     ##\brief Pass if OK and we've completed all drops w/o canceling
00231     def is_pass(self):
00232         return self.is_ok() and self._current_drop >= len(self.drops) and not self._canceled
00233     
00234     ##\brief Write data from each drop
00235     def _write_drop_data(self):
00236         table = '<table border="1" cellpadding="2" cellspacing="0">\n'
00237         table += '<tr><td><b>Drop</b></td><td><b>Count</b></td><td><b>Dropped Packets</b></td><td><b>Halted</b></td></tr>\n'
00238         for drop in self.drops:
00239             for i in range(0, drop.count):
00240                 table += '<tr><td>%s</td><td>%d</td><td>%s</td><td>%s</td></tr>\n' % (drop.name, i+1, drop.packets[i], drop.halted[i])
00241         table += '</table>\n'
00242         
00243         return table
00244     
00245     ##\brief Write data from entire drop test in HTML
00246     def _write_result(self, msg=''):
00247         data = '<p align=center><b>%s Data</b></p>\n' % (self.test_name)
00248         data += '<p>Result: %s. Completed: %d of %d</p>\n' % (bool_to_msg(self.is_pass()), self.complete, self.total)
00249         if len(msg) > 0:
00250             data += '<p>' + msg + '</p>\n'
00251 
00252         data += self._write_drop_data()
00253 
00254         return data
00255     
00256     ##\brief Write summary of drop test
00257     def _write_summary(self, msg=''):
00258         return '%s, result: %s. Completed %d/%d drops. %s' % (self.test_name, bool_to_msg(self.is_pass()), self.complete, self.total, msg)
00259                   
00260     ##\brief Load pre-drop instructions if any, or proceed
00261     def start_test(self):
00262         if self.predrop is not None:
00263             self._html_window.LoadFile(self.predrop)
00264         else:
00265             self._state += 1 # Start dropping
00266             self.display_drop_structs()
00267                                                           
00268     ##\brief Load post-drop instructions, or pass drop test
00269     def post_drop_check(self):
00270         if self.postdrop is not None:
00271             self._html_window.LoadFile(self.postdrop)
00272         else: # PASS
00273             r = TestResultRequest.RESULT_PASS
00274             r.plots = []
00275             r.html_result = self._write_result()
00276             r.text_summary = self._write_summary()
00277             self.send_results(r)
00278                       
00279     ##\brief Verify that drop was OK, move to next drop
00280     def check_drop(self):
00281         self.complete += 1
00282         self.drops[self._current_drop].update(self._drop_packets == 0, self._eth_master_ok)
00283 
00284         if not self.is_ok():
00285             r = TestResultRequest()
00286             r.result = TestResultRequest.RESULT_FAIL
00287             r.plots = []
00288             r.html_result = self._write_result()
00289             r.text_summary = self._write_summary()
00290 
00291             wx.MessageBox('Drop test failed. Press OK to finish and analyze.', 'Drop test failed', wx.OK|wx.ICON_ERROR, self)
00292             self.send_results(r)
00293         
00294         self._current_drop_count += 1   
00295         if self._current_drop_count >= self.drops[self._current_drop].count:
00296             self._current_drop += 1
00297             self._current_drop_count = 0
00298 
00299         if self._current_drop >= len(self.drops): # PASS
00300             self._state += 1 # Post test
00301             self.post_drop_check()
00302             return
00303 
00304         self.display_drop_structs()
00305     
00306     ##\brief Display instructions for drop in window. 
00307     def display_drop_structs(self):
00308         drop = self.drops[self._current_drop]
00309 
00310         structs = '<html><header>\n'
00311         structs += '<H2 align=center>%s</H2>\n' % self.test_name
00312         structs += '</header><hr size="3">\n<body>\n'
00313         structs += '<H3>%s. Count: %d of %d</H3>\n' % (drop.name, self._current_drop_count + 1, drop.count)
00314         structs += '<H4>Drop number %d of %d</H4>\n' % (self._current_drop + 1, len(self.drops))
00315         structs += '<hr size=2>\n'
00316 
00317         structs += open(drop.file).read()
00318         structs += '</body></html>'
00319                                          
00320         self._html_window.SetPage(structs)
00321 
00322     
00323 ##\brief App for displaying drop test GUI
00324 class DropTestApp(wx.App):
00325     def __init__(self, name, predrop, postdrop, drops):
00326         self.pre = predrop
00327         self.post = postdrop
00328         self.drops = drops
00329         self.name = name
00330         wx.App.__init__(self, clearSigInt = False)
00331 
00332 
00333     def OnInit(self):
00334         self._frame = DropTestFrame(None, self.name, self.pre, self.post, self.drops)
00335         self._frame.SetSize(wx.Size(550, 750))
00336         self._frame.Layout()
00337         self._frame.Show(True)
00338         
00339         return True
00340         
00341 
00342 
00343 if __name__ == '__main__':
00344     parser = OptionParser(usage='./%prog --pre PREDROP --post POSTDROP NAME1,DROP1,# NAME2,DROP2,# ...', prog="drop_test.py")
00345     parser.add_option("--pre", action="store", dest="predrop", metavar="PREDROP",
00346                       help="Predrop instructions file", default=None)
00347     parser.add_option("--post", action="store", dest="postdrop", metavar="POSTDROP",
00348                       help="Postdrop instructions file and checklist", default=None)
00349     parser.add_option("--name", action="store", dest="name", metavar="TESTNAME",
00350                       help="Name of drop test (ex: Gripper Drop Test)", default='Drop Test')
00351 
00352     options, args = parser.parse_args()
00353 
00354     
00355 
00356     drops = []
00357     for arg in args:
00358         if arg.find(',') < 0:
00359             continue
00360 
00361         name, file, count = arg.split(',')
00362         count = int(count)
00363         drops.append(Drop(name, file, count))
00364 
00365     if len(drops) == 0:
00366         parser.error('No drop instructions given. Unable to drop component')
00367         
00368     signal.signal(signal.SIGINT, signal.SIG_DFL)
00369 
00370     app = DropTestApp(options.name, options.predrop, options.postdrop, drops)
00371     try:
00372         app.MainLoop()
00373         rospy.spin()
00374     except KeyboardInterrupt:
00375         raise
00376     except:
00377         import traceback
00378         traceback.print_exc()
00379         r = TestResultRequest()
00380         r.text_summary = 'Caught exception'
00381         r.html_result = '<p>Caught exception, automated test failure. %s</p>\n' % traceback.format_exc()
00382         r.plots = []
00383         r.result = TestResultRequest.RESULT_FAIL
00384         rospy.wait_for_service('test_result', 10)
00385         self.result_service.call(test_result)
00386         self.data_sent = True
00387                                                               


qualification
Author(s): Kevin Watts (watts@willowgarage.com), Josh Faust (jfaust@willowgarage.com)
autogenerated on Sat Dec 28 2013 17:57:34