00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 PKG = 'actionlib'
00030
00031 import unittest
00032 import rospy
00033 from actionlib_msgs.msg import GoalStatus
00034
00035
00036 from actionlib import ActionClient
00037 from actionlib.msg import TestAction, TestGoal
00038
00039
00040 class TestRefSimpleActionServer(unittest.TestCase):
00041
00042 def testsimple(self):
00043 return
00044 client = ActionClient('reference_action', TestAction)
00045 self.assert_(client.wait_for_action_server_to_start(rospy.Duration(2.0)),
00046 'Could not connect to the action server')
00047
00048 goal = TestGoal(1)
00049 client.send_goal(goal)
00050 self.assert_(client.wait_for_goal_to_finish(rospy.Duration(2.0)),
00051 "Goal didn't finish")
00052 self.assertEqual(GoalStatus.SUCCEEDED, client.get_terminal_state())
00053 self.assertEqual(GoalStatus.SUCCEEDED, client.get_state())
00054
00055 goal = TestGoal(2)
00056 client.send_goal(goal)
00057 self.assert_(client.wait_for_goal_to_finish(rospy.Duration(10.0)),
00058 "Goal didn't finish")
00059 self.assertEqual(GoalStatus.ABORTED, client.get_terminal_state())
00060 self.assertEqual(GoalStatus.ABORTED, client.get_state())
00061
00062 def test_abort(self):
00063 client = ActionClient('reference_action', TestAction)
00064 self.assert_(client.wait_for_server(rospy.Duration(2.0)),
00065 'Could not connect to the action server')
00066
00067 goal_work = TestGoal(4)
00068 goal_abort = TestGoal(6)
00069 goal_feedback = TestGoal(8)
00070
00071 g1 = client.send_goal(goal_work)
00072 g2 = client.send_goal(goal_work)
00073 g3 = client.send_goal(goal_work)
00074 g4 = client.send_goal(goal_work)
00075
00076 rospy.sleep(0.5)
00077 self.assertEqual(g1.get_goal_status(), GoalStatus.ACTIVE)
00078 self.assertEqual(g2.get_goal_status(), GoalStatus.ACTIVE, "Should be active")
00079 self.assertEqual(g3.get_goal_status(), GoalStatus.ACTIVE, "Shoule be active")
00080 self.assertEqual(g4.get_goal_status(), GoalStatus.ACTIVE, "Should be active")
00081
00082 g5 = client.send_goal(goal_abort)
00083 rospy.sleep(0.5)
00084 self.assertEqual(g5.get_goal_status(), GoalStatus.SUCCEEDED, "Should be done")
00085
00086 self.assertEqual(g1.get_goal_status(), GoalStatus.ABORTED, "Should be aborted")
00087 self.assertEqual(g2.get_goal_status(), GoalStatus.ABORTED, "Should be aborted")
00088 self.assertEqual(g3.get_goal_status(), GoalStatus.ABORTED, "Shoule be aborted")
00089 self.assertEqual(g4.get_goal_status(), GoalStatus.ABORTED, "Should be aborted")
00090
00091 def test_feedback(self):
00092 client = ActionClient('reference_action', TestAction)
00093 self.assert_(client.wait_for_server(rospy.Duration(2.0)),
00094 'Could not connect to the action server')
00095
00096 goal_work = TestGoal(4)
00097 goal_abort = TestGoal(6)
00098 goal_feedback = TestGoal(7)
00099
00100 rospy.logwarn("This is a hacky way to associate goals with feedback")
00101 feedback = {}
00102
00103 def update_feedback(id, g, f):
00104 feedback[id] = f
00105
00106 g1 = client.send_goal(goal_work, feedback_cb=lambda g, f: update_feedback(0, g, f))
00107 g2 = client.send_goal(goal_work, feedback_cb=lambda g, f: update_feedback(1, g, f))
00108 g3 = client.send_goal(goal_work, feedback_cb=lambda g, f: update_feedback(2, g, f))
00109 g4 = client.send_goal(goal_work, feedback_cb=lambda g, f: update_feedback(3, g, f))
00110
00111 rospy.sleep(0.5)
00112 self.assertEqual(g1.get_goal_status(), GoalStatus.ACTIVE, "Should be active")
00113 self.assertEqual(g2.get_goal_status(), GoalStatus.ACTIVE, "Should be active")
00114 self.assertEqual(g3.get_goal_status(), GoalStatus.ACTIVE, "Shoule be active")
00115 self.assertEqual(g4.get_goal_status(), GoalStatus.ACTIVE, "Should be active")
00116
00117 g5 = client.send_goal(goal_feedback)
00118 rospy.sleep(0.5)
00119 self.assertEqual(g5.get_goal_status(), GoalStatus.SUCCEEDED, "Should be done")
00120
00121 self.assertEqual(g1.get_goal_status(), GoalStatus.ACTIVE)
00122 self.assertEqual(feedback[0].feedback, 4)
00123 self.assertEqual(g2.get_goal_status(), GoalStatus.ACTIVE)
00124 self.assertEqual(feedback[1].feedback, 3)
00125 self.assertEqual(g3.get_goal_status(), GoalStatus.ACTIVE)
00126 self.assertEqual(feedback[2].feedback, 2)
00127 self.assertEqual(g4.get_goal_status(), GoalStatus.ACTIVE)
00128 self.assertEqual(feedback[3].feedback, 1)
00129
00130 g6 = client.send_goal(goal_abort)
00131 rospy.sleep(0.5)
00132
00133 def test_result(self):
00134 client = ActionClient('reference_action', TestAction)
00135 self.assert_(client.wait_for_server(rospy.Duration(2.0)),
00136 'Could not connect to the action server')
00137
00138 goal_work = TestGoal(4)
00139 goal_abort = TestGoal(6)
00140 goal_result = TestGoal(8)
00141
00142 rospy.logwarn("This is a hacky way to associate goals with feedback")
00143
00144 g1 = client.send_goal(goal_work)
00145 g2 = client.send_goal(goal_work)
00146 g3 = client.send_goal(goal_work)
00147 g4 = client.send_goal(goal_work)
00148
00149 rospy.sleep(0.5)
00150 self.assertEqual(g1.get_goal_status(), GoalStatus.ACTIVE, "Should be active")
00151 self.assertEqual(g2.get_goal_status(), GoalStatus.ACTIVE, "Should be active")
00152 self.assertEqual(g3.get_goal_status(), GoalStatus.ACTIVE, "Shoule be active")
00153 self.assertEqual(g4.get_goal_status(), GoalStatus.ACTIVE, "Should be active")
00154
00155 g5 = client.send_goal(goal_result)
00156 rospy.sleep(0.5)
00157 self.assertEqual(g5.get_goal_status(), GoalStatus.SUCCEEDED, "Should be done")
00158
00159 self.assertEqual(g1.get_goal_status(), GoalStatus.SUCCEEDED)
00160 self.assertEqual(g1.get_result().result, 4)
00161 self.assertEqual(g2.get_goal_status(), GoalStatus.ABORTED)
00162 self.assertEqual(g2.get_result().result, 3)
00163 self.assertEqual(g3.get_goal_status(), GoalStatus.SUCCEEDED)
00164 self.assertEqual(g3.get_result().result, 2)
00165 self.assertEqual(g4.get_goal_status(), GoalStatus.ABORTED)
00166 self.assertEqual(g4.get_result().result, 1)
00167 g6 = client.send_goal(goal_abort)
00168 rospy.sleep(0.5)
00169
00170
00171 if __name__ == '__main__':
00172 import rostest
00173 rospy.init_node('test_ref_simple_action_server')
00174 rostest.rosrun('actionlib', 'test_simple_action_client_python', TestRefSimpleActionServer)