00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 from __future__ import with_statement
00038 PKG='qualification'
00039
00040 import roslib; roslib.load_manifest(PKG)
00041
00042 import wx
00043 from wx import xrc
00044 from wx import html
00045
00046
00047 from pr2_self_test_msgs.srv import TestResult, TestResultRequest
00048
00049 import rospy
00050
00051 from diagnostic_msgs.msg import DiagnosticArray
00052
00053 import threading
00054
00055 from optparse import OptionParser
00056
00057 import os
00058 import signal
00059
00060
00061
00062 def bool_to_msg(val):
00063 if val:
00064 return 'PASS'
00065 return 'FAIL'
00066
00067
00068 class Drop:
00069
00070
00071
00072 def __init__(self, name, file, count):
00073 self.count = count
00074 self.file = file
00075 self.name = name
00076
00077 self.packets = []
00078 self.halted = []
00079 for i in range(0, self.count):
00080 self.packets.append('N/A')
00081 self.halted.append('N/A')
00082
00083 self._current = 0
00084
00085
00086 def update(self, packets, halted):
00087 self.packets[self._current] = bool_to_msg(packets)
00088 self.halted[self._current] = bool_to_msg(halted)
00089
00090 self._current += 1
00091
00092
00093 class DropTestFrame(wx.Frame):
00094 def __init__(self, parent, test_name, pre, post, drops):
00095 wx.Frame.__init__(self, parent, wx.ID_ANY, 'Drop Test')
00096
00097 self._mutex = threading.Lock()
00098
00099 rospy.init_node('drop_tester')
00100
00101 self._diag_sub = rospy.Subscriber('/diagnostics', DiagnosticArray, self._diag_cb)
00102 self.result_service = rospy.ServiceProxy('test_result', TestResult)
00103
00104 self._msgs = []
00105
00106 self._eth_master_ok = False
00107 self._drop_packets = -1
00108
00109 xrc_path = os.path.join(roslib.packages.get_pkg_dir('qualification'), 'xrc/gui.xrc')
00110 xrc_res = xrc.XmlResource(xrc_path)
00111 self._panel = xrc_res.LoadPanel(self, 'instructions_panel')
00112 self._sizer = wx.BoxSizer(wx.VERTICAL)
00113 self._sizer.Add(self._panel)
00114 self.Layout()
00115
00116 self._html_window = xrc.XRCCTRL(self._panel, 'html_window')
00117
00118 self._cancel_button = xrc.XRCCTRL(self._panel, 'cancel_button')
00119 self._cancel_button.Bind(wx.EVT_BUTTON, self.on_cancel)
00120
00121 self._continue_button = xrc.XRCCTRL(self._panel, 'continue_button')
00122 self._continue_button.Bind(wx.EVT_BUTTON, self.on_continue)
00123 self._continue_button.SetFocus()
00124
00125 self.data_sent = False
00126
00127 self.predrop = pre
00128 self.postdrop = post
00129 self.drops = drops
00130 self.test_name = test_name
00131
00132 self.complete = 0
00133 self.total = 0
00134 for drop in self.drops:
00135 self.total += drop.count
00136
00137 self._current_drop = 0
00138 self._current_drop_count = 0
00139
00140 self._state = -1
00141
00142 self._canceled = False
00143
00144 self.start_test()
00145
00146
00147 def on_continue(self, event):
00148 if self._state == -1:
00149 self._state += 1
00150 self.display_drop_structs()
00151 elif self._state == 0:
00152 self.check_drop()
00153 else:
00154 r = TestResultRequest()
00155 r.result = TestResultRequest.RESULT_PASS
00156 r.plots = []
00157 r.html_result = self._write_result()
00158 r.text_summary = self._write_summary()
00159 self.send_results(r)
00160
00161
00162 def on_cancel(self, event):
00163 self._canceled = True
00164
00165 wx.MessageBox('Canceling drop test. Press OK to terminate.', 'Canceling drop test', wx.OK|wx.ICON_ERROR, self)
00166 r = TestResultRequest()
00167 r.result = TestResultRequest.RESULT_FAIL
00168 r.plots = []
00169 r.html_result = self._write_result('User pressed cancel.')
00170 r.text_summary = self._write_summary('Canceled.')
00171 self.send_results(r)
00172
00173
00174 def test_error_service_call(self, except_str = ''):
00175 rospy.logerr(except_str)
00176 r = TestResultRequest()
00177 r.html_result = except_str
00178 r.text_summary = 'Caught exception, automated test failure.'
00179 r.plots = []
00180 r.result = TestResultRequest.RESULT_FAIL
00181 self.send_results(r)
00182
00183
00184 def send_results(self, test_result):
00185 if not self.data_sent:
00186 rospy.wait_for_service('test_result', 10)
00187 self.result_service.call(test_result)
00188 self.data_sent = True
00189 self.Destroy()
00190
00191
00192 def _diag_cb(self, msg):
00193 with self._mutex:
00194
00195 if self._drop_packets == -1:
00196 self._eth_master_ok = True
00197 self._drop_packets = 0
00198
00199 for stat in msg.status:
00200 self._msgs.append(stat)
00201
00202 wx.CallAfter(self._check_msgs)
00203
00204
00205 def _check_msgs(self):
00206 with self._mutex:
00207 dropped_pkts = 0
00208 late_pkts = 0
00209
00210 for msg in self._msgs:
00211 if msg.name == 'EtherCAT Master':
00212 if msg.level != 0:
00213 self._eth_master_ok = False
00214 for kv in msg.values:
00215 if kv.key == 'Dropped Packets':
00216 if int(kv.value) != 0:
00217 dropped_pkts = int(kv.value)
00218 if kv.key == 'RX Late Packet':
00219 if int(kv.value) != 0:
00220 late_pkts = int(kv.value)
00221
00222
00223 self._drop_packets = dropped_pkts - late_pkts
00224 self._msgs = []
00225
00226
00227 def is_ok(self):
00228 return self._eth_master_ok and self._drop_packets == 0
00229
00230
00231 def is_pass(self):
00232 return self.is_ok() and self._current_drop >= len(self.drops) and not self._canceled
00233
00234
00235 def _write_drop_data(self):
00236 table = '<table border="1" cellpadding="2" cellspacing="0">\n'
00237 table += '<tr><td><b>Drop</b></td><td><b>Count</b></td><td><b>Dropped Packets</b></td><td><b>Halted</b></td></tr>\n'
00238 for drop in self.drops:
00239 for i in range(0, drop.count):
00240 table += '<tr><td>%s</td><td>%d</td><td>%s</td><td>%s</td></tr>\n' % (drop.name, i+1, drop.packets[i], drop.halted[i])
00241 table += '</table>\n'
00242
00243 return table
00244
00245
00246 def _write_result(self, msg=''):
00247 data = '<p align=center><b>%s Data</b></p>\n' % (self.test_name)
00248 data += '<p>Result: %s. Completed: %d of %d</p>\n' % (bool_to_msg(self.is_pass()), self.complete, self.total)
00249 if len(msg) > 0:
00250 data += '<p>' + msg + '</p>\n'
00251
00252 data += self._write_drop_data()
00253
00254 return data
00255
00256
00257 def _write_summary(self, msg=''):
00258 return '%s, result: %s. Completed %d/%d drops. %s' % (self.test_name, bool_to_msg(self.is_pass()), self.complete, self.total, msg)
00259
00260
00261 def start_test(self):
00262 if self.predrop is not None:
00263 self._html_window.LoadFile(self.predrop)
00264 else:
00265 self._state += 1
00266 self.display_drop_structs()
00267
00268
00269 def post_drop_check(self):
00270 if self.postdrop is not None:
00271 self._html_window.LoadFile(self.postdrop)
00272 else:
00273 r = TestResultRequest.RESULT_PASS
00274 r.plots = []
00275 r.html_result = self._write_result()
00276 r.text_summary = self._write_summary()
00277 self.send_results(r)
00278
00279
00280 def check_drop(self):
00281 self.complete += 1
00282 self.drops[self._current_drop].update(self._drop_packets == 0, self._eth_master_ok)
00283
00284 if not self.is_ok():
00285 r = TestResultRequest()
00286 r.result = TestResultRequest.RESULT_FAIL
00287 r.plots = []
00288 r.html_result = self._write_result()
00289 r.text_summary = self._write_summary()
00290
00291 wx.MessageBox('Drop test failed. Press OK to finish and analyze.', 'Drop test failed', wx.OK|wx.ICON_ERROR, self)
00292 self.send_results(r)
00293
00294 self._current_drop_count += 1
00295 if self._current_drop_count >= self.drops[self._current_drop].count:
00296 self._current_drop += 1
00297 self._current_drop_count = 0
00298
00299 if self._current_drop >= len(self.drops):
00300 self._state += 1
00301 self.post_drop_check()
00302 return
00303
00304 self.display_drop_structs()
00305
00306
00307 def display_drop_structs(self):
00308 drop = self.drops[self._current_drop]
00309
00310 structs = '<html><header>\n'
00311 structs += '<H2 align=center>%s</H2>\n' % self.test_name
00312 structs += '</header><hr size="3">\n<body>\n'
00313 structs += '<H3>%s. Count: %d of %d</H3>\n' % (drop.name, self._current_drop_count + 1, drop.count)
00314 structs += '<H4>Drop number %d of %d</H4>\n' % (self._current_drop + 1, len(self.drops))
00315 structs += '<hr size=2>\n'
00316
00317 structs += open(drop.file).read()
00318 structs += '</body></html>'
00319
00320 self._html_window.SetPage(structs)
00321
00322
00323
00324 class DropTestApp(wx.App):
00325 def __init__(self, name, predrop, postdrop, drops):
00326 self.pre = predrop
00327 self.post = postdrop
00328 self.drops = drops
00329 self.name = name
00330 wx.App.__init__(self, clearSigInt = False)
00331
00332
00333 def OnInit(self):
00334 self._frame = DropTestFrame(None, self.name, self.pre, self.post, self.drops)
00335 self._frame.SetSize(wx.Size(550, 750))
00336 self._frame.Layout()
00337 self._frame.Show(True)
00338
00339 return True
00340
00341
00342
00343 if __name__ == '__main__':
00344 parser = OptionParser(usage='./%prog --pre PREDROP --post POSTDROP NAME1,DROP1,# NAME2,DROP2,# ...', prog="drop_test.py")
00345 parser.add_option("--pre", action="store", dest="predrop", metavar="PREDROP",
00346 help="Predrop instructions file", default=None)
00347 parser.add_option("--post", action="store", dest="postdrop", metavar="POSTDROP",
00348 help="Postdrop instructions file and checklist", default=None)
00349 parser.add_option("--name", action="store", dest="name", metavar="TESTNAME",
00350 help="Name of drop test (ex: Gripper Drop Test)", default='Drop Test')
00351
00352 options, args = parser.parse_args()
00353
00354
00355
00356 drops = []
00357 for arg in args:
00358 if arg.find(',') < 0:
00359 continue
00360
00361 name, file, count = arg.split(',')
00362 count = int(count)
00363 drops.append(Drop(name, file, count))
00364
00365 if len(drops) == 0:
00366 parser.error('No drop instructions given. Unable to drop component')
00367
00368 signal.signal(signal.SIGINT, signal.SIG_DFL)
00369
00370 app = DropTestApp(options.name, options.predrop, options.postdrop, drops)
00371 try:
00372 app.MainLoop()
00373 rospy.spin()
00374 except KeyboardInterrupt:
00375 raise
00376 except:
00377 import traceback
00378 traceback.print_exc()
00379 r = TestResultRequest()
00380 r.text_summary = 'Caught exception'
00381 r.html_result = '<p>Caught exception, automated test failure. %s</p>\n' % traceback.format_exc()
00382 r.plots = []
00383 r.result = TestResultRequest.RESULT_FAIL
00384 rospy.wait_for_service('test_result', 10)
00385 self.result_service.call(test_result)
00386 self.data_sent = True
00387