caster_runner.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 
00003 from __future__ import division
00004 PKG = 'qualification'
00005 import roslib; roslib.load_manifest(PKG)
00006 
00007 #import sys, os
00008 
00009 import rospy
00010 
00011 from std_msgs.msg import Float64
00012 
00013 from sensor_msgs.msg import JointState
00014 
00015 from pr2_self_test_msgs.srv import TestResultRequest, TestResult
00016 
00017 import numpy
00018 
00019 CONTROLLER_NAME = 'caster_motor'
00020 TOPIC_NAME = CONTROLLER_NAME + '/command'
00021 JOINT_NAME = 'caster_motor_joint'
00022 
00023 last_error = 10000000
00024 
00025 
00026 
00027 # Return absolute error of velocity command
00028 def get_error(msg, command):
00029     idx = -1
00030     for i, name in enumerate(msg.name):
00031         if name == JOINT_NAME:
00032             idx = i
00033             break
00034 
00035     global last_error
00036     last_error = abs(command - msg.velocity[idx])
00037 
00038 def calc_mse(errors):
00039     errors_ar = numpy.array(errors)
00040     return numpy.dot(errors_ar, errors_ar) / (len(errors))
00041 
00042 def pack_result(proxy, command, mse, tolerance):
00043     r = TestResultRequest()
00044 
00045     if mse < tolerance:
00046         r.result = TestResultRequest.RESULT_PASS
00047     else:
00048         r.result = TestResultRequest.RESULT_HUMAN_REQUIRED
00049     r.text_summary = 'Velocity command: %.2f. MSE: %.2f. Tolerance: %.2f' % (command, mse, tolerance)
00050     
00051     proxy.call(r)
00052 
00053 
00054 if __name__ == '__main__':
00055     rospy.init_node('caster_runner')
00056 
00057     cmd_pub = rospy.Publisher(TOPIC_NAME, Float64)
00058 
00059     # Pull parameters
00060     cmd = Float64(rospy.get_param('~command', 16.5))
00061     timeout = rospy.get_param('~time', 15.0)
00062     tolerance = rospy.get_param('~tolerance', 1.0)
00063 
00064     result_srv = rospy.ServiceProxy('test_result', TestResult)
00065 
00066 
00067     errors = [] # Store velocity tracking errors
00068 
00069     js_sub = rospy.Subscriber('joint_states', JointState, get_error, cmd.data)
00070                 
00071     my_rate = rospy.Rate(10.0)
00072     start_time = rospy.get_time()
00073     end_time = start_time + timeout
00074     while not rospy.is_shutdown():
00075         if rospy.get_time() > end_time:
00076             break
00077 
00078         cmd_pub.publish(cmd)
00079         
00080         # Ignore tracking errors for the first few seconds
00081         if rospy.get_time() - start_time < 5.0:
00082             continue
00083 
00084         errors.append(last_error)
00085         my_rate.sleep()
00086 
00087     pack_result(result_srv, cmd.data, calc_mse(errors), tolerance)
00088         
00089     rospy.spin()


qualification
Author(s): Kevin Watts (watts@willowgarage.com), Josh Faust (jfaust@willowgarage.com)
autogenerated on Sat Dec 28 2013 17:57:34