00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 PKG='actionlib'
00030 import roslib; roslib.load_manifest(PKG)
00031
00032 import sys
00033 import unittest
00034 import rospy
00035 from actionlib_msgs.msg import *
00036 from actionlib import SimpleActionClient
00037 from actionlib import ActionClient
00038 from actionlib.msg import TestAction, TestGoal
00039
00040 class TestRefSimpleActionServer(unittest.TestCase):
00041
00042 def testsimple(self):
00043 return
00044 client = ActionClient('reference_action', TestAction)
00045 self.assert_(client.wait_for_action_server_to_start(rospy.Duration(2.0)),
00046 'Could not connect to the action server')
00047
00048 goal = TestGoal(1)
00049 client.send_goal(goal)
00050 self.assert_(client.wait_for_goal_to_finish(rospy.Duration(2.0)),
00051 "Goal didn't finish")
00052 self.assertEqual(GoalStatus.SUCCEEDED, client.get_terminal_state())
00053 self.assertEqual(GoalStatus.SUCCEEDED, client.get_state())
00054
00055 goal = TestGoal(2)
00056 client.send_goal(goal)
00057 self.assert_(client.wait_for_goal_to_finish(rospy.Duration(10.0)),
00058 "Goal didn't finish")
00059 self.assertEqual(GoalStatus.ABORTED, client.get_terminal_state())
00060 self.assertEqual(GoalStatus.ABORTED, client.get_state())
00061
00062
00063 def test_abort(self):
00064 client = ActionClient('reference_action', TestAction)
00065 self.assert_(client.wait_for_server(rospy.Duration(2.0)),
00066 'Could not connect to the action server')
00067
00068 goal_work = TestGoal(4)
00069 goal_abort = TestGoal(6)
00070 goal_feedback = TestGoal(8)
00071
00072 g1=client.send_goal(goal_work)
00073 g2=client.send_goal(goal_work)
00074 g3=client.send_goal(goal_work)
00075 g4=client.send_goal(goal_work)
00076
00077 rospy.sleep(0.5);
00078 self.assertEqual(g1.get_goal_status(),GoalStatus.ACTIVE)
00079 self.assertEqual(g2.get_goal_status(),GoalStatus.ACTIVE,"Should be active")
00080 self.assertEqual(g3.get_goal_status(),GoalStatus.ACTIVE,"Shoule be active")
00081 self.assertEqual(g4.get_goal_status(),GoalStatus.ACTIVE,"Should be active")
00082
00083 g5=client.send_goal(goal_abort)
00084 rospy.sleep(0.5);
00085 self.assertEqual(g5.get_goal_status(),GoalStatus.SUCCEEDED,"Should be done")
00086
00087 self.assertEqual(g1.get_goal_status(),GoalStatus.ABORTED,"Should be aborted")
00088 self.assertEqual(g2.get_goal_status(),GoalStatus.ABORTED,"Should be aborted")
00089 self.assertEqual(g3.get_goal_status(),GoalStatus.ABORTED,"Shoule be aborted")
00090 self.assertEqual(g4.get_goal_status(),GoalStatus.ABORTED,"Should be aborted")
00091
00092
00093
00094 def test_feedback(self):
00095 client = ActionClient('reference_action', TestAction)
00096 self.assert_(client.wait_for_server(rospy.Duration(2.0)),
00097 'Could not connect to the action server')
00098
00099 goal_work = TestGoal(4)
00100 goal_abort = TestGoal(6)
00101 goal_feedback = TestGoal(7)
00102
00103 rospy.logwarn("This is a hacky way to associate goals with feedback");
00104 feedback={};
00105 def update_feedback(id,g,f):
00106 feedback[id]=f;
00107
00108 g1=client.send_goal(goal_work,feedback_cb=lambda g,f:update_feedback(0,g,f))
00109 g2=client.send_goal(goal_work,feedback_cb=lambda g,f:update_feedback(1,g,f))
00110 g3=client.send_goal(goal_work,feedback_cb=lambda g,f:update_feedback(2,g,f))
00111 g4=client.send_goal(goal_work,feedback_cb=lambda g,f:update_feedback(3,g,f))
00112
00113 rospy.sleep(0.5);
00114 self.assertEqual(g1.get_goal_status(),GoalStatus.ACTIVE,"Should be active")
00115 self.assertEqual(g2.get_goal_status(),GoalStatus.ACTIVE,"Should be active")
00116 self.assertEqual(g3.get_goal_status(),GoalStatus.ACTIVE,"Shoule be active")
00117 self.assertEqual(g4.get_goal_status(),GoalStatus.ACTIVE,"Should be active")
00118
00119 g5=client.send_goal(goal_feedback)
00120 rospy.sleep(0.5);
00121 self.assertEqual(g5.get_goal_status(),GoalStatus.SUCCEEDED,"Should be done")
00122
00123
00124 self.assertEqual(g1.get_goal_status(),GoalStatus.ACTIVE)
00125 self.assertEqual(feedback[0].feedback,4)
00126 self.assertEqual(g2.get_goal_status(),GoalStatus.ACTIVE)
00127 self.assertEqual(feedback[1].feedback,3)
00128 self.assertEqual(g3.get_goal_status(),GoalStatus.ACTIVE)
00129 self.assertEqual(feedback[2].feedback,2)
00130 self.assertEqual(g4.get_goal_status(),GoalStatus.ACTIVE)
00131 self.assertEqual(feedback[3].feedback,1)
00132
00133 g6=client.send_goal(goal_abort)
00134 rospy.sleep(0.5);
00135
00136
00137 def test_result(self):
00138 client = ActionClient('reference_action', TestAction)
00139 self.assert_(client.wait_for_server(rospy.Duration(2.0)),
00140 'Could not connect to the action server')
00141
00142 goal_work = TestGoal(4)
00143 goal_abort = TestGoal(6)
00144 goal_result = TestGoal(8)
00145
00146 rospy.logwarn("This is a hacky way to associate goals with feedback");
00147
00148 g1=client.send_goal(goal_work)
00149 g2=client.send_goal(goal_work)
00150 g3=client.send_goal(goal_work)
00151 g4=client.send_goal(goal_work)
00152
00153 rospy.sleep(0.5);
00154 self.assertEqual(g1.get_goal_status(),GoalStatus.ACTIVE,"Should be active")
00155 self.assertEqual(g2.get_goal_status(),GoalStatus.ACTIVE,"Should be active")
00156 self.assertEqual(g3.get_goal_status(),GoalStatus.ACTIVE,"Shoule be active")
00157 self.assertEqual(g4.get_goal_status(),GoalStatus.ACTIVE,"Should be active")
00158
00159 g5=client.send_goal(goal_result)
00160 rospy.sleep(0.5);
00161 self.assertEqual(g5.get_goal_status(),GoalStatus.SUCCEEDED,"Should be done")
00162
00163 self.assertEqual(g1.get_goal_status(),GoalStatus.SUCCEEDED)
00164 self.assertEqual(g1.get_result().result,4)
00165 self.assertEqual(g2.get_goal_status(),GoalStatus.ABORTED)
00166 self.assertEqual(g2.get_result().result,3)
00167 self.assertEqual(g3.get_goal_status(),GoalStatus.SUCCEEDED)
00168 self.assertEqual(g3.get_result().result,2)
00169 self.assertEqual(g4.get_goal_status(),GoalStatus.ABORTED)
00170 self.assertEqual(g4.get_result().result,1)
00171 g6=client.send_goal(goal_abort)
00172 rospy.sleep(0.5);
00173
00174
00175
00176
00177
00178 if __name__ == '__main__':
00179 import rostest
00180 rospy.init_node('test_ref_simple_action_server')
00181 rostest.rosrun('actionlib', 'test_simple_action_client_python', TestRefSimpleActionServer)