Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 PKG = 'pr2_plugs_gazebo_demo'
00040 NAME = 'plugin_test'
00041
00042 import roslib
00043 roslib.load_manifest(PKG)
00044
00045 import unittest, sys, os, math
00046 import time
00047 import rospy, rostest
00048 from smach import *
00049
00050 from nav_msgs.msg import *
00051 from std_msgs.msg import String
00052
00053
00054 from pr2_plugs_msgs.msg import *
00055 from pr2_plugs_msgs.srv import *
00056
00057 TEST_DURATION = 60000.0
00058
00059
00060
00061
00062 TARGET_X = -0.143443210214
00063 TARGET_Y = -0.790285012056 + 0.03
00064 TARGET_Z = 0.280302987184
00065 TARGET_RX = -0.00842100328719
00066 TARGET_RY = -0.0130500419972
00067 TARGET_RZ = 0.999875978203
00068 TARGET_RW = 0.00258114328752
00069
00070 TARGET_TOL = 0.01
00071
00072 MIN_HITS = 200.0
00073 MIN_RUNS = 20.0
00074
00075
00076
00077
00078 class TestPlug(unittest.TestCase):
00079 def __init__(self, *args):
00080 super(TestPlug, self).__init__(*args)
00081 self.success = False
00082 self.fail = False
00083 self.hits = 0
00084 self.runs = 0
00085 self.print_header = False
00086
00087 def positionInput(self, p3d):
00088 if not self.print_header:
00089 self.print_header = True
00090
00091
00092
00093
00094 self.runs = self.runs + 1
00095
00096 dx = p3d.pose.pose.position.x - TARGET_X
00097 dy = p3d.pose.pose.position.y - TARGET_Y
00098 dz = p3d.pose.pose.position.z - TARGET_Z
00099 d = math.sqrt((dx * dx) + (dy * dy))
00100
00101 print self.runs, self.hits, \
00102 p3d.pose.pose.position.x , p3d.pose.pose.position.y , p3d.pose.pose.position.z, \
00103 dx , dy , dz , d
00104
00105 if (math.fabs(d) < TARGET_TOL):
00106 self.hits = self.hits + 1
00107 if (self.runs > 10 and self.runs < 50):
00108 print "Got to goal too quickly! (",self.runs,")"
00109 self.success = False
00110 self.fail = True
00111
00112 if (self.hits > MIN_HITS):
00113 if (self.runs > MIN_RUNS):
00114 self.success = True
00115
00116 def stringOutput(self, str):
00117 print str.data
00118
00119
00120 def test_plug(self):
00121 rospy.init_node("init_plug_and_check_plug_pose")
00122 rospy.loginfo("plug test: waiting for recharge_request")
00123 rospy.wait_for_service('recharge_request')
00124 try:
00125 recharge_service = rospy.ServiceProxy('recharge_request',RechargeRequest)
00126 rr = RechargeCommand(RechargeCommand.PLUG_IN,'local')
00127 recharge_service(rr)
00128 except rospy.ServiceException, e:
00129 print "Calling recharge_requst service failes: %s"%e
00130
00131 rospy.loginfo("plug test: begin listening for plug pose for the unit test")
00132 rospy.Subscriber("/test_output", String, self.stringOutput)
00133 rospy.Subscriber("/plug/plug_pose_ground_truth", Odometry, self.positionInput)
00134
00135
00136 start = time.time()
00137 timeout_t = start + TEST_DURATION
00138 while not rospy.is_shutdown() and not self.success and not self.fail and time.time() < timeout_t:
00139 time.sleep(0.1)
00140 rospy.core.logerr("THIS IS THE TEST TIME:" + str(time.time() - start))
00141 self.assert_(self.success)
00142
00143
00144
00145
00146 if __name__ == '__main__':
00147 rostest.run(PKG, sys.argv[0], TestPlug, sys.argv)
00148