Go to the documentation of this file.00001
00002 import roslib
00003 roslib.load_manifest('cob_hardware_test')
00004
00005 import sys
00006 import time
00007 import unittest
00008 import math
00009
00010 import rospy
00011 import rostest
00012 from cob_hardware_test.srv import *
00013 from trajectory_msgs.msg import *
00014 from simple_script_server import *
00015 from pr2_controllers_msgs.msg import *
00016
00017
00018 def dialog_client(dialog_type, message):
00019
00020 rospy.wait_for_service('dialog')
00021 try:
00022 dialog = rospy.ServiceProxy('dialog', Dialog)
00023 resp1 = dialog(dialog_type, message)
00024 return resp1.answer
00025 except rospy.ServiceException, e:
00026 print "Service call failed: %s" % e
00027
00028 class UnitTest(unittest.TestCase):
00029 def __init__(self, *args):
00030 super(UnitTest, self).__init__(*args)
00031 rospy.init_node('component_test')
00032 self.message_received = False
00033 self.sss = simple_script_server()
00034 self.command_traj = JointTrajectory()
00035
00036 try:
00037
00038 if not rospy.has_param('~component'):
00039 self.fail('Parameter component does not exist on ROS Parameter Server')
00040 self.component = rospy.get_param('~component')
00041
00042 if not rospy.has_param('~test_target'):
00043 self.fail('Parameter test_target does not exist on ROS Parameter Server')
00044 self.test_target = rospy.get_param('~test_target')
00045
00046 if not rospy.has_param('~default_target'):
00047 self.fail('Parameter default_target does not exist on ROS Parameter Server')
00048 self.default_target = rospy.get_param('~default_target')
00049
00050 self.wait_time = rospy.get_param('~wait_time', 5)
00051
00052 if not rospy.has_param('~error_range'):
00053 self.fail('Parameter error_range does not exist on ROS Parameter Server')
00054 self.error_range = rospy.get_param('~error_range')
00055 except KeyError, e:
00056 self.fail('Parameters not set properly')
00057 print """
00058 Component: %s
00059 Targets: %s , %s
00060 Wait Time: %s
00061 Error Range: %s""" % (self.component, self.default_target, self.test_target, self.wait_time, self.error_range)
00062
00063
00064 if self.error_range < 0.0:
00065 error_msg = "Parameter error_range should be positive, but is " + self.error_range
00066 self.fail(error_msg)
00067 if self.wait_time < 0.0:
00068 error_msg = "Parameter wait_time should be positive, but is " + self.wait_time
00069 self.fail(error_msg)
00070
00071
00072 state_topic = "/" + self.component + "_controller/state"
00073 sub_state_topic = rospy.Subscriber(state_topic, JointTrajectoryControllerState, self.cb_state)
00074
00075 def test_component(self):
00076
00077
00078 init_handle = self.sss.init(self.component)
00079 if init_handle.get_error_code() != 0:
00080 error_msg = 'Could not initialize ' + self.component
00081 self.fail(error_msg)
00082
00083
00084 print "Waiting for messages"
00085
00086 abort_time = rospy.Time.now() + rospy.Duration(self.wait_time)
00087 while not self.message_received and rospy.get_rostime() < abort_time:
00088
00089 rospy.sleep(0.1)
00090
00091 if not self.message_received:
00092 self.fail('No state message received within wait_time(%s) from /%s_controller/state' % (self.wait_time, self.component))
00093
00094 self.assertTrue(dialog_client(0, 'Ready to move <<%s>> to \n <<%s>> and <<%s>>?' % (self.component, self.test_target, self.default_target)))
00095
00096
00097 self.execute_movement()
00098
00099 self.assertTrue(dialog_client(1, ' Please press emergency stop and release it again.\n\n Ready to move <<%s>> to \n <<%s>> and <<%s>>?' % (self.component, self.test_target, self.default_target)))
00100
00101 recover_handle = self.sss.recover(self.component)
00102 if recover_handle.get_error_code() != 0 :
00103 error_msg = 'Could not recover ' + self.component
00104 self.fail(error_msg)
00105
00106
00107 self.execute_movement()
00108
00109 def execute_movement(self):
00110
00111 move_handle = self.sss.move(self.component, self.test_target)
00112 self.assertEqual(move_handle.get_state(), 3)
00113 if move_handle.get_error_code() != 0:
00114 error_msg = 'Could not move ' + self.component
00115 self.fail(error_msg + "; errorCode: " + str(move_handle.get_error_code()))
00116 self.check_target_reached(self.test_target)
00117
00118
00119 move_handle = self.sss.move(self.component, self.default_target)
00120 self.assertEqual(move_handle.get_state(), 3)
00121 if move_handle.get_error_code() != 0:
00122 error_msg = 'Could not move ' + self.component
00123 self.fail(error_msg + "; errorCode: " + str(move_handle.get_error_code()))
00124 self.check_target_reached(self.default_target)
00125
00126 def check_target_reached(self,target):
00127
00128 command_traj = rospy.get_param("/script_server/" + self.component + "/" + target)
00129 print command_traj
00130
00131
00132 traj_endpoint = command_traj[len(command_traj) - 1]
00133 print traj_endpoint
00134
00135 actual_pos = self.actual_pos
00136
00137
00138 print "actual_pos = ", actual_pos
00139 print "traj_endpoint = ", traj_endpoint
00140 for i in range(len(traj_endpoint)):
00141 self.assert_(((math.fabs(traj_endpoint[i] - actual_pos[i])) < self.error_range), "Target position out of error_range")
00142
00143 self.assertTrue(dialog_client(1, 'Did <<%s>> reach <<%s>>?' % (self.component, target)))
00144
00145
00146 def cb_state(self, msg):
00147 self.actual_pos = msg.actual.positions
00148 self.message_received = True
00149
00150
00151 if __name__ == '__main__':
00152
00153
00154
00155 time.sleep(0.75)
00156 try:
00157 rostest.run('rostest', 'component_test', UnitTest, sys.argv)
00158 except KeyboardInterrupt, e:
00159 pass
00160 print "exiting"
00161