00001
00002 import roslib
00003 roslib.load_manifest('cob_script_server')
00004
00005 import sys
00006 import unittest
00007
00008 import rospy
00009 import rostest
00010 from trajectory_msgs.msg import *
00011 from simple_script_server import *
00012 from pr2_controllers_msgs.msg import *
00013 from cob_srvs.srv import *
00014
00015 class PythonAPITest(unittest.TestCase):
00016 def __init__(self, *args):
00017 super(PythonAPITest, self).__init__(*args)
00018 rospy.init_node('test_python_api_test')
00019 self.sss=simple_script_server()
00020 self.as_cb_executed = False
00021 self.ss_stop_cb_executed = False
00022 self.ns_global_prefix = "/script_server"
00023
00024 def test_python_api(self):
00025
00026 try:
00027
00028 if not rospy.has_param('~command'):
00029 self.fail('Parameter command does not exist on ROS Parameter Server')
00030 command = rospy.get_param('~command')
00031 except KeyError, e:
00032 self.fail('Parameters not set properly')
00033
00034
00035 if command == "move":
00036 component_name = "arm"
00037
00038 as_name = "/" + component_name + "_controller/joint_trajectory_action"
00039 self.as_server = actionlib.SimpleActionServer(as_name, JointTrajectoryAction, execute_cb=self.as_cb)
00040
00041 self.move_test(component_name,"home")
00042 self.move_test(component_name,"grasp-to-tray")
00043 self.move_test(component_name,"wave")
00044 elif command == "stop":
00045
00046 ss_name = "/arm_controller/" + command
00047 self.ss_cb_executed = False
00048 rospy.Service(ss_name, Trigger, self.ss_cb)
00049
00050 self.sss.stop("arm")
00051
00052 if not self.ss_cb_executed:
00053 self.fail('Service Server not called')
00054 elif command == "init":
00055
00056 ss_name = "/arm_controller/" + command
00057 self.ss_cb_executed = False
00058 rospy.Service(ss_name, Trigger, self.ss_cb)
00059
00060 self.sss.init("arm")
00061
00062 if not self.ss_cb_executed:
00063 self.fail('Service Server not called')
00064 elif command == "recover":
00065
00066 ss_name = "/arm_controller/" + command
00067 self.ss_cb_executed = False
00068 rospy.Service(ss_name, Trigger, self.ss_cb)
00069
00070 self.sss.recover("arm")
00071
00072 if not self.ss_cb_executed:
00073 self.fail('Service Server not called')
00074 else:
00075 self.fail("Command not known to test script")
00076
00077 def move_test(self,component_name, parameter_name):
00078 self.as_cb_executed = False
00079
00080
00081 self.sss.move(component_name,parameter_name)
00082
00083
00084 if not self.as_cb_executed:
00085 self.fail('No goal received at action server')
00086
00087
00088 param_string = self.ns_global_prefix + "/" + component_name + "/joint_names"
00089 if not rospy.has_param(param_string):
00090 error_msg = "parameter " + param_string +" does not exist on ROS Parameter Server"
00091 self.fail(error_msg)
00092 joint_names = rospy.get_param(param_string)
00093
00094
00095 if type(parameter_name) is str:
00096 param_string = self.ns_global_prefix + "/" + component_name + "/" + parameter_name
00097 if not rospy.has_param(param_string):
00098 error_msg = "parameter " + param_string + " does not exist on ROS Parameter Server"
00099 self.fail(error_msg)
00100 param = rospy.get_param(param_string)
00101 else:
00102 param = parameter_name
00103
00104
00105 if not type(param) is list:
00106 error_msg = "no valid parameter for " + component_name + ": not a list, aborting..."
00107 self.fail(error_msg)
00108
00109 traj = []
00110
00111 for point in param:
00112
00113 if type(point) is str:
00114 if not rospy.has_param(self.ns_global_prefix + "/" + component_name + "/" + point):
00115 error_msg = "parameter " + self.ns_global_prefix + "/" + component_name + "/" + point + " does not exist on ROS Parameter Server, aborting..."
00116 self.fail(error_msg)
00117 point = rospy.get_param(self.ns_global_prefix + "/" + component_name + "/" + point)
00118 point = point[0]
00119
00120 elif type(point) is list:
00121 rospy.logdebug("point is a list")
00122 else:
00123 error_msg = "no valid parameter for " + component_name + ": not a list of lists or strings, aborting..."
00124 self.fail(error_msg)
00125
00126
00127
00128 if not len(point) == len(joint_names):
00129 error_msg = "no valid parameter for " + component_name + ": dimension should be " + len(joint_names) + " and is " + len(point) + ", aborting..."
00130 self.fail(error_msg)
00131
00132 for value in point:
00133
00134 if not ((type(value) is float) or (type(value) is int)):
00135
00136 error_msg = "no valid parameter for " + component_name + ": not a list of float or int, aborting..."
00137 self.fail(error_msg)
00138 traj.append(point)
00139
00140 traj_msg = JointTrajectory()
00141 traj_msg.header.stamp = rospy.Time.now()+rospy.Duration(0.5)
00142 traj_msg.joint_names = joint_names
00143 point_nr = 0
00144 for point in traj:
00145 point_msg = JointTrajectoryPoint()
00146 point_msg.positions = point
00147 point_msg.time_from_start=rospy.Duration(3*point_nr)
00148 traj_msg.points.append(point_msg)
00149
00150
00151
00152
00153
00154 if not (len(traj_msg.points) == len(self.traj.points)):
00155 self.fail('Not the same amount of points in trajectory')
00156
00157
00158
00159
00160 for i in range(len(traj_msg.points)):
00161
00162
00163
00164 if not (len(traj_msg.points[i].positions) == len(self.traj.points[i].positions)):
00165 self.fail('Not the same amount of values in point')
00166
00167
00168 for j in range(len(traj_msg.points[i].positions)):
00169 if not (traj_msg.points[i].positions[j] == self.traj.points[i].positions[j]):
00170 self.fail('Not the same values in point')
00171
00172 def as_cb(self, goal):
00173 result = JointTrajectoryResult()
00174
00175 self.as_cb_executed = True
00176 self.traj = goal.trajectory
00177 print "action server callback"
00178 self.as_server.set_succeeded(result)
00179
00180 def ss_cb(self,req):
00181 self.ss_cb_executed = True
00182 res = TriggerResponse()
00183 res.success.data = True
00184 return res
00185
00186 if __name__ == '__main__':
00187 try:
00188 rostest.run('rostest', 'test_python_apt_test', PythonAPITest, sys.argv)
00189 except KeyboardInterrupt, e:
00190 pass
00191 print "exiting"
00192