$search
00001 #!/usr/bin/env python 00002 import roslib 00003 roslib.load_manifest('cob_hardware_test') 00004 #from mpmath.functions.functions import fabs 00005 import sys 00006 import time 00007 import unittest 00008 import math 00009 00010 import rospy 00011 import rostest 00012 from cob_hardware_test.srv import * 00013 from trajectory_msgs.msg import * 00014 from simple_script_server import * 00015 from pr2_controllers_msgs.msg import * 00016 00017 00018 def dialog_client(dialog_type, message): 00019 #dialog type: 0=confirm 1=question 00020 rospy.wait_for_service('dialog') 00021 try: 00022 dialog = rospy.ServiceProxy('dialog', Dialog) 00023 resp1 = dialog(dialog_type, message) 00024 return resp1.answer 00025 except rospy.ServiceException, e: 00026 print "Service call failed: %s" % e 00027 00028 class UnitTest(unittest.TestCase): 00029 def __init__(self, *args): 00030 super(UnitTest, self).__init__(*args) 00031 rospy.init_node('component_test') 00032 self.message_received = False 00033 self.sss = simple_script_server() 00034 self.command_traj = JointTrajectory() 00035 # get parameters 00036 try: 00037 # component 00038 if not rospy.has_param('~component'): 00039 self.fail('Parameter component does not exist on ROS Parameter Server') 00040 self.component = rospy.get_param('~component') 00041 # movement command 00042 if not rospy.has_param('~test_target'): 00043 self.fail('Parameter test_target does not exist on ROS Parameter Server') 00044 self.test_target = rospy.get_param('~test_target') 00045 # movement command 00046 if not rospy.has_param('~default_target'): 00047 self.fail('Parameter default_target does not exist on ROS Parameter Server') 00048 self.default_target = rospy.get_param('~default_target') 00049 # time to wait before 00050 self.wait_time = rospy.get_param('~wait_time', 5) 00051 # error range 00052 if not rospy.has_param('~error_range'): 00053 self.fail('Parameter error_range does not exist on ROS Parameter Server') 00054 self.error_range = rospy.get_param('~error_range') 00055 except KeyError, e: 00056 self.fail('Parameters not set properly') 00057 print """ 00058 Component: %s 00059 Targets: %s , %s 00060 Wait Time: %s 00061 Error Range: %s""" % (self.component, self.default_target, self.test_target, self.wait_time, self.error_range) 00062 # check parameters 00063 # \todo do more parameter tests 00064 if self.error_range < 0.0: 00065 error_msg = "Parameter error_range should be positive, but is " + self.error_range 00066 self.fail(error_msg) 00067 if self.wait_time < 0.0: 00068 error_msg = "Parameter wait_time should be positive, but is " + self.wait_time 00069 self.fail(error_msg) 00070 00071 # init subscribers 00072 state_topic = "/" + self.component + "_controller/state" 00073 sub_state_topic = rospy.Subscriber(state_topic, JointTrajectoryControllerState, self.cb_state) 00074 00075 def test_component(self): 00076 00077 # init component 00078 init_handle = self.sss.init(self.component) 00079 if init_handle.get_error_code() != 0: 00080 error_msg = 'Could not initialize ' + self.component 00081 self.fail(error_msg) 00082 00083 # start actual test 00084 print "Waiting for messages" 00085 #give the topics some seconds to receive messages 00086 abort_time = rospy.Time.now() + rospy.Duration(self.wait_time) 00087 while not self.message_received and rospy.get_rostime() < abort_time: 00088 # print "###debug here###" 00089 rospy.sleep(0.1) 00090 00091 if not self.message_received: 00092 self.fail('No state message received within wait_time(%s) from /%s_controller/state' % (self.wait_time, self.component)) 00093 00094 self.assertTrue(dialog_client(0, 'Ready to move <<%s>> to \n <<%s>> and <<%s>>?' % (self.component, self.test_target, self.default_target))) 00095 00096 # execute movement to test_target and back to default_target 00097 self.execute_movement() 00098 00099 self.assertTrue(dialog_client(1, ' Please press emergency stop and release it again.\n\n Ready to move <<%s>> to \n <<%s>> and <<%s>>?' % (self.component, self.test_target, self.default_target))) 00100 00101 recover_handle = self.sss.recover(self.component) 00102 if recover_handle.get_error_code() != 0 : 00103 error_msg = 'Could not recover ' + self.component 00104 self.fail(error_msg) 00105 00106 # execute movement to test_target and back to default_target 00107 self.execute_movement() 00108 00109 def execute_movement(self): 00110 # test_target: send commands to component 00111 move_handle = self.sss.move(self.component, self.test_target) 00112 self.assertEqual(move_handle.get_state(), 3) # state 3 equals errorcode 0 therefore the following will never be executed 00113 if move_handle.get_error_code() != 0: 00114 error_msg = 'Could not move ' + self.component 00115 self.fail(error_msg + "; errorCode: " + str(move_handle.get_error_code())) 00116 self.check_target_reached(self.test_target) 00117 00118 # default_target: send commands to component 00119 move_handle = self.sss.move(self.component, self.default_target) 00120 self.assertEqual(move_handle.get_state(), 3) # state 3 equals errorcode 0 therefore the following will never be executed 00121 if move_handle.get_error_code() != 0: 00122 error_msg = 'Could not move ' + self.component 00123 self.fail(error_msg + "; errorCode: " + str(move_handle.get_error_code())) 00124 self.check_target_reached(self.default_target) 00125 00126 def check_target_reached(self,target): 00127 # get commanded trajectory 00128 command_traj = rospy.get_param("/script_server/" + self.component + "/" + target) 00129 print command_traj 00130 00131 # get last point out of trajectory 00132 traj_endpoint = command_traj[len(command_traj) - 1] 00133 print traj_endpoint 00134 00135 actual_pos = self.actual_pos # fix current position configuration for later evaluation 00136 00137 # checking if target position is really reached 00138 print "actual_pos = ", actual_pos 00139 print "traj_endpoint = ", traj_endpoint 00140 for i in range(len(traj_endpoint)): 00141 self.assert_(((math.fabs(traj_endpoint[i] - actual_pos[i])) < self.error_range), "Target position out of error_range") 00142 00143 self.assertTrue(dialog_client(1, 'Did <<%s>> reach <<%s>>?' % (self.component, target))) 00144 00145 # callback functions 00146 def cb_state(self, msg): 00147 self.actual_pos = msg.actual.positions 00148 self.message_received = True 00149 00150 00151 if __name__ == '__main__': 00152 # Copied from hztest: A dirty hack to work around an apparent race condition at startup 00153 # that causes some hztests to fail. Most evident in the tests of 00154 # rosstage. 00155 time.sleep(0.75) 00156 try: 00157 rostest.run('rostest', 'component_test', UnitTest, sys.argv) 00158 except KeyboardInterrupt, e: 00159 pass 00160 print "exiting" 00161