test_python_api.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #
00003 # Copyright 2017 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA)
00004 #
00005 # Licensed under the Apache License, Version 2.0 (the "License");
00006 # you may not use this file except in compliance with the License.
00007 # You may obtain a copy of the License at
00008 #
00009 #   http://www.apache.org/licenses/LICENSE-2.0
00010 #
00011 # Unless required by applicable law or agreed to in writing, software
00012 # distributed under the License is distributed on an "AS IS" BASIS,
00013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00014 # See the License for the specific language governing permissions and
00015 # limitations under the License.
00016 
00017 
00018 import sys
00019 import unittest
00020 
00021 import rospy
00022 import rostest
00023 from std_srvs.srv import Trigger
00024 from trajectory_msgs.msg import *
00025 from control_msgs.msg import *
00026 from simple_script_server import *
00027 
00028 
00029 class PythonAPITest(unittest.TestCase):
00030         def __init__(self, *args):
00031                 super(PythonAPITest, self).__init__(*args)
00032                 rospy.init_node('test_python_api_test')
00033                 self.sss=simple_script_server()
00034                 self.as_cb_executed = False
00035                 self.ss_stop_cb_executed = False
00036                 self.ns_global_prefix = "/script_server"
00037 
00038         def test_python_api(self):
00039                 # get parameters
00040                 try:
00041                         # component
00042                         if not rospy.has_param('~command'):
00043                                 self.fail('Parameter command does not exist on ROS Parameter Server')
00044                         command = rospy.get_param('~command')
00045                 except KeyError, e:
00046                         self.fail('Parameters not set properly')
00047 
00048                 # choose command to test
00049                 if command == "move":
00050                         component_name = "arm"
00051                         # init action server (as)
00052                         ##as_name = "/" + component_name + "_controller/joint_trajectory_action"
00053                         ##self.as_server = actionlib.SimpleActionServer(as_name, JointTrajectoryAction, execute_cb=self.as_cb)
00054                         as_name = "/" + component_name + "_controller/follow_joint_trajectory"
00055                         self.as_server = actionlib.SimpleActionServer(as_name, FollowJointTrajectoryAction, execute_cb=self.as_cb)
00056                         #execute test (as all components have the same trajectory interface, we only test for arm)
00057                         self.move_test(component_name,"home") # test trajectories with a single point (home)
00058                         self.move_test(component_name,"grasp-to-tray") # test trajectories with multiple points (grasp-to-tray)
00059                         self.move_test(component_name,"wave") # test trajectories out of other points (wave)
00060                 elif command == "stop":
00061                         # prepare service server (ss)
00062                         ss_name = "/arm_controller/" + command
00063                         self.ss_cb_executed = False
00064                         rospy.Service(ss_name, Trigger, self.ss_cb)
00065                         # call sss function
00066                         self.sss.stop("arm")
00067                         # check result
00068                         if not self.ss_cb_executed:
00069                                 self.fail('Service Server not called')
00070                 elif command == "init":
00071                         # prepare service server (ss)
00072                         ss_name = "/arm_controller/" + command
00073                         self.ss_cb_executed = False
00074                         rospy.Service(ss_name, Trigger, self.ss_cb)
00075                         # call sss function
00076                         self.sss.init("arm")
00077                         # check result
00078                         if not self.ss_cb_executed:
00079                                 self.fail('Service Server not called')
00080                 elif command == "recover":
00081                         # prepare service server (ss)
00082                         ss_name = "/arm_controller/" + command
00083                         self.ss_cb_executed = False
00084                         rospy.Service(ss_name, Trigger, self.ss_cb)
00085                         # call sss function
00086                         self.sss.recover("arm")
00087                         # check result
00088                         if not self.ss_cb_executed:
00089                                 self.fail('Service Server not called')
00090                 else:
00091                         self.fail("Command not known to test script")
00092 
00093         def move_test(self,component_name, parameter_name):
00094                 self.as_cb_executed = False
00095 
00096                 # call sss functions
00097                 self.sss.move(component_name,parameter_name)
00098 
00099                 # check result
00100                 if not self.as_cb_executed:
00101                         self.fail('No goal received at action server')
00102 
00103                 # get joint_names from parameter server
00104                 param_string = self.ns_global_prefix + "/" + component_name + "/joint_names"
00105                 if not rospy.has_param(param_string):
00106                         error_msg = "parameter " + param_string +" does not exist on ROS Parameter Server"
00107                         self.fail(error_msg)
00108                 joint_names = rospy.get_param(param_string)
00109 
00110                 # get joint values from parameter server
00111                 if type(parameter_name) is str:
00112                         param_string = self.ns_global_prefix + "/" + component_name + "/" + parameter_name
00113                         if not rospy.has_param(param_string):
00114                                 error_msg = "parameter " + param_string + " does not exist on ROS Parameter Server"
00115                                 self.fail(error_msg)
00116                         param = rospy.get_param(param_string)
00117                 else:
00118                         param = parameter_name
00119 
00120                 # check trajectory parameters
00121                 if not type(param) is list: # check outer list
00122                                 error_msg = "no valid parameter for " + component_name + ": not a list, aborting..."
00123                                 self.fail(error_msg)
00124 
00125                 traj = []
00126 
00127                 for point in param:
00128                         #print point,"type1 = ", type(point)
00129                         if type(point) is str:
00130                                 if not rospy.has_param(self.ns_global_prefix + "/" + component_name + "/" + point):
00131                                         error_msg = "parameter " + self.ns_global_prefix + "/" + component_name + "/" + point + " does not exist on ROS Parameter Server, aborting..."
00132                                         self.fail(error_msg)
00133                                 point = rospy.get_param(self.ns_global_prefix + "/" + component_name + "/" + point)
00134                                 point = point[0] # \todo hack because only first point is used, no support for trajectories inside trajectories
00135                                 #print point
00136                         elif type(point) is list:
00137                                 rospy.logdebug("point is a list")
00138                         else:
00139                                 error_msg = "no valid parameter for " + component_name + ": not a list of lists or strings, aborting..."
00140                                 self.fail(error_msg)
00141 
00142                         # here: point should be list of floats/ints
00143                         #print point
00144                         if not len(point) == len(joint_names): # check dimension
00145                                 error_msg = "no valid parameter for " + component_name + ": dimension should be " + len(joint_names) + " and is " + len(point) + ", aborting..."
00146                                 self.fail(error_msg)
00147 
00148                         for value in point:
00149                                 #print value,"type2 = ", type(value)
00150                                 if not ((type(value) is float) or (type(value) is int)): # check type
00151                                         #print type(value)
00152                                         error_msg = "no valid parameter for " + component_name + ": not a list of float or int, aborting..."
00153                                         self.fail(error_msg)
00154                         traj.append(point)
00155 
00156                 traj_msg = JointTrajectory()
00157                 traj_msg.header.stamp = rospy.Time.now()+rospy.Duration(0.5)
00158                 traj_msg.joint_names = joint_names
00159                 point_nr = 0
00160                 for point in traj:
00161                         point_msg = JointTrajectoryPoint()
00162                         point_msg.positions = point
00163                         point_msg.time_from_start=rospy.Duration(3*point_nr) # this value is set to 3 sec per point. \todo: read from parameter
00164                         traj_msg.points.append(point_msg)
00165 
00166                 #print traj_msg
00167                 #print self.traj
00168 
00169                 # check amount of trajectory points
00170                 if not (len(traj_msg.points) == len(self.traj.points)):
00171                         self.fail('Not the same amount of points in trajectory')
00172 
00173                 # check points
00174                 #print traj_msg.points
00175                 #print self.traj.points
00176                 for i in range(len(traj_msg.points)):
00177                         # check dimension of points
00178                         #print traj_msg.points[i].positions
00179                         #print self.traj.points[i].positions
00180                         if not (len(traj_msg.points[i].positions) == len(self.traj.points[i].positions)):
00181                                 self.fail('Not the same amount of values in point')
00182 
00183                         # check values of points
00184                         for j in range(len(traj_msg.points[i].positions)):
00185                                 if not (traj_msg.points[i].positions[j] == self.traj.points[i].positions[j]):
00186                                         self.fail('Not the same values in point')
00187 
00188         def as_cb(self, goal):
00189                 #result = JointTrajectoryResult()
00190                 result = FollowJointTrajectoryResult()
00191                 #self.as_server.set_preempted(result)
00192                 self.as_cb_executed = True
00193                 self.traj = goal.trajectory
00194                 print "action server callback"
00195                 self.as_server.set_succeeded(result)
00196 
00197         def ss_cb(self,req):
00198                 self.ss_cb_executed = True
00199                 res = TriggerResponse()
00200                 res.success = True
00201                 return res
00202 
00203 if __name__ == '__main__':
00204         try:
00205                 rostest.run('rostest', 'test_python_apt_test', PythonAPITest, sys.argv)
00206         except KeyboardInterrupt, e:
00207                 pass
00208         print "exiting"
00209 


cob_script_server
Author(s): Florian Weisshardt
autogenerated on Sun Jun 9 2019 20:20:12