00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 import roslib
00036 roslib.load_manifest('qualification')
00037
00038 import sys
00039 import rospy
00040 from pr2_self_test_msgs.srv import *
00041 import subprocess
00042 import select
00043 import fcntl
00044 import os
00045
def make_nonblock(file):
    """Put the descriptor behind an open file object into non-blocking mode.

    The descriptor's current status flags are fetched and written back with
    ``os.O_NONBLOCK`` OR-ed in, so any flags that were already set are kept.

    Args:
        file: an object exposing ``fileno()`` (e.g. a pipe end of a
            ``subprocess.Popen``).

    Returns:
        None.
    """
    descriptor = file.fileno()
    updated_flags = os.O_NONBLOCK | fcntl.fcntl(descriptor, fcntl.F_GETFL)
    fcntl.fcntl(descriptor, fcntl.F_SETFL, updated_flags)
00050
if __name__ == '__main__':
    # Script entry point.
    #
    # Runs the program named on the command line as a child process, echoes
    # its stdout/stderr to this process' stdout/stderr while also capturing
    # them, and then reports PASS (exit status 0) or FAIL (non-zero exit
    # status, or any exception raised here) through the 'test_result'
    # service.  The private parameters ~pre_delay and ~post_delay (both
    # default 0) are slept before launching and after the child finishes.

    # Local import: only needed to name the "no such process" error below.
    import errno

    rospy.init_node("test_caller", anonymous=True)
    args = rospy.myargv()

    r = TestResultRequest()
    r.plots = []

    # Bound before the try block so that the cleanup in ``finally`` never
    # trips over an unbound name when we fail before (or while) spawning the
    # child; otherwise the NameError would prevent the FAIL result prepared
    # in the except branch from being reported at all.
    p = None

    try:
        if len(args) < 3:
            raise Exception('Usage: ./test_caller.py <program> <args>')

        if not rospy.is_shutdown():
            rospy.sleep(rospy.Duration(rospy.get_param("~pre_delay", 0)))

        p = subprocess.Popen(args[1:], stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)

        # Maps each pipe of the child to the stream its data is echoed to.
        fdmap = {p.stdout: sys.stdout, p.stderr: sys.stderr}
        for fd in fdmap:
            make_nonblock(fd)

        captured = []
        while fdmap and not rospy.is_shutdown():
            # NOTE(review): select() is called without a timeout, so the
            # shutdown check above is only re-evaluated once the child
            # produces output or closes a pipe.
            readable = select.select(list(fdmap), [], [])[0]
            for fd in readable:
                chunk = fd.read()
                if not chunk:
                    # Readable but empty: the child closed this pipe.
                    del fdmap[fd]
                    continue
                if not isinstance(chunk, str):
                    # Pipes yield bytes on Python 3; on Python 2 the chunk
                    # is already a str and is passed through untouched.
                    chunk = chunk.decode('utf-8', 'replace')
                fdmap[fd].write(chunk)
                captured.append(chunk)
        output = "".join(captured)
    except Exception as e:
        r.text_summary = "Failed with exception in test_caller.py"
        r.result = TestResultRequest.RESULT_FAIL
        r.html_result = "<p>Test Failed with exception.</p><p>%s</p>" % str(e)
    else:
        p.wait()
        retcode = p.returncode

        rospy.sleep(rospy.Duration(rospy.get_param("~post_delay", 0)))

        # NOTE(review): the child's output is embedded in the HTML result
        # without escaping; only newlines are converted.
        output = output.replace('\n', '<br>')

        # NOTE(review): args[0] is this script, not the launched program
        # (args[1]); kept as-is because the summary text is consumed
        # elsewhere -- confirm which name is intended.
        if not retcode:
            r.text_summary = "Successful completion of %s" % args[0]
            r.html_result = "<p>Test passed.</p><p>" + output + "</p>"
            r.result = TestResultRequest.RESULT_PASS
            print("pass")
        else:
            r.text_summary = "Completion of %s with error code %i." % (args[0], retcode)
            r.result = TestResultRequest.RESULT_FAIL
            r.html_result = "<p>Test Failed.</p><p>" + output + "</p>"
            print("fail")
    finally:
        # Make sure the child does not outlive this script.  A child that
        # has already exited may make kill() fail with ESRCH ("no such
        # process"); that is expected and ignored, anything else is raised.
        if p is not None:
            try:
                p.kill()
            except OSError as e:
                if e.errno != errno.ESRCH:
                    raise

    result_service = rospy.ServiceProxy('test_result', TestResult)
    rospy.wait_for_service('test_result')
    result_service.call(r)