22 from std_msgs.msg
import String
23 from cob_sound.msg
import SayAction, SayResult
24 from move_base_msgs.msg
import MoveBaseAction, MoveBaseResult, MoveBaseGoal
25 from control_msgs.msg
import FollowJointTrajectoryAction, FollowJointTrajectoryResult, FollowJointTrajectoryGoal
26 from cob_srvs.srv
import SetString, SetStringRequest, SetStringResponse
49 YELLOW_BG =
'\033[43m' 51 MAGENTA_BG =
'\033[45m' 60 roll, pitch, yaw = tf.transformations.euler_from_quaternion(
61 [mbg.target_pose.pose.orientation.x, mbg.target_pose.pose.orientation.y, mbg.target_pose.pose.orientation.z,
62 mbg.target_pose.pose.orientation.w])
63 return bcolors.MAGENTA + bcolors.UNDERLINE +
"MoveBase to: (x={x}, y={y}, yaw={yaw})".format(
64 x=mbg.target_pose.pose.position.x,
65 y=mbg.target_pose.pose.position.y,
66 yaw=yaw) + bcolors.ENDC
78 default_result=SayResult(success=
True),
79 goal_formatter=
lambda goal: bcolors.YELLOW + bcolors.UNDERLINE +
80 "text: '{}'".format(goal.text) +
82 result_formatter=
lambda result: bcolors.YELLOW +
83 "success: '{}'".format(result.success) +
87 default_result_delay=5,
88 goal_formatter=format_move_base_goal,
89 result_formatter=
lambda result: bcolors.MAGENTA +
95 FollowJointTrajectoryAction,
96 default_result_delay=1,
97 goal_formatter=
lambda goal: bcolors.CYAN + bcolors.UNDERLINE +
98 "Moving to {}".format(
99 goal.trajectory.points[-1].positions) +
101 result_formatter=
lambda result: bcolors.CYAN +
106 default_response_delay=5,
107 request_formatter=
lambda 108 req: bcolors.YELLOW_BG + bcolors.BLACK + bcolors.UNDERLINE +
109 "Docking: {}".format(req.data) +
111 response_formatter=
lambda res: bcolors.YELLOW_BG + bcolors.BLACK +
113 "Succeeded " if res.success
else "Failed") +
117 self.move_base.start()
127 return [member
for member
in dir(self)
if member.startswith(
'test_')]
130 res = SetStringResponse()
136 return False,
"Not a valid test scenario" 138 func = getattr(self, scenario)
140 print(
"Starting scenario: {}".format(scenario))
142 return True,
"Scenario '{}' passed successfully".format(scenario)
143 except Exception
as e:
144 print(bcolors.RED +
"----- error: ------")
146 traceback.format_exc()
152 print(
"Stopping all test helpers")
153 self.move_base.stop()
160 The robot is told to go charge. 161 It should then navigate to a position close to the charger, 162 call the dock-service then uses it's arm to plug itself in and say something when 163 finally getting some juice from the charger. 165 self.human_to_robot_speech.publish(
'Go charge')
167 nav_goal = (10, 10, 0)
169 move_base_goal = self.move_base.await_goal(marker=
'start_move_to_charger')
170 navigated_to_goal = any(self.move_base.match_in_received_goals([nav_goal], key=self.move_base.goal_to_xy_theta))
171 already_at_goal = self.move_base.pose_bl == nav_goal
173 assert navigated_to_goal
or already_at_goal,
"Robot is not at goal for service" 175 self.move_base.direct_reply(MoveBaseResult(), marker=
'end_move_to_charger')
177 self.dock.reply(SetStringResponse(success=
True), marker=
'dock_into_charger')
178 self.arm.reply(FollowJointTrajectoryResult(error_code=FollowJointTrajectoryResult.SUCCESSFUL), marker=
'arm_plug')
180 while not self.say.match_in_received_goals([
"Ah, some juice!",
"Jummy, fresh juice!"],
181 key=
lambda req: req.text)
and not rospy.is_shutdown():
184 rospy.loginfo(
"test_happy_flow succeeded")
188 The robot is told to go charge. 189 It should then navigate to a position close to the charger. 190 call the dock-service then uses it's arm to plug itself in and say something when 191 finally getting some juice from the charger. 193 However, the docking fails first so the robot retries and then the arm fails at first, 194 so the robot says something about it's arm and then retries 196 self.human_to_robot_speech.publish(
'Go charge')
198 nav_goal = (10, 10, 0)
200 move_base_goal = self.move_base.await_goal()
201 navigated_to_goal = any(self.move_base.match_in_received_goals([nav_goal], key=self.move_base.goal_to_xy_theta))
202 already_at_goal = self.move_base.pose_bl == nav_goal
204 assert navigated_to_goal
or already_at_goal,
"Robot is not at goal for service" 206 self.move_base.direct_reply(MoveBaseResult())
209 with self.dock.custom_reply():
210 self.dock.reply(SetStringResponse(success=
False))
213 say_goal = self.say.await_goal()
214 assert 'fail' in say_goal.text
215 self.say.direct_reply(SayResult(success=
True))
218 self.dock.reply(SetStringResponse(success=
True))
220 with self.arm.custom_reply():
221 self.arm.reply(FollowJointTrajectoryResult(error_code=FollowJointTrajectoryResult.PATH_TOLERANCE_VIOLATED))
224 say_goal = self.say.await_goal()
225 assert 'fail' in say_goal.text
226 self.say.direct_reply(SayResult(success=
True))
228 self.arm.reply(FollowJointTrajectoryResult(error_code=FollowJointTrajectoryResult.SUCCESSFUL))
230 while not self.say.match_in_received_goals([
"Ah, some juice!",
"Jummy, fresh juice!"],
231 key=
lambda req: req.text)
and not rospy.is_shutdown():
234 rospy.loginfo(
"test_unhappy_flow succeeded")
237 if not rospy.is_shutdown():
238 with self.say.remember_goals():
241 if not rospy.is_shutdown():
242 with self.say.remember_goals():
246 if __name__ ==
"__main__":
247 rospy.init_node(
"example_test")
251 rospy.on_shutdown(ts.stop)
256 rospy.myargv(argv=sys.argv)
257 parser = argparse.ArgumentParser()
258 parser.add_argument(
"--start_scenario", help=
"Which test scenario should be started?")
259 parser.add_argument(
"--keep_alive", action=
'store_true', help=
"Let servers run after test finished?")
260 parser.add_argument(
"--debug", action=
'store_true', help=
"Run pdb.post_mortem when a test hits an exception")
261 args = parser.parse_args()
264 ts.debug_exceptions =
True 266 if args.start_scenario:
267 start_scenario = args.start_scenario
268 success, message = ts.start_scenario(args.start_scenario)
270 if not args.keep_alive:
271 rospy.signal_shutdown(
"End of test")
277 "Test has finished but node keeps running because --keep_alive was passed")
def start_scenario(self, scenario)
def test_happy_flow(self)
def test_unhappy_flow(self)
def _available_test_methods(self)
def format_move_base_goal(mbg)
def __init__(self, debug_exceptions=False)
def _start_scenario(self, request)