component_test.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 import roslib
00003 roslib.load_manifest('cob_gazebo')
00004 roslib.load_manifest('cob_script_server')
00005 
00006 import sys
00007 import time
00008 import unittest
00009 
00010 import rospy
00011 import rostest
00012 from trajectory_msgs.msg import *
00013 from simple_script_server import *
00014 from control_msgs.msg import *
00015 
00016 class UnitTest(unittest.TestCase):
00017     def __init__(self, *args):
00018         super(UnitTest, self).__init__(*args)
00019         rospy.init_node('component_test')
00020         self.message_received = False
00021         self.sss=simple_script_server()
00022         
00023     def setUp(self):
00024         self.errors = []
00025 
00026     def test_component(self):
00027         # get parameters
00028         try:
00029             # component
00030             if not rospy.has_param('~component'):
00031                 self.fail('Parameter component does not exist on ROS Parameter Server')
00032             component = rospy.get_param('~component')
00033 
00034             # movement command
00035             if not rospy.has_param('~target'):
00036                 self.fail('Parameter target does not exist on ROS Parameter Server')
00037             target = rospy.get_param('~target')
00038 
00039             # time to wait before
00040             wait_time = rospy.get_param('~wait_time',5)
00041 
00042             # error range
00043             if not rospy.has_param('~error_range'):
00044                 self.fail('Parameter error_range does not exist on ROS Parameter Server')
00045             error_range = rospy.get_param('~error_range')
00046 
00047         except KeyError, e:
00048             self.fail('Parameters not set properly')
00049 
00050         print """
00051               Component: %s  
00052               Target: %s
00053               Wait Time: %s
00054               Error Range: %s"""%(component, target, wait_time, error_range)
00055         
00056         # check parameters
00057         # \todo do more parameter tests
00058         if error_range < 0.0:
00059             error_msg = "Parameter error_range should be positive, but is " + error_range
00060             self.fail(error_msg)
00061         if wait_time < 0.0:
00062             error_msg = "Parameter wait_time should be positive, but is " + wait_time
00063             self.fail(error_msg)
00064         
00065         # init subscribers
00066         command_topic = "/" + component + "_controller/command"
00067         state_topic = "/" + component + "_controller/state"
00068         sub_command_topic = rospy.Subscriber(command_topic, JointTrajectory, self.cb_command) 
00069         sub_state_topic = rospy.Subscriber(state_topic, JointTrajectoryControllerState, self.cb_state) 
00070         
00071         # init component
00072         init_handle = self.sss.init(component)
00073         if init_handle.get_error_code() != 0:
00074             error_msg = 'Could not initialize ' + component
00075             self.fail(error_msg)
00076         
00077         # start actual test
00078         print "Waiting for messages"
00079         # give the topics some seconds to receive messages
00080         wallclock_timeout_t = time.time() + wait_time
00081         while not self.message_received and time.time() < wallclock_timeout_t:
00082             #print "###debug here###" 
00083             time.sleep(0.1)
00084         if not self.message_received:
00085             self.fail('No state message received within wait_time')
00086           
00087         # send commands to component
00088         move_handle = self.sss.move(component,target)
00089         # move_handle = self.sss.move("arm","folded")
00090         if move_handle.get_error_code() != 0:
00091             error_msg = 'Could not move ' + component
00092             self.fail(error_msg + "; errorCode: " + str(move_handle.get_error_code()))
00093         
00094         # get last point out of trajectory
00095         traj_endpoint = self.command_traj.points[len(self.command_traj.points)-1]
00096              
00097         # Start evaluation
00098         timeout_t = traj_endpoint.time_from_start.to_sec()*0.5 # movement should already be finished, but let wait with an additional buffer of 50% times the desired time
00099         rospy.sleep(timeout_t)
00100         print "Done waiting, validating results"
00101         actual_pos = self.actual_pos # fix current position configuration for later evaluation
00102         
00103         # checking if target position is realy reached
00104         for i in range(len(traj_endpoint.positions)):
00105             self.assert_(((traj_endpoint.positions[i] - actual_pos[i]) < error_range), "Target position out of error_range")
00106             
00107     # callback functions
00108     def cb_state(self, msg):
00109         self.message_received = True
00110         self.actual_pos = msg.actual.positions
00111     def cb_command(self, msg):
00112         self.command_traj = msg
00113          
00114 if __name__ == '__main__':
00115     try:
00116         rostest.run('rostest', 'component_test', UnitTest, sys.argv)
00117     except KeyboardInterrupt, e:
00118         pass
00119     print "exiting"
00120 


cob_gazebo
Author(s): Florian Weisshardt
autogenerated on Thu Aug 27 2015 12:48:18