$search
00001 #!/usr/bin/env python 00002 import roslib 00003 roslib.load_manifest('cob_script_server') 00004 00005 import sys 00006 import unittest 00007 00008 import rospy 00009 import rostest 00010 from trajectory_msgs.msg import * 00011 from simple_script_server import * 00012 from pr2_controllers_msgs.msg import * 00013 from control_msgs.msg import * 00014 from cob_srvs.srv import * 00015 00016 class PythonAPITest(unittest.TestCase): 00017 def __init__(self, *args): 00018 super(PythonAPITest, self).__init__(*args) 00019 rospy.init_node('test_python_api_test') 00020 self.sss=simple_script_server() 00021 self.as_cb_executed = False 00022 self.ss_stop_cb_executed = False 00023 self.ns_global_prefix = "/script_server" 00024 00025 def test_python_api(self): 00026 # get parameters 00027 try: 00028 # component 00029 if not rospy.has_param('~command'): 00030 self.fail('Parameter command does not exist on ROS Parameter Server') 00031 command = rospy.get_param('~command') 00032 except KeyError, e: 00033 self.fail('Parameters not set properly') 00034 00035 # choose command to test 00036 if command == "move": 00037 component_name = "arm" 00038 # init action server (as) 00039 ##as_name = "/" + component_name + "_controller/joint_trajectory_action" 00040 ##self.as_server = actionlib.SimpleActionServer(as_name, JointTrajectoryAction, execute_cb=self.as_cb) 00041 as_name = "/" + component_name + "_controller/follow_joint_trajectory" 00042 self.as_server = actionlib.SimpleActionServer(as_name, FollowJointTrajectoryAction, execute_cb=self.as_cb) 00043 #execute test (as all components have the same trajectory interface, we only test for arm) 00044 self.move_test(component_name,"home") # test trajectories with a single point (home) 00045 self.move_test(component_name,"grasp-to-tray") # test trajectories with multiple points (grasp-to-tray) 00046 self.move_test(component_name,"wave") # test trajectories out of other points (wave) 00047 elif command == "stop": 00048 # prepare service server (ss) 00049 ss_name = "/arm_controller/" + command 00050 self.ss_cb_executed = False 00051 rospy.Service(ss_name, Trigger, self.ss_cb) 00052 # call sss function 00053 self.sss.stop("arm") 00054 # check result 00055 if not self.ss_cb_executed: 00056 self.fail('Service Server not called') 00057 elif command == "init": 00058 # prepare service server (ss) 00059 ss_name = "/arm_controller/" + command 00060 self.ss_cb_executed = False 00061 rospy.Service(ss_name, Trigger, self.ss_cb) 00062 # call sss function 00063 self.sss.init("arm") 00064 # check result 00065 if not self.ss_cb_executed: 00066 self.fail('Service Server not called') 00067 elif command == "recover": 00068 # prepare service server (ss) 00069 ss_name = "/arm_controller/" + command 00070 self.ss_cb_executed = False 00071 rospy.Service(ss_name, Trigger, self.ss_cb) 00072 # call sss function 00073 self.sss.recover("arm") 00074 # check result 00075 if not self.ss_cb_executed: 00076 self.fail('Service Server not called') 00077 else: 00078 self.fail("Command not known to test script") 00079 00080 def move_test(self,component_name, parameter_name): 00081 self.as_cb_executed = False 00082 00083 # call sss functions 00084 self.sss.move(component_name,parameter_name) 00085 00086 # check result 00087 if not self.as_cb_executed: 00088 self.fail('No goal received at action server') 00089 00090 # get joint_names from parameter server 00091 param_string = self.ns_global_prefix + "/" + component_name + "/joint_names" 00092 if not rospy.has_param(param_string): 00093 error_msg = "parameter " + param_string +" does not exist on ROS Parameter Server" 00094 self.fail(error_msg) 00095 joint_names = rospy.get_param(param_string) 00096 00097 # get joint values from parameter server 00098 if type(parameter_name) is str: 00099 param_string = self.ns_global_prefix + "/" + component_name + "/" + parameter_name 00100 if not rospy.has_param(param_string): 00101 error_msg = "parameter " + param_string + " does not exist on ROS Parameter Server" 00102 self.fail(error_msg) 00103 param = rospy.get_param(param_string) 00104 else: 00105 param = parameter_name 00106 00107 # check trajectory parameters 00108 if not type(param) is list: # check outer list 00109 error_msg = "no valid parameter for " + component_name + ": not a list, aborting..." 00110 self.fail(error_msg) 00111 00112 traj = [] 00113 00114 for point in param: 00115 #print point,"type1 = ", type(point) 00116 if type(point) is str: 00117 if not rospy.has_param(self.ns_global_prefix + "/" + component_name + "/" + point): 00118 error_msg = "parameter " + self.ns_global_prefix + "/" + component_name + "/" + point + " does not exist on ROS Parameter Server, aborting..." 00119 self.fail(error_msg) 00120 point = rospy.get_param(self.ns_global_prefix + "/" + component_name + "/" + point) 00121 point = point[0] # \todo hack because only first point is used, no support for trajectories inside trajectories 00122 #print point 00123 elif type(point) is list: 00124 rospy.logdebug("point is a list") 00125 else: 00126 error_msg = "no valid parameter for " + component_name + ": not a list of lists or strings, aborting..." 00127 self.fail(error_msg) 00128 00129 # here: point should be list of floats/ints 00130 #print point 00131 if not len(point) == len(joint_names): # check dimension 00132 error_msg = "no valid parameter for " + component_name + ": dimension should be " + len(joint_names) + " and is " + len(point) + ", aborting..." 00133 self.fail(error_msg) 00134 00135 for value in point: 00136 #print value,"type2 = ", type(value) 00137 if not ((type(value) is float) or (type(value) is int)): # check type 00138 #print type(value) 00139 error_msg = "no valid parameter for " + component_name + ": not a list of float or int, aborting..." 00140 self.fail(error_msg) 00141 traj.append(point) 00142 00143 traj_msg = JointTrajectory() 00144 traj_msg.header.stamp = rospy.Time.now()+rospy.Duration(0.5) 00145 traj_msg.joint_names = joint_names 00146 point_nr = 0 00147 for point in traj: 00148 point_msg = JointTrajectoryPoint() 00149 point_msg.positions = point 00150 point_msg.time_from_start=rospy.Duration(3*point_nr) # this value is set to 3 sec per point. \todo: read from parameter 00151 traj_msg.points.append(point_msg) 00152 00153 #print traj_msg 00154 #print self.traj 00155 00156 # check amount of trajectory points 00157 if not (len(traj_msg.points) == len(self.traj.points)): 00158 self.fail('Not the same amount of points in trajectory') 00159 00160 # check points 00161 #print traj_msg.points 00162 #print self.traj.points 00163 for i in range(len(traj_msg.points)): 00164 # check dimension of points 00165 #print traj_msg.points[i].positions 00166 #print self.traj.points[i].positions 00167 if not (len(traj_msg.points[i].positions) == len(self.traj.points[i].positions)): 00168 self.fail('Not the same amount of values in point') 00169 00170 # check values of points 00171 for j in range(len(traj_msg.points[i].positions)): 00172 if not (traj_msg.points[i].positions[j] == self.traj.points[i].positions[j]): 00173 self.fail('Not the same values in point') 00174 00175 def as_cb(self, goal): 00176 #result = JointTrajectoryResult() 00177 result = FollowJointTrajectoryResult() 00178 #self.as_server.set_preempted(result) 00179 self.as_cb_executed = True 00180 self.traj = goal.trajectory 00181 print "action server callback" 00182 self.as_server.set_succeeded(result) 00183 00184 def ss_cb(self,req): 00185 self.ss_cb_executed = True 00186 res = TriggerResponse() 00187 res.success.data = True 00188 return res 00189 00190 if __name__ == '__main__': 00191 try: 00192 rostest.run('rostest', 'test_python_apt_test', PythonAPITest, sys.argv) 00193 except KeyboardInterrupt, e: 00194 pass 00195 print "exiting" 00196