test_python_api.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # Copyright 2017 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA)
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 
18 import sys
19 import unittest
20 
21 import rospy
22 import rostest
23 import actionlib
24 from std_srvs.srv import Trigger, TriggerResponse
25 from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
26 from control_msgs.msg import FollowJointTrajectoryAction, FollowJointTrajectoryResult
27 from simple_script_server import *
28 
29 
30 class PythonAPITest(unittest.TestCase):
31  def __init__(self, *args):
32  super(PythonAPITest, self).__init__(*args)
33  rospy.init_node('test_python_api_test')
35  self.as_cb_executed = False
36  self.ss_stop_cb_executed = False
37  self.ns_global_prefix = "/script_server"
38 
39  def test_python_api(self):
40  # get parameters
41  try:
42  # component
43  if not rospy.has_param('~command'):
44  self.fail('Parameter command does not exist on ROS Parameter Server')
45  command = rospy.get_param('~command')
46  except KeyError:
47  self.fail('Parameters not set properly')
48 
49  # choose command to test
50  if command == "move":
51  component_name = "arm"
52  # init action server (as)
53  ##as_name = "/" + component_name + "_controller/joint_trajectory_action"
54  ##self.as_server = actionlib.SimpleActionServer(as_name, JointTrajectoryAction, execute_cb=self.as_cb)
55  as_name = "/" + component_name + "_controller/follow_joint_trajectory"
56  self.as_server = actionlib.SimpleActionServer(as_name, FollowJointTrajectoryAction, execute_cb=self.as_cb)
57  #execute test (as all components have the same trajectory interface, we only test for arm)
58  self.move_test(component_name,"home") # test trajectories with a single point (home)
59  self.move_test(component_name,"grasp-to-tray") # test trajectories with multiple points (grasp-to-tray)
60  self.move_test(component_name,"wave") # test trajectories out of other points (wave)
61  elif command == "stop":
62  # prepare service server (ss)
63  ss_name = "/arm_controller/" + command
64  self.ss_cb_executed = False
65  rospy.Service(ss_name, Trigger, self.ss_cb)
66  # call sss function
67  self.sss.stop("arm")
68  # check result
69  if not self.ss_cb_executed:
70  self.fail('Service Server not called')
71  elif command == "init":
72  # prepare service server (ss)
73  ss_name = "/arm_controller/" + command
74  self.ss_cb_executed = False
75  rospy.Service(ss_name, Trigger, self.ss_cb)
76  # call sss function
77  self.sss.init("arm")
78  # check result
79  if not self.ss_cb_executed:
80  self.fail('Service Server not called')
81  elif command == "recover":
82  # prepare service server (ss)
83  ss_name = "/arm_controller/" + command
84  self.ss_cb_executed = False
85  rospy.Service(ss_name, Trigger, self.ss_cb)
86  # call sss function
87  self.sss.recover("arm")
88  # check result
89  if not self.ss_cb_executed:
90  self.fail('Service Server not called')
91  else:
92  self.fail("Command not known to test script")
93 
94  def move_test(self,component_name, parameter_name):
95  self.as_cb_executed = False
96 
97  # call sss functions
98  self.sss.move(component_name,parameter_name)
99 
100  # check result
101  if not self.as_cb_executed:
102  self.fail('No goal received at action server')
103 
104  # get joint_names from parameter server
105  param_string = self.ns_global_prefix + "/" + component_name + "/joint_names"
106  if not rospy.has_param(param_string):
107  error_msg = "parameter " + param_string +" does not exist on ROS Parameter Server"
108  self.fail(error_msg)
109  joint_names = rospy.get_param(param_string)
110 
111  # get joint values from parameter server
112  if type(parameter_name) is str:
113  param_string = self.ns_global_prefix + "/" + component_name + "/" + parameter_name
114  if not rospy.has_param(param_string):
115  error_msg = "parameter " + param_string + " does not exist on ROS Parameter Server"
116  self.fail(error_msg)
117  param = rospy.get_param(param_string)
118  else:
119  param = parameter_name
120 
121  # check trajectory parameters
122  if not type(param) is list: # check outer list
123  error_msg = "no valid parameter for " + component_name + ": not a list, aborting..."
124  self.fail(error_msg)
125 
126  traj = []
127 
128  for point in param:
129  #print point,"type1 = ", type(point)
130  if type(point) is str:
131  if not rospy.has_param(self.ns_global_prefix + "/" + component_name + "/" + point):
132  error_msg = "parameter " + self.ns_global_prefix + "/" + component_name + "/" + point + " does not exist on ROS Parameter Server, aborting..."
133  self.fail(error_msg)
134  point = rospy.get_param(self.ns_global_prefix + "/" + component_name + "/" + point)
135  point = point[0] # \todo hack because only first point is used, no support for trajectories inside trajectories
136  #print point
137  elif type(point) is list:
138  rospy.logdebug("point is a list")
139  else:
140  error_msg = "no valid parameter for " + component_name + ": not a list of lists or strings, aborting..."
141  self.fail(error_msg)
142 
143  # here: point should be list of floats/ints
144  #print point
145  if not len(point) == len(joint_names): # check dimension
146  error_msg = "no valid parameter for " + component_name + ": dimension should be " + len(joint_names) + " and is " + len(point) + ", aborting..."
147  self.fail(error_msg)
148 
149  for value in point:
150  #print value,"type2 = ", type(value)
151  if not ((type(value) is float) or (type(value) is int)): # check type
152  #print type(value)
153  error_msg = "no valid parameter for " + component_name + ": not a list of float or int, aborting..."
154  self.fail(error_msg)
155  traj.append(point)
156 
157  traj_msg = JointTrajectory()
158  traj_msg.header.stamp = rospy.Time.now()+rospy.Duration(0.5)
159  traj_msg.joint_names = joint_names
160  point_nr = 0
161  for point in traj:
162  point_msg = JointTrajectoryPoint()
163  point_msg.positions = point
164  point_msg.time_from_start=rospy.Duration(3*point_nr) # this value is set to 3 sec per point. \todo: read from parameter
165  traj_msg.points.append(point_msg)
166 
167  #print traj_msg
168  #print self.traj
169 
170  # check amount of trajectory points
171  if not (len(traj_msg.points) == len(self.traj.points)):
172  self.fail('Not the same amount of points in trajectory')
173 
174  # check points
175  #print traj_msg.points
176  #print self.traj.points
177  for i in range(len(traj_msg.points)):
178  # check dimension of points
179  #print traj_msg.points[i].positions
180  #print self.traj.points[i].positions
181  if not (len(traj_msg.points[i].positions) == len(self.traj.points[i].positions)):
182  self.fail('Not the same amount of values in point')
183 
184  # check values of points
185  for j in range(len(traj_msg.points[i].positions)):
186  if not (traj_msg.points[i].positions[j] == self.traj.points[i].positions[j]):
187  self.fail('Not the same values in point')
188 
189  def as_cb(self, goal):
190  #result = JointTrajectoryResult()
191  result = FollowJointTrajectoryResult()
192  #self.as_server.set_preempted(result)
193  self.as_cb_executed = True
194  self.traj = goal.trajectory
195  print("action server callback")
196  self.as_server.set_succeeded(result)
197 
198  def ss_cb(self,req):
199  self.ss_cb_executed = True
200  res = TriggerResponse()
201  res.success = True
202  return res
203 
204 if __name__ == '__main__':
205  try:
206  rostest.run('rostest', 'test_python_apt_test', PythonAPITest, sys.argv)
207  except KeyboardInterrupt as e:
208  pass
209  print("exiting")
210 
def move_test(self, component_name, parameter_name)
as_server
as_name = "/" + component_name + "_controller/joint_trajectory_action" self.as_server = actionlib...


cob_script_server
Author(s): Florian Weisshardt
autogenerated on Wed Apr 7 2021 03:03:06