Go to the documentation of this file.00001
00002 import roslib
00003 roslib.load_manifest('cob_gazebo')
00004 roslib.load_manifest('cob_script_server')
00005
00006 import sys
00007 import time
00008 import unittest
00009
00010 import rospy
00011 import rostest
00012 from trajectory_msgs.msg import *
00013 from simple_script_server import *
00014 from pr2_controllers_msgs.msg import *
00015
00016 class UnitTest(unittest.TestCase):
00017 def __init__(self, *args):
00018 super(UnitTest, self).__init__(*args)
00019 rospy.init_node('component_test')
00020 self.message_received = False
00021 self.sss=simple_script_server()
00022
00023 def setUp(self):
00024 self.errors = []
00025
00026 def test_component(self):
00027
00028 try:
00029
00030 if not rospy.has_param('~component'):
00031 self.fail('Parameter component does not exist on ROS Parameter Server')
00032 component = rospy.get_param('~component')
00033
00034
00035 if not rospy.has_param('~target'):
00036 self.fail('Parameter target does not exist on ROS Parameter Server')
00037 target = rospy.get_param('~target')
00038
00039
00040 wait_time = rospy.get_param('~wait_time',5)
00041
00042
00043 if not rospy.has_param('~error_range'):
00044 self.fail('Parameter error_range does not exist on ROS Parameter Server')
00045 error_range = rospy.get_param('~error_range')
00046
00047 except KeyError, e:
00048 self.fail('Parameters not set properly')
00049
00050 print """
00051 Component: %s
00052 Target: %s
00053 Wait Time: %s
00054 Error Range: %s"""%(component, target, wait_time, error_range)
00055
00056
00057
00058 if error_range < 0.0:
00059 error_msg = "Parameter error_range should be positive, but is " + error_range
00060 self.fail(error_msg)
00061 if wait_time < 0.0:
00062 error_msg = "Parameter wait_time should be positive, but is " + wait_time
00063 self.fail(error_msg)
00064
00065
00066 command_topic = "/" + component + "_controller/command"
00067 state_topic = "/" + component + "_controller/state"
00068 sub_command_topic = rospy.Subscriber(command_topic, JointTrajectory, self.cb_command)
00069 sub_state_topic = rospy.Subscriber(state_topic, JointTrajectoryControllerState, self.cb_state)
00070
00071
00072 init_handle = self.sss.init(component)
00073 if init_handle.get_error_code() != 0:
00074 error_msg = 'Could not initialize ' + component
00075 self.fail(error_msg)
00076
00077
00078 print "Waiting for messages"
00079
00080 wallclock_timeout_t = time.time() + wait_time
00081 while not self.message_received and time.time() < wallclock_timeout_t:
00082
00083 time.sleep(0.1)
00084 if not self.message_received:
00085 self.fail('No state message received within wait_time')
00086
00087
00088 move_handle = self.sss.move(component,target)
00089
00090 if move_handle.get_error_code() != 0:
00091 error_msg = 'Could not move ' + component
00092 self.fail(error_msg + "; errorCode: " + str(move_handle.get_error_code()))
00093
00094
00095 traj_endpoint = self.command_traj.points[len(self.command_traj.points)-1]
00096
00097
00098 timeout_t = traj_endpoint.time_from_start.to_sec()*0.5
00099 rospy.sleep(timeout_t)
00100 print "Done waiting, validating results"
00101 actual_pos = self.actual_pos
00102
00103
00104 for i in range(len(traj_endpoint.positions)):
00105 self.assert_(((traj_endpoint.positions[i] - actual_pos[i]) < error_range), "Target position out of error_range")
00106
00107
00108 def cb_state(self, msg):
00109 self.message_received = True
00110 self.actual_pos = msg.actual.positions
00111 def cb_command(self, msg):
00112 self.command_traj = msg
00113
00114 if __name__ == '__main__':
00115 try:
00116 rostest.run('rostest', 'component_test', UnitTest, sys.argv)
00117 except KeyboardInterrupt, e:
00118 pass
00119 print "exiting"
00120