Go to the documentation of this file.00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 import sys
00019 import time
00020 import unittest
00021 
00022 import rospy
00023 import rostest
00024 from trajectory_msgs.msg import *
00025 from simple_script_server import *
00026 from control_msgs.msg import *
00027 
00028 class UnitTest(unittest.TestCase):
00029     def __init__(self, *args):
00030         super(UnitTest, self).__init__(*args)
00031         rospy.init_node('component_test')
00032         self.message_received = False
00033         self.sss=simple_script_server()
00034 
00035     def setUp(self):
00036         self.errors = []
00037 
00038     def test_component(self):
00039         
00040         try:
00041             
00042             if not rospy.has_param('~component'):
00043                 self.fail('Parameter component does not exist on ROS Parameter Server')
00044             component = rospy.get_param('~component')
00045 
00046             
00047             if not rospy.has_param('~target'):
00048                 self.fail('Parameter target does not exist on ROS Parameter Server')
00049             target = rospy.get_param('~target')
00050 
00051             
00052             wait_time = rospy.get_param('~wait_time',5)
00053 
00054             
00055             if not rospy.has_param('~error_range'):
00056                 self.fail('Parameter error_range does not exist on ROS Parameter Server')
00057             error_range = rospy.get_param('~error_range')
00058 
00059         except KeyError, e:
00060             self.fail('Parameters not set properly')
00061 
00062         print """
00063               Component: %s
00064               Target: %s
00065               Wait Time: %s
00066               Error Range: %s"""%(component, target, wait_time, error_range)
00067 
00068         
00069         
00070         if error_range < 0.0:
00071             error_msg = "Parameter error_range should be positive, but is " + error_range
00072             self.fail(error_msg)
00073         if wait_time < 0.0:
00074             error_msg = "Parameter wait_time should be positive, but is " + wait_time
00075             self.fail(error_msg)
00076 
00077         
00078         command_topic = "/" + component + "_controller/command"
00079         state_topic = "/" + component + "_controller/state"
00080         sub_command_topic = rospy.Subscriber(command_topic, JointTrajectory, self.cb_command)
00081         sub_state_topic = rospy.Subscriber(state_topic, JointTrajectoryControllerState, self.cb_state)
00082 
00083         
00084         init_handle = self.sss.init(component)
00085         if init_handle.get_error_code() != 0:
00086             error_msg = 'Could not initialize ' + component
00087             self.fail(error_msg)
00088 
00089         
00090         print "Waiting for messages"
00091         
00092         wallclock_timeout_t = time.time() + wait_time
00093         while not self.message_received and time.time() < wallclock_timeout_t:
00094             
00095             time.sleep(0.1)
00096         if not self.message_received:
00097             self.fail('No state message received within wait_time')
00098 
00099         
00100         move_handle = self.sss.move(component,target)
00101         
00102         if move_handle.get_error_code() != 0:
00103             error_msg = 'Could not move ' + component
00104             self.fail(error_msg + "; errorCode: " + str(move_handle.get_error_code()))
00105 
00106         
00107         traj_endpoint = self.command_traj.points[len(self.command_traj.points)-1]
00108 
00109         
00110         timeout_t = traj_endpoint.time_from_start.to_sec()*0.5 
00111         rospy.sleep(timeout_t)
00112         print "Done waiting, validating results"
00113         actual_pos = self.actual_pos 
00114 
00115         
00116         for i in range(len(traj_endpoint.positions)):
00117             self.assert_(((traj_endpoint.positions[i] - actual_pos[i]) < error_range), "Target position out of error_range")
00118 
00119     
00120     def cb_state(self, msg):
00121         self.message_received = True
00122         self.actual_pos = msg.actual.positions
00123     def cb_command(self, msg):
00124         self.command_traj = msg
00125 
00126 if __name__ == '__main__':
00127     try:
00128         rostest.run('rostest', 'component_test', UnitTest, sys.argv)
00129     except KeyboardInterrupt, e:
00130         pass
00131     print "exiting"
00132