python_api.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 import roslib
00003 roslib.load_manifest('cob_script_server')
00004 
00005 import sys
00006 import unittest
00007 
00008 import rospy
00009 import rostest
00010 from trajectory_msgs.msg import *
00011 from simple_script_server import *
00012 from pr2_controllers_msgs.msg import *
00013 from control_msgs.msg import *
00014 from cob_srvs.srv import *
00015 
00016 class PythonAPITest(unittest.TestCase):
00017         def __init__(self, *args):
00018                 super(PythonAPITest, self).__init__(*args)
00019                 rospy.init_node('test_python_api_test')
00020                 self.sss=simple_script_server()
00021                 self.as_cb_executed = False
00022                 self.ss_stop_cb_executed = False
00023                 self.ns_global_prefix = "/script_server"
00024 
00025         def test_python_api(self):
00026                 # get parameters
00027                 try:
00028                         # component
00029                         if not rospy.has_param('~command'):
00030                                 self.fail('Parameter command does not exist on ROS Parameter Server')
00031                         command = rospy.get_param('~command')
00032                 except KeyError, e:
00033                         self.fail('Parameters not set properly')                
00034 
00035                 # choose command to test
00036                 if command == "move":
00037                         component_name = "arm"
00038                         # init action server (as)
00039                         ##as_name = "/" + component_name + "_controller/joint_trajectory_action"
00040                         ##self.as_server = actionlib.SimpleActionServer(as_name, JointTrajectoryAction, execute_cb=self.as_cb)
00041                         as_name = "/" + component_name + "_controller/follow_joint_trajectory"
00042                         self.as_server = actionlib.SimpleActionServer(as_name, FollowJointTrajectoryAction, execute_cb=self.as_cb)                      
00043                         #execute test (as all components have the same trajectory interface, we only test for arm)
00044                         self.move_test(component_name,"home") # test trajectories with a single point (home)
00045                         self.move_test(component_name,"grasp-to-tray") # test trajectories with multiple points (grasp-to-tray)
00046                         self.move_test(component_name,"wave") # test trajectories out of other points (wave)
00047                 elif command == "stop":
00048                         # prepare service server (ss)
00049                         ss_name = "/arm_controller/" + command
00050                         self.ss_cb_executed = False
00051                         rospy.Service(ss_name, Trigger, self.ss_cb)
00052                         # call sss function
00053                         self.sss.stop("arm")
00054                         # check result
00055                         if not self.ss_cb_executed:
00056                                 self.fail('Service Server not called')
00057                 elif command == "init":
00058                         # prepare service server (ss)
00059                         ss_name = "/arm_controller/" + command
00060                         self.ss_cb_executed = False
00061                         rospy.Service(ss_name, Trigger, self.ss_cb)
00062                         # call sss function
00063                         self.sss.init("arm")
00064                         # check result
00065                         if not self.ss_cb_executed:
00066                                 self.fail('Service Server not called')
00067                 elif command == "recover":
00068                         # prepare service server (ss)
00069                         ss_name = "/arm_controller/" + command
00070                         self.ss_cb_executed = False
00071                         rospy.Service(ss_name, Trigger, self.ss_cb)
00072                         # call sss function
00073                         self.sss.recover("arm")
00074                         # check result
00075                         if not self.ss_cb_executed:
00076                                 self.fail('Service Server not called')
00077                 else:
00078                         self.fail("Command not known to test script")
00079 
00080         def move_test(self,component_name, parameter_name):
00081                 self.as_cb_executed = False
00082                 
00083                 # call sss functions
00084                 self.sss.move(component_name,parameter_name)            
00085                 
00086                 # check result
00087                 if not self.as_cb_executed:
00088                         self.fail('No goal received at action server')
00089                 
00090                 # get joint_names from parameter server
00091                 param_string = self.ns_global_prefix + "/" + component_name + "/joint_names"
00092                 if not rospy.has_param(param_string):
00093                         error_msg = "parameter " + param_string +" does not exist on ROS Parameter Server"
00094                         self.fail(error_msg)
00095                 joint_names = rospy.get_param(param_string)
00096                 
00097                 # get joint values from parameter server
00098                 if type(parameter_name) is str:
00099                         param_string = self.ns_global_prefix + "/" + component_name + "/" + parameter_name
00100                         if not rospy.has_param(param_string):
00101                                 error_msg = "parameter " + param_string + " does not exist on ROS Parameter Server"
00102                                 self.fail(error_msg)
00103                         param = rospy.get_param(param_string)
00104                 else:
00105                         param = parameter_name
00106                         
00107                 # check trajectory parameters
00108                 if not type(param) is list: # check outer list
00109                                 error_msg = "no valid parameter for " + component_name + ": not a list, aborting..."
00110                                 self.fail(error_msg)
00111                 
00112                 traj = []
00113                 
00114                 for point in param:
00115                         #print point,"type1 = ", type(point)
00116                         if type(point) is str:
00117                                 if not rospy.has_param(self.ns_global_prefix + "/" + component_name + "/" + point):
00118                                         error_msg = "parameter " + self.ns_global_prefix + "/" + component_name + "/" + point + " does not exist on ROS Parameter Server, aborting..."
00119                                         self.fail(error_msg)
00120                                 point = rospy.get_param(self.ns_global_prefix + "/" + component_name + "/" + point)
00121                                 point = point[0] # \todo hack because only first point is used, no support for trajectories inside trajectories
00122                                 #print point
00123                         elif type(point) is list:
00124                                 rospy.logdebug("point is a list")
00125                         else:
00126                                 error_msg = "no valid parameter for " + component_name + ": not a list of lists or strings, aborting..."
00127                                 self.fail(error_msg)
00128                 
00129                         # here: point should be list of floats/ints
00130                         #print point
00131                         if not len(point) == len(joint_names): # check dimension
00132                                 error_msg = "no valid parameter for " + component_name + ": dimension should be " + len(joint_names) + " and is " + len(point) + ", aborting..."
00133                                 self.fail(error_msg)
00134 
00135                         for value in point:
00136                                 #print value,"type2 = ", type(value)
00137                                 if not ((type(value) is float) or (type(value) is int)): # check type
00138                                         #print type(value)
00139                                         error_msg = "no valid parameter for " + component_name + ": not a list of float or int, aborting..."
00140                                         self.fail(error_msg)
00141                         traj.append(point)
00142                 
00143                 traj_msg = JointTrajectory()
00144                 traj_msg.header.stamp = rospy.Time.now()+rospy.Duration(0.5)
00145                 traj_msg.joint_names = joint_names
00146                 point_nr = 0
00147                 for point in traj:
00148                         point_msg = JointTrajectoryPoint()
00149                         point_msg.positions = point
00150                         point_msg.time_from_start=rospy.Duration(3*point_nr) # this value is set to 3 sec per point. \todo: read from parameter
00151                         traj_msg.points.append(point_msg)
00152                 
00153                 #print traj_msg
00154                 #print self.traj
00155                 
00156                 # check amount of trajectory points
00157                 if not (len(traj_msg.points) == len(self.traj.points)):
00158                         self.fail('Not the same amount of points in trajectory')
00159                 
00160                 # check points
00161                 #print traj_msg.points
00162                 #print self.traj.points
00163                 for i in range(len(traj_msg.points)):
00164                         # check dimension of points
00165                         #print traj_msg.points[i].positions
00166                         #print self.traj.points[i].positions
00167                         if not (len(traj_msg.points[i].positions) == len(self.traj.points[i].positions)):
00168                                 self.fail('Not the same amount of values in point')
00169                         
00170                         # check values of points
00171                         for j in range(len(traj_msg.points[i].positions)):
00172                                 if not (traj_msg.points[i].positions[j] == self.traj.points[i].positions[j]):
00173                                         self.fail('Not the same values in point')
00174                 
00175         def as_cb(self, goal):
00176                 #result = JointTrajectoryResult()
00177                 result = FollowJointTrajectoryResult()
00178                 #self.as_server.set_preempted(result)
00179                 self.as_cb_executed = True
00180                 self.traj = goal.trajectory
00181                 print "action server callback"
00182                 self.as_server.set_succeeded(result)
00183         
00184         def ss_cb(self,req):
00185                 self.ss_cb_executed = True
00186                 res = TriggerResponse()
00187                 res.success.data = True
00188                 return res
00189 
00190 if __name__ == '__main__':
00191         try:
00192                 rostest.run('rostest', 'test_python_apt_test', PythonAPITest, sys.argv)
00193         except KeyboardInterrupt, e:
00194                 pass
00195         print "exiting"
00196 


cob_script_server
Author(s): Florian Weisshardt
autogenerated on Sun Oct 5 2014 22:59:42