$search
00001 #!/usr/bin/env python 00002 import roslib 00003 roslib.load_manifest('cob_gazebo') 00004 roslib.load_manifest('cob_script_server') 00005 00006 import sys 00007 import time 00008 import unittest 00009 00010 import rospy 00011 import rostest 00012 from trajectory_msgs.msg import * 00013 from simple_script_server import * 00014 from pr2_controllers_msgs.msg import * 00015 00016 class UnitTest(unittest.TestCase): 00017 def __init__(self, *args): 00018 super(UnitTest, self).__init__(*args) 00019 rospy.init_node('component_test') 00020 self.message_received = False 00021 self.sss=simple_script_server() 00022 00023 def setUp(self): 00024 self.errors = [] 00025 00026 def test_component(self): 00027 # get parameters 00028 try: 00029 # component 00030 if not rospy.has_param('~component'): 00031 self.fail('Parameter component does not exist on ROS Parameter Server') 00032 component = rospy.get_param('~component') 00033 00034 # movement command 00035 if not rospy.has_param('~target'): 00036 self.fail('Parameter target does not exist on ROS Parameter Server') 00037 target = rospy.get_param('~target') 00038 00039 # time to wait before 00040 wait_time = rospy.get_param('~wait_time',5) 00041 00042 # error range 00043 if not rospy.has_param('~error_range'): 00044 self.fail('Parameter error_range does not exist on ROS Parameter Server') 00045 error_range = rospy.get_param('~error_range') 00046 00047 except KeyError, e: 00048 self.fail('Parameters not set properly') 00049 00050 print """ 00051 Component: %s 00052 Target: %s 00053 Wait Time: %s 00054 Error Range: %s"""%(component, target, wait_time, error_range) 00055 00056 # check parameters 00057 # \todo do more parameter tests 00058 if error_range < 0.0: 00059 error_msg = "Parameter error_range should be positive, but is " + error_range 00060 self.fail(error_msg) 00061 if wait_time < 0.0: 00062 error_msg = "Parameter wait_time should be positive, but is " + wait_time 00063 self.fail(error_msg) 00064 00065 # init subscribers 00066 command_topic = "/" + component + "_controller/command" 00067 state_topic = "/" + component + "_controller/state" 00068 sub_command_topic = rospy.Subscriber(command_topic, JointTrajectory, self.cb_command) 00069 sub_state_topic = rospy.Subscriber(state_topic, JointTrajectoryControllerState, self.cb_state) 00070 00071 # init component 00072 init_handle = self.sss.init(component) 00073 if init_handle.get_error_code() != 0: 00074 error_msg = 'Could not initialize ' + component 00075 self.fail(error_msg) 00076 00077 # start actual test 00078 print "Waiting for messages" 00079 # give the topics some seconds to receive messages 00080 wallclock_timeout_t = time.time() + wait_time 00081 while not self.message_received and time.time() < wallclock_timeout_t: 00082 #print "###debug here###" 00083 time.sleep(0.1) 00084 if not self.message_received: 00085 self.fail('No state message received within wait_time') 00086 00087 # send commands to component 00088 move_handle = self.sss.move(component,target) 00089 # move_handle = self.sss.move("arm","folded") 00090 if move_handle.get_error_code() != 0: 00091 error_msg = 'Could not move ' + component 00092 self.fail(error_msg + "; errorCode: " + str(move_handle.get_error_code())) 00093 00094 # get last point out of trajectory 00095 traj_endpoint = self.command_traj.points[len(self.command_traj.points)-1] 00096 00097 # Start evaluation 00098 timeout_t = traj_endpoint.time_from_start.to_sec()*0.5 # movement should already be finished, but let wait with an additional buffer of 50% times the desired time 00099 rospy.sleep(timeout_t) 00100 print "Done waiting, validating results" 00101 actual_pos = self.actual_pos # fix current position configuration for later evaluation 00102 00103 # checking if target position is realy reached 00104 for i in range(len(traj_endpoint.positions)): 00105 self.assert_(((traj_endpoint.positions[i] - actual_pos[i]) < error_range), "Target position out of error_range") 00106 00107 # callback functions 00108 def cb_state(self, msg): 00109 self.message_received = True 00110 self.actual_pos = msg.actual.positions 00111 def cb_command(self, msg): 00112 self.command_traj = msg 00113 00114 if __name__ == '__main__': 00115 try: 00116 rostest.run('rostest', 'component_test', UnitTest, sys.argv) 00117 except KeyboardInterrupt, e: 00118 pass 00119 print "exiting" 00120