00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 PKG = 'qualification'
00036 import roslib
00037 roslib.load_manifest(PKG)
00038
00039 import rospy
00040 from pr2_self_test_msgs.srv import TestResult, TestResultRequest
00041 from diagnostic_msgs.srv import SelfTest, SelfTestRequest
00042
00043 import sys
00044 from time import sleep
00045
00046 import traceback
00047
00048 from optparse import OptionParser
00049
00050 node_name = 'node'
00051 node_id = 'NONE'
00052 extramsg = ""
00053
00054 class EmptyReferenceID(Exception): pass
00055 class SelfTestFailed(Exception): pass
00056
00057 def add_reference(reference):
00058 from wg_invent_client import Invent
00059 username = rospy.get_param('/invent/username', '')
00060 password = rospy.get_param('/invent/password', '')
00061 serial = rospy.get_param('/qual_item/serial', None)
00062
00063 iv = Invent(username, password)
00064 if not iv.login() or serial == None:
00065 extramsg = '<p>Unable to login to invent to store item reference. Serial: %s. Reference: %s.</p>\n' % (serial, node_id)
00066 return False
00067
00068 return iv.addItemReference(serial, '', reference)
00069
00070
00071 def get_error_result(msg):
00072 rospy.logerr(msg)
00073
00074 r = TestResultRequest()
00075 r.html_result = '<p>%s</p>' % (msg)
00076 r.text_summary = msg
00077
00078 r.result = r.RESULT_FAIL
00079
00080 return r
00081
00082
00083 def write_selftest(node_name, add_ref, srv):
00084 pf_dict = { True: 'PASS', False: 'FAIL' }
00085
00086 r = TestResultRequest()
00087 if (result.passed):
00088 r.result = r.RESULT_PASS
00089 else:
00090 r.result = r.RESULT_FAIL
00091
00092 html = ["<p><b>Item ID: %s, using node name %s.</b></p>" % (srv.id, node_name)]
00093
00094 if add_ref:
00095 html.append('<p>Added item reference %s to inventory system under %s.</p>' % (srv.id, rospy.get_param('/qual_item/serial', '')))
00096
00097 statdict = {0: 'OK', 1: 'WARN', 2: 'ERROR'}
00098
00099 for i, stat in enumerate(srv.status):
00100 html.append("<p><b>Test %2d) %s</b>" % (i + 1, stat.name))
00101 html.append( '<br>Result %s: %s</p>' % (statdict[stat.level], stat.message))
00102
00103 if len(stat.values) > 0:
00104 html.append("<table border=\"1\" cellpadding=\"2\" cellspacing=\"0\">")
00105 html.append("<tr><td><b>Label</b></td><td><b>Value</b></td></tr>")
00106 for val in stat.values:
00107 html.append("<tr><td>%s</td><td>%s</td></tr>" % (val.key, val.value))
00108 html.append("</table>")
00109
00110 html.append("<hr size=\"2\">")
00111
00112 r.html_result = '\n'.join(html)
00113 r.text_summary = 'Node ID: %s, node name %s. Self test result: %s' % (srv.id, node_name, pf_dict[result.passed])
00114
00115 return r
00116
00117
00118
00119 def call_selftest(node_name):
00120 selftestname = node_name + '/self_test'
00121 rospy.logdebug('Testing %s' % selftestname)
00122
00123 test_service = rospy.ServiceProxy(selftestname, SelfTest)
00124
00125 try:
00126 rospy.wait_for_service(selftestname, 30)
00127 except:
00128 rospy.logerr('Wait for service %s timed out. Unable to process self test data.' % selftestname)
00129 return False, None
00130
00131 try:
00132 st_result = test_service.call(SelfTestRequest())
00133 except:
00134 rospy.logerr("Self test exited with an exception. It probably failed to run.")
00135 return False, None
00136
00137 rospy.logdebug('Received self test service.')
00138 return True, st_result
00139
00140
00141 if __name__ == '__main__':
00142 parser = OptionParser(usage="./%prog [--add_ref]", prog="run_selftest.py")
00143 parser.add_option("-a", "--add_ref", default=False, dest="add_ref", action="store_true",
00144 help="Add a reference in invent for the item's hardware ID")
00145 options, args = parser.parse_args(rospy.myargv())
00146
00147 rospy.init_node('selftest_caller')
00148
00149 result_service = rospy.ServiceProxy('test_result', TestResult)
00150
00151 node_name = rospy.resolve_name('node_name')
00152 if node_name == 'node_name':
00153 rospy.logwarn('\'node_name\' did not remap. Attempting to call service \'node_name/self_test\'. This is probably an error')
00154
00155
00156 ok, result = call_selftest(node_name)
00157
00158 try:
00159 rospy.wait_for_service('test_result', 5)
00160 except:
00161 rospy.logfatal('Unable to contact result service. Exiting')
00162 sys.exit(-1)
00163
00164 sent_results = False
00165
00166 if not ok:
00167 r = get_error_result('Selftest of %s didn\'t get a result. Unable to record results' % node_name)
00168 result_service.call(r)
00169 sent_results = True
00170 rospy.spin()
00171 sys.exit()
00172
00173 if not result.id or result.id == '':
00174 r = get_error_result('Selftest of %s returned with an empty reference ID.' % node_name)
00175 result_service.call(r)
00176 sent_results = True
00177 rospy.spin()
00178 sys.exit()
00179
00180
00181 if options.add_ref:
00182 if not add_reference(result.id):
00183 r = get_error_result('Failed to add reference id to inventory. Node: %s, ID: %s' % (node_name, result.id))
00184 result_service.call(r)
00185 sent_results = True
00186 rospy.spin()
00187 sys.exit()
00188
00189 if not sent_results:
00190 r = write_selftest(node_name, options.add_ref, result)
00191 result_service.call(r)
00192 rospy.spin()
00193
00194
00195 sys.exit(0)
00196
00197