Go to the documentation of this file.00001
00002 import roslib
00003 roslib.load_manifest('cob_script_server')
00004
00005 import sys
00006 import unittest
00007
00008 import rospy
00009 import rostest
00010 from trajectory_msgs.msg import *
00011 from simple_script_server import *
00012 from pr2_controllers_msgs.msg import *
00013 from control_msgs.msg import *
00014 from cob_srvs.srv import *
00015
00016 class PythonAPITest(unittest.TestCase):
00017 def __init__(self, *args):
00018 super(PythonAPITest, self).__init__(*args)
00019 rospy.init_node('test_python_api_test')
00020 self.sss=simple_script_server()
00021 self.as_cb_executed = False
00022 self.ss_stop_cb_executed = False
00023 self.ns_global_prefix = "/script_server"
00024
00025 def test_python_api(self):
00026
00027 try:
00028
00029 if not rospy.has_param('~command'):
00030 self.fail('Parameter command does not exist on ROS Parameter Server')
00031 command = rospy.get_param('~command')
00032 except KeyError, e:
00033 self.fail('Parameters not set properly')
00034
00035
00036 if command == "move":
00037 component_name = "arm"
00038
00039
00040
00041 as_name = "/" + component_name + "_controller/follow_joint_trajectory"
00042 self.as_server = actionlib.SimpleActionServer(as_name, FollowJointTrajectoryAction, execute_cb=self.as_cb)
00043
00044 self.move_test(component_name,"home")
00045 self.move_test(component_name,"grasp-to-tray")
00046 self.move_test(component_name,"wave")
00047 elif command == "stop":
00048
00049 ss_name = "/arm_controller/" + command
00050 self.ss_cb_executed = False
00051 rospy.Service(ss_name, Trigger, self.ss_cb)
00052
00053 self.sss.stop("arm")
00054
00055 if not self.ss_cb_executed:
00056 self.fail('Service Server not called')
00057 elif command == "init":
00058
00059 ss_name = "/arm_controller/" + command
00060 self.ss_cb_executed = False
00061 rospy.Service(ss_name, Trigger, self.ss_cb)
00062
00063 self.sss.init("arm")
00064
00065 if not self.ss_cb_executed:
00066 self.fail('Service Server not called')
00067 elif command == "recover":
00068
00069 ss_name = "/arm_controller/" + command
00070 self.ss_cb_executed = False
00071 rospy.Service(ss_name, Trigger, self.ss_cb)
00072
00073 self.sss.recover("arm")
00074
00075 if not self.ss_cb_executed:
00076 self.fail('Service Server not called')
00077 else:
00078 self.fail("Command not known to test script")
00079
00080 def move_test(self,component_name, parameter_name):
00081 self.as_cb_executed = False
00082
00083
00084 self.sss.move(component_name,parameter_name)
00085
00086
00087 if not self.as_cb_executed:
00088 self.fail('No goal received at action server')
00089
00090
00091 param_string = self.ns_global_prefix + "/" + component_name + "/joint_names"
00092 if not rospy.has_param(param_string):
00093 error_msg = "parameter " + param_string +" does not exist on ROS Parameter Server"
00094 self.fail(error_msg)
00095 joint_names = rospy.get_param(param_string)
00096
00097
00098 if type(parameter_name) is str:
00099 param_string = self.ns_global_prefix + "/" + component_name + "/" + parameter_name
00100 if not rospy.has_param(param_string):
00101 error_msg = "parameter " + param_string + " does not exist on ROS Parameter Server"
00102 self.fail(error_msg)
00103 param = rospy.get_param(param_string)
00104 else:
00105 param = parameter_name
00106
00107
00108 if not type(param) is list:
00109 error_msg = "no valid parameter for " + component_name + ": not a list, aborting..."
00110 self.fail(error_msg)
00111
00112 traj = []
00113
00114 for point in param:
00115
00116 if type(point) is str:
00117 if not rospy.has_param(self.ns_global_prefix + "/" + component_name + "/" + point):
00118 error_msg = "parameter " + self.ns_global_prefix + "/" + component_name + "/" + point + " does not exist on ROS Parameter Server, aborting..."
00119 self.fail(error_msg)
00120 point = rospy.get_param(self.ns_global_prefix + "/" + component_name + "/" + point)
00121 point = point[0]
00122
00123 elif type(point) is list:
00124 rospy.logdebug("point is a list")
00125 else:
00126 error_msg = "no valid parameter for " + component_name + ": not a list of lists or strings, aborting..."
00127 self.fail(error_msg)
00128
00129
00130
00131 if not len(point) == len(joint_names):
00132 error_msg = "no valid parameter for " + component_name + ": dimension should be " + len(joint_names) + " and is " + len(point) + ", aborting..."
00133 self.fail(error_msg)
00134
00135 for value in point:
00136
00137 if not ((type(value) is float) or (type(value) is int)):
00138
00139 error_msg = "no valid parameter for " + component_name + ": not a list of float or int, aborting..."
00140 self.fail(error_msg)
00141 traj.append(point)
00142
00143 traj_msg = JointTrajectory()
00144 traj_msg.header.stamp = rospy.Time.now()+rospy.Duration(0.5)
00145 traj_msg.joint_names = joint_names
00146 point_nr = 0
00147 for point in traj:
00148 point_msg = JointTrajectoryPoint()
00149 point_msg.positions = point
00150 point_msg.time_from_start=rospy.Duration(3*point_nr)
00151 traj_msg.points.append(point_msg)
00152
00153
00154
00155
00156
00157 if not (len(traj_msg.points) == len(self.traj.points)):
00158 self.fail('Not the same amount of points in trajectory')
00159
00160
00161
00162
00163 for i in range(len(traj_msg.points)):
00164
00165
00166
00167 if not (len(traj_msg.points[i].positions) == len(self.traj.points[i].positions)):
00168 self.fail('Not the same amount of values in point')
00169
00170
00171 for j in range(len(traj_msg.points[i].positions)):
00172 if not (traj_msg.points[i].positions[j] == self.traj.points[i].positions[j]):
00173 self.fail('Not the same values in point')
00174
00175 def as_cb(self, goal):
00176
00177 result = FollowJointTrajectoryResult()
00178
00179 self.as_cb_executed = True
00180 self.traj = goal.trajectory
00181 print "action server callback"
00182 self.as_server.set_succeeded(result)
00183
00184 def ss_cb(self,req):
00185 self.ss_cb_executed = True
00186 res = TriggerResponse()
00187 res.success.data = True
00188 return res
00189
00190 if __name__ == '__main__':
00191 try:
00192 rostest.run('rostest', 'test_python_apt_test', PythonAPITest, sys.argv)
00193 except KeyboardInterrupt, e:
00194 pass
00195 print "exiting"
00196