python_api.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 import roslib
00003 roslib.load_manifest('cob_script_server')
00004 
00005 import sys
00006 import unittest
00007 
00008 import rospy
00009 import rostest
00010 from trajectory_msgs.msg import *
00011 from simple_script_server import *
00012 from control_msgs.msg import *
00013 from cob_srvs.srv import *
00014 
00015 class PythonAPITest(unittest.TestCase):
00016         def __init__(self, *args):
00017                 super(PythonAPITest, self).__init__(*args)
00018                 rospy.init_node('test_python_api_test')
00019                 self.sss=simple_script_server()
00020                 self.as_cb_executed = False
00021                 self.ss_stop_cb_executed = False
00022                 self.ns_global_prefix = "/script_server"
00023 
00024         def test_python_api(self):
00025                 # get parameters
00026                 try:
00027                         # component
00028                         if not rospy.has_param('~command'):
00029                                 self.fail('Parameter command does not exist on ROS Parameter Server')
00030                         command = rospy.get_param('~command')
00031                 except KeyError, e:
00032                         self.fail('Parameters not set properly')                
00033 
00034                 # choose command to test
00035                 if command == "move":
00036                         component_name = "arm"
00037                         # init action server (as)
00038                         ##as_name = "/" + component_name + "_controller/joint_trajectory_action"
00039                         ##self.as_server = actionlib.SimpleActionServer(as_name, JointTrajectoryAction, execute_cb=self.as_cb)
00040                         as_name = "/" + component_name + "_controller/follow_joint_trajectory"
00041                         self.as_server = actionlib.SimpleActionServer(as_name, FollowJointTrajectoryAction, execute_cb=self.as_cb)                      
00042                         #execute test (as all components have the same trajectory interface, we only test for arm)
00043                         self.move_test(component_name,"home") # test trajectories with a single point (home)
00044                         self.move_test(component_name,"grasp-to-tray") # test trajectories with multiple points (grasp-to-tray)
00045                         self.move_test(component_name,"wave") # test trajectories out of other points (wave)
00046                 elif command == "stop":
00047                         # prepare service server (ss)
00048                         ss_name = "/arm_controller/" + command
00049                         self.ss_cb_executed = False
00050                         rospy.Service(ss_name, Trigger, self.ss_cb)
00051                         # call sss function
00052                         self.sss.stop("arm")
00053                         # check result
00054                         if not self.ss_cb_executed:
00055                                 self.fail('Service Server not called')
00056                 elif command == "init":
00057                         # prepare service server (ss)
00058                         ss_name = "/arm_controller/" + command
00059                         self.ss_cb_executed = False
00060                         rospy.Service(ss_name, Trigger, self.ss_cb)
00061                         # call sss function
00062                         self.sss.init("arm")
00063                         # check result
00064                         if not self.ss_cb_executed:
00065                                 self.fail('Service Server not called')
00066                 elif command == "recover":
00067                         # prepare service server (ss)
00068                         ss_name = "/arm_controller/" + command
00069                         self.ss_cb_executed = False
00070                         rospy.Service(ss_name, Trigger, self.ss_cb)
00071                         # call sss function
00072                         self.sss.recover("arm")
00073                         # check result
00074                         if not self.ss_cb_executed:
00075                                 self.fail('Service Server not called')
00076                 else:
00077                         self.fail("Command not known to test script")
00078 
00079         def move_test(self,component_name, parameter_name):
00080                 self.as_cb_executed = False
00081                 
00082                 # call sss functions
00083                 self.sss.move(component_name,parameter_name)            
00084                 
00085                 # check result
00086                 if not self.as_cb_executed:
00087                         self.fail('No goal received at action server')
00088                 
00089                 # get joint_names from parameter server
00090                 param_string = self.ns_global_prefix + "/" + component_name + "/joint_names"
00091                 if not rospy.has_param(param_string):
00092                         error_msg = "parameter " + param_string +" does not exist on ROS Parameter Server"
00093                         self.fail(error_msg)
00094                 joint_names = rospy.get_param(param_string)
00095                 
00096                 # get joint values from parameter server
00097                 if type(parameter_name) is str:
00098                         param_string = self.ns_global_prefix + "/" + component_name + "/" + parameter_name
00099                         if not rospy.has_param(param_string):
00100                                 error_msg = "parameter " + param_string + " does not exist on ROS Parameter Server"
00101                                 self.fail(error_msg)
00102                         param = rospy.get_param(param_string)
00103                 else:
00104                         param = parameter_name
00105                         
00106                 # check trajectory parameters
00107                 if not type(param) is list: # check outer list
00108                                 error_msg = "no valid parameter for " + component_name + ": not a list, aborting..."
00109                                 self.fail(error_msg)
00110                 
00111                 traj = []
00112                 
00113                 for point in param:
00114                         #print point,"type1 = ", type(point)
00115                         if type(point) is str:
00116                                 if not rospy.has_param(self.ns_global_prefix + "/" + component_name + "/" + point):
00117                                         error_msg = "parameter " + self.ns_global_prefix + "/" + component_name + "/" + point + " does not exist on ROS Parameter Server, aborting..."
00118                                         self.fail(error_msg)
00119                                 point = rospy.get_param(self.ns_global_prefix + "/" + component_name + "/" + point)
00120                                 point = point[0] # \todo hack because only first point is used, no support for trajectories inside trajectories
00121                                 #print point
00122                         elif type(point) is list:
00123                                 rospy.logdebug("point is a list")
00124                         else:
00125                                 error_msg = "no valid parameter for " + component_name + ": not a list of lists or strings, aborting..."
00126                                 self.fail(error_msg)
00127                 
00128                         # here: point should be list of floats/ints
00129                         #print point
00130                         if not len(point) == len(joint_names): # check dimension
00131                                 error_msg = "no valid parameter for " + component_name + ": dimension should be " + len(joint_names) + " and is " + len(point) + ", aborting..."
00132                                 self.fail(error_msg)
00133 
00134                         for value in point:
00135                                 #print value,"type2 = ", type(value)
00136                                 if not ((type(value) is float) or (type(value) is int)): # check type
00137                                         #print type(value)
00138                                         error_msg = "no valid parameter for " + component_name + ": not a list of float or int, aborting..."
00139                                         self.fail(error_msg)
00140                         traj.append(point)
00141                 
00142                 traj_msg = JointTrajectory()
00143                 traj_msg.header.stamp = rospy.Time.now()+rospy.Duration(0.5)
00144                 traj_msg.joint_names = joint_names
00145                 point_nr = 0
00146                 for point in traj:
00147                         point_msg = JointTrajectoryPoint()
00148                         point_msg.positions = point
00149                         point_msg.time_from_start=rospy.Duration(3*point_nr) # this value is set to 3 sec per point. \todo: read from parameter
00150                         traj_msg.points.append(point_msg)
00151                 
00152                 #print traj_msg
00153                 #print self.traj
00154                 
00155                 # check amount of trajectory points
00156                 if not (len(traj_msg.points) == len(self.traj.points)):
00157                         self.fail('Not the same amount of points in trajectory')
00158                 
00159                 # check points
00160                 #print traj_msg.points
00161                 #print self.traj.points
00162                 for i in range(len(traj_msg.points)):
00163                         # check dimension of points
00164                         #print traj_msg.points[i].positions
00165                         #print self.traj.points[i].positions
00166                         if not (len(traj_msg.points[i].positions) == len(self.traj.points[i].positions)):
00167                                 self.fail('Not the same amount of values in point')
00168                         
00169                         # check values of points
00170                         for j in range(len(traj_msg.points[i].positions)):
00171                                 if not (traj_msg.points[i].positions[j] == self.traj.points[i].positions[j]):
00172                                         self.fail('Not the same values in point')
00173                 
00174         def as_cb(self, goal):
00175                 #result = JointTrajectoryResult()
00176                 result = FollowJointTrajectoryResult()
00177                 #self.as_server.set_preempted(result)
00178                 self.as_cb_executed = True
00179                 self.traj = goal.trajectory
00180                 print "action server callback"
00181                 self.as_server.set_succeeded(result)
00182         
00183         def ss_cb(self,req):
00184                 self.ss_cb_executed = True
00185                 res = TriggerResponse()
00186                 res.success.data = True
00187                 return res
00188 
00189 if __name__ == '__main__':
00190         try:
00191                 rostest.run('rostest', 'test_python_apt_test', PythonAPITest, sys.argv)
00192         except KeyboardInterrupt, e:
00193                 pass
00194         print "exiting"
00195 


cob_script_server
Author(s): Florian Weisshardt
autogenerated on Thu Aug 27 2015 12:42:55