component_test.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 import roslib
00003 roslib.load_manifest('cob_hardware_test')
00004 #from mpmath.functions.functions import fabs
00005 import sys
00006 import time
00007 import unittest
00008 import math
00009 
00010 import rospy
00011 import rostest
00012 from cob_hardware_test.srv import *
00013 from trajectory_msgs.msg import *
00014 from simple_script_server import *
00015 from pr2_controllers_msgs.msg import *
00016 
00017 
00018 def dialog_client(dialog_type, message):
00019         #dialog type: 0=confirm 1=question
00020         rospy.wait_for_service('dialog')
00021         try:
00022                 dialog = rospy.ServiceProxy('dialog', Dialog)
00023                 resp1 = dialog(dialog_type, message)
00024                 return resp1.answer
00025         except rospy.ServiceException, e:
00026                 print "Service call failed: %s" % e
00027 
00028 class UnitTest(unittest.TestCase):
00029         def __init__(self, *args):
00030                 super(UnitTest, self).__init__(*args)
00031                 rospy.init_node('component_test')
00032                 self.message_received = False
00033                 self.sss = simple_script_server()
00034                 self.command_traj = JointTrajectory()
00035                 # get parameters
00036                 try:
00037                         # component
00038                         if not rospy.has_param('~component'):
00039                                 self.fail('Parameter component does not exist on ROS Parameter Server')
00040                         self.component = rospy.get_param('~component')
00041                         # movement command
00042                         if not rospy.has_param('~test_target'):
00043                                 self.fail('Parameter test_target does not exist on ROS Parameter Server')
00044                         self.test_target = rospy.get_param('~test_target')
00045                         # movement command
00046                         if not rospy.has_param('~default_target'):
00047                                 self.fail('Parameter default_target does not exist on ROS Parameter Server')
00048                         self.default_target = rospy.get_param('~default_target')
00049                         # time to wait before
00050                         self.wait_time = rospy.get_param('~wait_time', 5)
00051                         # error range
00052                         if not rospy.has_param('~error_range'):
00053                                 self.fail('Parameter error_range does not exist on ROS Parameter Server')
00054                         self.error_range = rospy.get_param('~error_range')
00055                 except KeyError, e:
00056                         self.fail('Parameters not set properly')
00057                 print """
00058                         Component: %s
00059                         Targets: %s , %s
00060                         Wait Time: %s
00061                         Error Range: %s""" % (self.component, self.default_target, self.test_target, self.wait_time, self.error_range)
00062                 # check parameters
00063                 # \todo do more parameter tests
00064                 if self.error_range < 0.0:
00065                         error_msg = "Parameter error_range should be positive, but is " + self.error_range
00066                         self.fail(error_msg)
00067                 if self.wait_time < 0.0:
00068                         error_msg = "Parameter wait_time should be positive, but is " + self.wait_time
00069                         self.fail(error_msg)
00070 
00071                 # init subscribers
00072                 state_topic = "/" + self.component + "_controller/state"
00073                 sub_state_topic = rospy.Subscriber(state_topic, JointTrajectoryControllerState, self.cb_state)
00074 
00075         def test_component(self):
00076                 
00077                 # init component
00078                 init_handle = self.sss.init(self.component)
00079                 if init_handle.get_error_code() != 0:
00080                         error_msg = 'Could not initialize ' + self.component
00081                         self.fail(error_msg)
00082 
00083                 # start actual test
00084                 print "Waiting for messages"
00085                 #give the topics some seconds to receive messages
00086                 abort_time = rospy.Time.now() + rospy.Duration(self.wait_time)
00087                 while not self.message_received and rospy.get_rostime() < abort_time:
00088                         # print "###debug here###"
00089                         rospy.sleep(0.1)
00090 
00091                 if not self.message_received:
00092                         self.fail('No state message received within wait_time(%s) from /%s_controller/state' % (self.wait_time, self.component))
00093 
00094                 self.assertTrue(dialog_client(0, 'Ready to move <<%s>> to \n <<%s>> and <<%s>>?' % (self.component, self.test_target, self.default_target)))
00095 
00096                 # execute movement to test_target and back to default_target
00097                 self.execute_movement()
00098 
00099                 self.assertTrue(dialog_client(1, ' Please press emergency stop and release it again.\n\n Ready to move <<%s>> to \n <<%s>> and <<%s>>?' % (self.component, self.test_target, self.default_target)))
00100 
00101                 recover_handle = self.sss.recover(self.component)
00102                 if recover_handle.get_error_code() != 0 :
00103                         error_msg = 'Could not recover ' + self.component
00104                         self.fail(error_msg)
00105 
00106                 # execute movement to test_target and back to default_target
00107                 self.execute_movement()
00108 
00109         def execute_movement(self):
00110                 # test_target: send commands to component 
00111                 move_handle = self.sss.move(self.component, self.test_target)
00112                 self.assertEqual(move_handle.get_state(), 3) # state 3 equals errorcode 0 therefore the following will never be executed
00113                 if move_handle.get_error_code() != 0:
00114                         error_msg = 'Could not move ' + self.component
00115                         self.fail(error_msg + "; errorCode: " + str(move_handle.get_error_code()))
00116                 self.check_target_reached(self.test_target)
00117 
00118                 # default_target: send commands to component 
00119                 move_handle = self.sss.move(self.component, self.default_target)
00120                 self.assertEqual(move_handle.get_state(), 3) # state 3 equals errorcode 0 therefore the following will never be executed
00121                 if move_handle.get_error_code() != 0:
00122                         error_msg = 'Could not move ' + self.component
00123                         self.fail(error_msg + "; errorCode: " + str(move_handle.get_error_code()))
00124                 self.check_target_reached(self.default_target)
00125 
00126         def check_target_reached(self,target):
00127                 # get commanded trajectory
00128                 command_traj = rospy.get_param("/script_server/" + self.component + "/" + target)
00129                 print command_traj
00130 
00131                 # get last point out of trajectory
00132                 traj_endpoint = command_traj[len(command_traj) - 1]
00133                 print traj_endpoint
00134 
00135                 actual_pos = self.actual_pos # fix current position configuration for later evaluation
00136 
00137                 # checking if target position is really reached
00138                 print "actual_pos = ", actual_pos
00139                 print "traj_endpoint = ", traj_endpoint
00140                 for i in range(len(traj_endpoint)):
00141                         self.assert_(((math.fabs(traj_endpoint[i] - actual_pos[i])) < self.error_range), "Target position out of error_range")
00142 
00143                 self.assertTrue(dialog_client(1, 'Did <<%s>> reach <<%s>>?' % (self.component, target)))
00144 
00145         # callback functions
00146         def cb_state(self, msg):
00147                 self.actual_pos = msg.actual.positions
00148                 self.message_received = True
00149 
00150 
00151 if __name__ == '__main__':
00152         # Copied from hztest: A dirty hack to work around an apparent race condition at startup
00153         # that causes some hztests to fail. Most evident in the tests of
00154         # rosstage.
00155         time.sleep(0.75)
00156         try:
00157                 rostest.run('rostest', 'component_test', UnitTest, sys.argv)
00158         except KeyboardInterrupt, e:
00159                 pass
00160         print "exiting"
00161 


cob_hardware_test
Author(s): Florian Weisshardt
autogenerated on Mon Mar 17 2014 10:13:52