Go to the documentation of this file.00001
00002 import roslib
00003 roslib.load_manifest('cob_script_server')
00004
00005 import sys
00006 import unittest
00007
00008 import rospy
00009 import rostest
00010 from trajectory_msgs.msg import *
00011 from simple_script_server import *
00012 from control_msgs.msg import *
00013 from cob_srvs.srv import *
00014
00015 class PythonAPITest(unittest.TestCase):
00016 def __init__(self, *args):
00017 super(PythonAPITest, self).__init__(*args)
00018 rospy.init_node('test_python_api_test')
00019 self.sss=simple_script_server()
00020 self.as_cb_executed = False
00021 self.ss_stop_cb_executed = False
00022 self.ns_global_prefix = "/script_server"
00023
00024 def test_python_api(self):
00025
00026 try:
00027
00028 if not rospy.has_param('~command'):
00029 self.fail('Parameter command does not exist on ROS Parameter Server')
00030 command = rospy.get_param('~command')
00031 except KeyError, e:
00032 self.fail('Parameters not set properly')
00033
00034
00035 if command == "move":
00036 component_name = "arm"
00037
00038
00039
00040 as_name = "/" + component_name + "_controller/follow_joint_trajectory"
00041 self.as_server = actionlib.SimpleActionServer(as_name, FollowJointTrajectoryAction, execute_cb=self.as_cb)
00042
00043 self.move_test(component_name,"home")
00044 self.move_test(component_name,"grasp-to-tray")
00045 self.move_test(component_name,"wave")
00046 elif command == "stop":
00047
00048 ss_name = "/arm_controller/" + command
00049 self.ss_cb_executed = False
00050 rospy.Service(ss_name, Trigger, self.ss_cb)
00051
00052 self.sss.stop("arm")
00053
00054 if not self.ss_cb_executed:
00055 self.fail('Service Server not called')
00056 elif command == "init":
00057
00058 ss_name = "/arm_controller/" + command
00059 self.ss_cb_executed = False
00060 rospy.Service(ss_name, Trigger, self.ss_cb)
00061
00062 self.sss.init("arm")
00063
00064 if not self.ss_cb_executed:
00065 self.fail('Service Server not called')
00066 elif command == "recover":
00067
00068 ss_name = "/arm_controller/" + command
00069 self.ss_cb_executed = False
00070 rospy.Service(ss_name, Trigger, self.ss_cb)
00071
00072 self.sss.recover("arm")
00073
00074 if not self.ss_cb_executed:
00075 self.fail('Service Server not called')
00076 else:
00077 self.fail("Command not known to test script")
00078
00079 def move_test(self,component_name, parameter_name):
00080 self.as_cb_executed = False
00081
00082
00083 self.sss.move(component_name,parameter_name)
00084
00085
00086 if not self.as_cb_executed:
00087 self.fail('No goal received at action server')
00088
00089
00090 param_string = self.ns_global_prefix + "/" + component_name + "/joint_names"
00091 if not rospy.has_param(param_string):
00092 error_msg = "parameter " + param_string +" does not exist on ROS Parameter Server"
00093 self.fail(error_msg)
00094 joint_names = rospy.get_param(param_string)
00095
00096
00097 if type(parameter_name) is str:
00098 param_string = self.ns_global_prefix + "/" + component_name + "/" + parameter_name
00099 if not rospy.has_param(param_string):
00100 error_msg = "parameter " + param_string + " does not exist on ROS Parameter Server"
00101 self.fail(error_msg)
00102 param = rospy.get_param(param_string)
00103 else:
00104 param = parameter_name
00105
00106
00107 if not type(param) is list:
00108 error_msg = "no valid parameter for " + component_name + ": not a list, aborting..."
00109 self.fail(error_msg)
00110
00111 traj = []
00112
00113 for point in param:
00114
00115 if type(point) is str:
00116 if not rospy.has_param(self.ns_global_prefix + "/" + component_name + "/" + point):
00117 error_msg = "parameter " + self.ns_global_prefix + "/" + component_name + "/" + point + " does not exist on ROS Parameter Server, aborting..."
00118 self.fail(error_msg)
00119 point = rospy.get_param(self.ns_global_prefix + "/" + component_name + "/" + point)
00120 point = point[0]
00121
00122 elif type(point) is list:
00123 rospy.logdebug("point is a list")
00124 else:
00125 error_msg = "no valid parameter for " + component_name + ": not a list of lists or strings, aborting..."
00126 self.fail(error_msg)
00127
00128
00129
00130 if not len(point) == len(joint_names):
00131 error_msg = "no valid parameter for " + component_name + ": dimension should be " + len(joint_names) + " and is " + len(point) + ", aborting..."
00132 self.fail(error_msg)
00133
00134 for value in point:
00135
00136 if not ((type(value) is float) or (type(value) is int)):
00137
00138 error_msg = "no valid parameter for " + component_name + ": not a list of float or int, aborting..."
00139 self.fail(error_msg)
00140 traj.append(point)
00141
00142 traj_msg = JointTrajectory()
00143 traj_msg.header.stamp = rospy.Time.now()+rospy.Duration(0.5)
00144 traj_msg.joint_names = joint_names
00145 point_nr = 0
00146 for point in traj:
00147 point_msg = JointTrajectoryPoint()
00148 point_msg.positions = point
00149 point_msg.time_from_start=rospy.Duration(3*point_nr)
00150 traj_msg.points.append(point_msg)
00151
00152
00153
00154
00155
00156 if not (len(traj_msg.points) == len(self.traj.points)):
00157 self.fail('Not the same amount of points in trajectory')
00158
00159
00160
00161
00162 for i in range(len(traj_msg.points)):
00163
00164
00165
00166 if not (len(traj_msg.points[i].positions) == len(self.traj.points[i].positions)):
00167 self.fail('Not the same amount of values in point')
00168
00169
00170 for j in range(len(traj_msg.points[i].positions)):
00171 if not (traj_msg.points[i].positions[j] == self.traj.points[i].positions[j]):
00172 self.fail('Not the same values in point')
00173
00174 def as_cb(self, goal):
00175
00176 result = FollowJointTrajectoryResult()
00177
00178 self.as_cb_executed = True
00179 self.traj = goal.trajectory
00180 print "action server callback"
00181 self.as_server.set_succeeded(result)
00182
00183 def ss_cb(self,req):
00184 self.ss_cb_executed = True
00185 res = TriggerResponse()
00186 res.success.data = True
00187 return res
00188
00189 if __name__ == '__main__':
00190 try:
00191 rostest.run('rostest', 'test_python_apt_test', PythonAPITest, sys.argv)
00192 except KeyboardInterrupt, e:
00193 pass
00194 print "exiting"
00195