test_ref_action_server.py
Go to the documentation of this file.
1 #! /usr/bin/env python
2 # Copyright (c) 2009, Willow Garage, Inc.
3 # All rights reserved.
4 #
5 # Redistribution and use in source and binary forms, with or without
6 # modification, are permitted provided that the following conditions are met:
7 #
8 # * Redistributions of source code must retain the above copyright
9 # notice, this list of conditions and the following disclaimer.
10 # * Redistributions in binary form must reproduce the above copyright
11 # notice, this list of conditions and the following disclaimer in the
12 # documentation and/or other materials provided with the distribution.
13 # * Neither the name of the Willow Garage, Inc. nor the names of its
14 # contributors may be used to endorse or promote products derived from
15 # this software without specific prior written permission.
16 #
17 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 
29 PKG = 'actionlib'
30 
31 import unittest
32 import rospy
33 from actionlib_msgs.msg import GoalStatus
34 # TODO(mikaelarguedas) use SimpleActionClient here if it makes sense
35 # from actionlib import SimpleActionClient
36 from actionlib import ActionClient
37 from actionlib.msg import TestAction, TestGoal
38 
39 
40 class TestRefSimpleActionServer(unittest.TestCase):
41 
42  def testsimple(self):
43  return
44  client = ActionClient('reference_action', TestAction)
45  self.assert_(client.wait_for_action_server_to_start(rospy.Duration(2.0)),
46  'Could not connect to the action server')
47 
48  goal = TestGoal(1)
49  client.send_goal(goal)
50  self.assert_(client.wait_for_goal_to_finish(rospy.Duration(2.0)),
51  "Goal didn't finish")
52  self.assertEqual(GoalStatus.SUCCEEDED, client.get_terminal_state())
53  self.assertEqual(GoalStatus.SUCCEEDED, client.get_state())
54 
55  goal = TestGoal(2)
56  client.send_goal(goal)
57  self.assert_(client.wait_for_goal_to_finish(rospy.Duration(10.0)),
58  "Goal didn't finish")
59  self.assertEqual(GoalStatus.ABORTED, client.get_terminal_state())
60  self.assertEqual(GoalStatus.ABORTED, client.get_state())
61 
62  def test_abort(self):
63  client = ActionClient('reference_action', TestAction)
64  self.assert_(client.wait_for_server(rospy.Duration(2.0)),
65  'Could not connect to the action server')
66 
67  goal_work = TestGoal(4)
68  goal_abort = TestGoal(6)
69  goal_feedback = TestGoal(8)
70 
71  g1 = client.send_goal(goal_work)
72  g2 = client.send_goal(goal_work)
73  g3 = client.send_goal(goal_work)
74  g4 = client.send_goal(goal_work)
75 
76  rospy.sleep(0.5)
77  self.assertEqual(g1.get_goal_status(), GoalStatus.ACTIVE) # ,"Should be active")
78  self.assertEqual(g2.get_goal_status(), GoalStatus.ACTIVE, "Should be active")
79  self.assertEqual(g3.get_goal_status(), GoalStatus.ACTIVE, "Shoule be active")
80  self.assertEqual(g4.get_goal_status(), GoalStatus.ACTIVE, "Should be active")
81 
82  g5 = client.send_goal(goal_abort)
83  rospy.sleep(0.5)
84  self.assertEqual(g5.get_goal_status(), GoalStatus.SUCCEEDED, "Should be done")
85 
86  self.assertEqual(g1.get_goal_status(), GoalStatus.ABORTED, "Should be aborted")
87  self.assertEqual(g2.get_goal_status(), GoalStatus.ABORTED, "Should be aborted")
88  self.assertEqual(g3.get_goal_status(), GoalStatus.ABORTED, "Shoule be aborted")
89  self.assertEqual(g4.get_goal_status(), GoalStatus.ABORTED, "Should be aborted")
90 
91  def test_feedback(self):
92  client = ActionClient('reference_action', TestAction)
93  self.assert_(client.wait_for_server(rospy.Duration(2.0)),
94  'Could not connect to the action server')
95 
96  goal_work = TestGoal(4)
97  goal_abort = TestGoal(6)
98  goal_feedback = TestGoal(7)
99 
100  rospy.logwarn("This is a hacky way to associate goals with feedback")
101  feedback = {}
102 
103  def update_feedback(id, g, f):
104  feedback[id] = f
105 
106  g1 = client.send_goal(goal_work, feedback_cb=lambda g, f: update_feedback(0, g, f))
107  g2 = client.send_goal(goal_work, feedback_cb=lambda g, f: update_feedback(1, g, f))
108  g3 = client.send_goal(goal_work, feedback_cb=lambda g, f: update_feedback(2, g, f))
109  g4 = client.send_goal(goal_work, feedback_cb=lambda g, f: update_feedback(3, g, f))
110 
111  rospy.sleep(0.5)
112  self.assertEqual(g1.get_goal_status(), GoalStatus.ACTIVE, "Should be active")
113  self.assertEqual(g2.get_goal_status(), GoalStatus.ACTIVE, "Should be active")
114  self.assertEqual(g3.get_goal_status(), GoalStatus.ACTIVE, "Shoule be active")
115  self.assertEqual(g4.get_goal_status(), GoalStatus.ACTIVE, "Should be active")
116 
117  g5 = client.send_goal(goal_feedback)
118  rospy.sleep(0.5)
119  self.assertEqual(g5.get_goal_status(), GoalStatus.SUCCEEDED, "Should be done")
120 
121  self.assertEqual(g1.get_goal_status(), GoalStatus.ACTIVE)
122  self.assertEqual(feedback[0].feedback, 4)
123  self.assertEqual(g2.get_goal_status(), GoalStatus.ACTIVE)
124  self.assertEqual(feedback[1].feedback, 3)
125  self.assertEqual(g3.get_goal_status(), GoalStatus.ACTIVE)
126  self.assertEqual(feedback[2].feedback, 2)
127  self.assertEqual(g4.get_goal_status(), GoalStatus.ACTIVE)
128  self.assertEqual(feedback[3].feedback, 1)
129 
130  g6 = client.send_goal(goal_abort)
131  rospy.sleep(0.5)
132 
133  def test_result(self):
134  client = ActionClient('reference_action', TestAction)
135  self.assert_(client.wait_for_server(rospy.Duration(2.0)),
136  'Could not connect to the action server')
137 
138  goal_work = TestGoal(4)
139  goal_abort = TestGoal(6)
140  goal_result = TestGoal(8)
141 
142  rospy.logwarn("This is a hacky way to associate goals with feedback")
143 
144  g1 = client.send_goal(goal_work)
145  g2 = client.send_goal(goal_work)
146  g3 = client.send_goal(goal_work)
147  g4 = client.send_goal(goal_work)
148 
149  rospy.sleep(0.5)
150  self.assertEqual(g1.get_goal_status(), GoalStatus.ACTIVE, "Should be active")
151  self.assertEqual(g2.get_goal_status(), GoalStatus.ACTIVE, "Should be active")
152  self.assertEqual(g3.get_goal_status(), GoalStatus.ACTIVE, "Shoule be active")
153  self.assertEqual(g4.get_goal_status(), GoalStatus.ACTIVE, "Should be active")
154 
155  g5 = client.send_goal(goal_result)
156  rospy.sleep(0.5)
157  self.assertEqual(g5.get_goal_status(), GoalStatus.SUCCEEDED, "Should be done")
158 
159  self.assertEqual(g1.get_goal_status(), GoalStatus.SUCCEEDED)
160  self.assertEqual(g1.get_result().result, 4)
161  self.assertEqual(g2.get_goal_status(), GoalStatus.ABORTED)
162  self.assertEqual(g2.get_result().result, 3)
163  self.assertEqual(g3.get_goal_status(), GoalStatus.SUCCEEDED)
164  self.assertEqual(g3.get_result().result, 2)
165  self.assertEqual(g4.get_goal_status(), GoalStatus.ABORTED)
166  self.assertEqual(g4.get_result().result, 1)
167  g6 = client.send_goal(goal_abort)
168  rospy.sleep(0.5)
169 
170 
171 if __name__ == '__main__':
172  import rostest
173  rospy.init_node('test_ref_simple_action_server')
174  rostest.rosrun('actionlib', 'test_simple_action_client_python', TestRefSimpleActionServer)
Full interface to an ActionServer.
Definition: action_client.h:59


actionlib
Author(s): Eitan Marder-Eppstein, Vijay Pradeep, Mikael Arguedas
autogenerated on Mon Feb 28 2022 21:34:38