component_test.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # Copyright 2017 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA)
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 
18 import sys
19 import time
20 import unittest
21 
22 import rospy
23 import rostest
24 from trajectory_msgs.msg import JointTrajectory
25 from control_msgs.msg import JointTrajectoryControllerState
26 from simple_script_server import *
27 
28 class UnitTest(unittest.TestCase):
29  def __init__(self, *args):
30  super(UnitTest, self).__init__(*args)
31  rospy.init_node('component_test')
32  self.message_received = False
34 
35  def setUp(self):
36  self.errors = []
37 
38  def test_component(self):
39  # get parameters
40  try:
41  # component
42  if not rospy.has_param('~component'):
43  self.fail('Parameter component does not exist on ROS Parameter Server')
44  component = rospy.get_param('~component')
45 
46  # movement command
47  if not rospy.has_param('~target'):
48  self.fail('Parameter target does not exist on ROS Parameter Server')
49  target = rospy.get_param('~target')
50 
51  # time to wait before
52  wait_time = rospy.get_param('~wait_time',5)
53 
54  # error range
55  if not rospy.has_param('~error_range'):
56  self.fail('Parameter error_range does not exist on ROS Parameter Server')
57  error_range = rospy.get_param('~error_range')
58 
59  except KeyError:
60  self.fail('Parameters not set properly')
61 
62  print("""
63  Component: %s
64  Target: %s
65  Wait Time: %s
66  Error Range: %s"""%(component, target, wait_time, error_range))
67 
68  # check parameters
69  # \todo do more parameter tests
70  if error_range < 0.0:
71  error_msg = "Parameter error_range should be positive, but is " + error_range
72  self.fail(error_msg)
73  if wait_time < 0.0:
74  error_msg = "Parameter wait_time should be positive, but is " + wait_time
75  self.fail(error_msg)
76 
77  # init subscribers
78  command_topic = "/" + component + "_controller/command"
79  state_topic = "/" + component + "_controller/state"
80  rospy.Subscriber(command_topic, JointTrajectory, self.cb_command)
81  rospy.Subscriber(state_topic, JointTrajectoryControllerState, self.cb_state)
82 
83  # init component
84  init_handle = self.sss.init(component)
85  if init_handle.get_error_code() != 0:
86  error_msg = 'Could not initialize ' + component
87  self.fail(error_msg)
88 
89  # start actual test
90  print("Waiting for messages")
91  # give the topics some seconds to receive messages
92  wallclock_timeout_t = time.time() + wait_time
93  while not self.message_received and time.time() < wallclock_timeout_t:
94  #print "###debug here###"
95  time.sleep(0.1)
96  if not self.message_received:
97  self.fail('No state message received within wait_time')
98 
99  # send commands to component
100  move_handle = self.sss.move(component,target)
101  # move_handle = self.sss.move("arm","folded")
102  if move_handle.get_error_code() != 0:
103  error_msg = 'Could not move ' + component
104  self.fail(error_msg + "; errorCode: " + str(move_handle.get_error_code()))
105 
106  # get last point out of trajectory
107  traj_endpoint = self.command_traj.points[len(self.command_traj.points)-1]
108 
109  # Start evaluation
110  timeout_t = traj_endpoint.time_from_start.to_sec()*0.5 # movement should already be finished, but let wait with an additional buffer of 50% times the desired time
111  rospy.sleep(timeout_t)
112  print("Done waiting, validating results")
113  actual_pos = self.actual_pos # fix current position configuration for later evaluation
114 
115  # checking if target position is realy reached
116  for i in range(len(traj_endpoint.positions)):
117  self.assert_(((traj_endpoint.positions[i] - actual_pos[i]) < error_range), "Target position out of error_range")
118 
119  # callback functions
120  def cb_state(self, msg):
121  self.message_received = True
122  self.actual_pos = msg.actual.positions
123  def cb_command(self, msg):
124  self.command_traj = msg
125 
126 if __name__ == '__main__':
127  try:
128  rostest.run('rostest', 'component_test', UnitTest, sys.argv)
129  except KeyboardInterrupt:
130  pass
131  print("exiting")
132 
def cb_command(self, msg)
def __init__(self, args)


cob_gazebo
Author(s): Felix Messmer , Nadia Hammoudeh Garcia , Florian Weisshardt
autogenerated on Sat Dec 5 2020 03:59:45