example_test.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # Copyright 2020 Mojin Robotics GmbH
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 import rospy
18 import traceback
19 import pdb
20 
21 import tf
22 from std_msgs.msg import String
23 from cob_sound.msg import SayAction, SayResult
24 from move_base_msgs.msg import MoveBaseAction, MoveBaseResult, MoveBaseGoal
25 from control_msgs.msg import FollowJointTrajectoryAction, FollowJointTrajectoryResult, FollowJointTrajectoryGoal
26 from cob_srvs.srv import SetString, SetStringRequest, SetStringResponse
27 
28 from scenario_test_tools.scriptable_move_base import ScriptableMoveBase
29 from scenario_test_tools.scriptable_action_server import ScriptableActionServer
30 from scenario_test_tools.scriptable_service_server import ScriptableServiceServer
31 from scenario_test_tools.util import countdown_sleep, round_tuple
32 
33 
34 class bcolors:
35  HEADER = '\033[95m'
36 
37  BLACK = '\033[30m'
38  RED = '\033[31m'
39  GREEN = '\033[32m'
40  YELLOW = '\033[33m'
41  BLUE = '\033[34m'
42  MAGENTA = '\033[35m'
43  CYAN = '\033[36m'
44  WHITE = '\033[37m'
45 
46  BLACK_BG = '\033[40m'
47  RED_BG = '\033[41m'
48  GREEN_BG = '\033[42m'
49  YELLOW_BG = '\033[43m'
50  BLUE_BG = '\033[44m'
51  MAGENTA_BG = '\033[45m'
52  CYAN_BG = '\033[46m'
53  WHITE_BG = '\033[47m'
54 
55  ENDC = '\033[0m'
56  UNDERLINE = '\033[4m'
57 
58 
60  roll, pitch, yaw = tf.transformations.euler_from_quaternion(
61  [mbg.target_pose.pose.orientation.x, mbg.target_pose.pose.orientation.y, mbg.target_pose.pose.orientation.z,
62  mbg.target_pose.pose.orientation.w])
63  return bcolors.MAGENTA + bcolors.UNDERLINE + "MoveBase to: (x={x}, y={y}, yaw={yaw})".format(
64  x=mbg.target_pose.pose.position.x,
65  y=mbg.target_pose.pose.position.y,
66  yaw=yaw) + bcolors.ENDC
67 
68 
69 class TestScenario(object):
70  def __init__(self, debug_exceptions=False):
71  self.debug_exceptions = debug_exceptions
72 
73  self.human_to_robot_speech = rospy.Publisher('/command', String, queue_size=1)
74 
75  self.say = ScriptableActionServer('say', SayAction,
76  # The default reply allows to not bother with every wishy-washy things
77  # the robot needs to say, eg. in cases where success is expected (like TTS)
78  default_result=SayResult(success=True),
79  goal_formatter=lambda goal: bcolors.YELLOW + bcolors.UNDERLINE +
80  "text: '{}'".format(goal.text) +
81  bcolors.ENDC,
82  result_formatter=lambda result: bcolors.YELLOW +
83  "success: '{}'".format(result.success) +
84  bcolors.ENDC)
85 
86  self.move_base = ScriptableMoveBase('/move_base', MoveBaseAction,
87  default_result_delay=5,
88  goal_formatter=format_move_base_goal,
89  result_formatter=lambda result: bcolors.MAGENTA +
90  "MoveBase: done" +
91  bcolors.ENDC
92  )
93 
94  self.arm = ScriptableActionServer('/arm/joint_trajectory_controller/follow_joint_trajectory',
95  FollowJointTrajectoryAction,
96  default_result_delay=1,
97  goal_formatter=lambda goal: bcolors.CYAN + bcolors.UNDERLINE +
98  "Moving to {}".format(
99  goal.trajectory.points[-1].positions) +
100  bcolors.ENDC,
101  result_formatter=lambda result: bcolors.CYAN +
102  "Arm: done" +
103  bcolors.ENDC)
104 
105  self.dock = ScriptableServiceServer('/dock', SetString,
106  default_response_delay=5,
107  request_formatter=lambda
108  req: bcolors.YELLOW_BG + bcolors.BLACK + bcolors.UNDERLINE +
109  "Docking: {}".format(req.data) +
110  bcolors.ENDC,
111  response_formatter=lambda res: bcolors.YELLOW_BG + bcolors.BLACK +
112  "Dock: {}".format(
113  "Succeeded " if res.success else "Failed") +
114  bcolors.ENDC)
115 
116  self.say.start()
117  self.move_base.start()
118  self.arm.start()
119 
120  self._start_scenario_srv = rospy.Service('start_scenario', SetString, self._start_scenario)
121 
122  print("The following tests are available: " + ', '.join(self._available_test_methods))
123  rospy.sleep(1)
124 
125  @property
127  return [member for member in dir(self) if member.startswith('test_')]
128 
129  def _start_scenario(self, request):
130  res = SetStringResponse()
131  res.success, res.message = self.start_scenario(request.data)
132  return res
133 
134  def start_scenario(self, scenario):
135  if scenario not in self._available_test_methods:
136  return False, "Not a valid test scenario"
137  else:
138  func = getattr(self, scenario)
139  try:
140  print("Starting scenario: {}".format(scenario))
141  func()
142  return True, "Scenario '{}' passed successfully".format(scenario)
143  except Exception as e:
144  print(bcolors.RED + "----- error: ------")
145  print(e)
146  traceback.format_exc()
147  if self.debug_exceptions:
148  pdb.post_mortem()
149  return False, str(e)
150 
151  def stop(self):
152  print("Stopping all test helpers")
153  self.move_base.stop()
154  self.arm.stop()
155  self.say.stop()
156  self.dock.stop()
157 
158  def test_happy_flow(self):
159  """
160  The robot is told to go charge.
161  It should then navigate to a position close to the charger,
162  call the dock-service then uses it's arm to plug itself in and say something when
163  finally getting some juice from the charger.
164  """
165  self.human_to_robot_speech.publish('Go charge')
166 
167  nav_goal = (10, 10, 0) # Just a random goal in x, y and orientation
168 
169  move_base_goal = self.move_base.await_goal(marker='start_move_to_charger')
170  navigated_to_goal = any(self.move_base.match_in_received_goals([nav_goal], key=self.move_base.goal_to_xy_theta))
171  already_at_goal = self.move_base.pose_bl == nav_goal
172 
173  assert navigated_to_goal or already_at_goal, "Robot is not at goal for service"
174 
175  self.move_base.direct_reply(MoveBaseResult(), marker='end_move_to_charger')
176 
177  self.dock.reply(SetStringResponse(success=True), marker='dock_into_charger')
178  self.arm.reply(FollowJointTrajectoryResult(error_code=FollowJointTrajectoryResult.SUCCESSFUL), marker='arm_plug')
179 
180  while not self.say.match_in_received_goals(["Ah, some juice!", "Jummy, fresh juice!"],
181  key=lambda req: req.text) and not rospy.is_shutdown():
182  rospy.sleep(0.5)
183 
184  rospy.loginfo("test_happy_flow succeeded")
185 
186  def test_unhappy_flow(self):
187  """
188  The robot is told to go charge.
189  It should then navigate to a position close to the charger.
190  call the dock-service then uses it's arm to plug itself in and say something when
191  finally getting some juice from the charger.
192 
193  However, the docking fails first so the robot retries and then the arm fails at first,
194  so the robot says something about it's arm and then retries
195  """
196  self.human_to_robot_speech.publish('Go charge')
197 
198  nav_goal = (10, 10, 0) # Just a random goal in x, y and orientation
199 
200  move_base_goal = self.move_base.await_goal()
201  navigated_to_goal = any(self.move_base.match_in_received_goals([nav_goal], key=self.move_base.goal_to_xy_theta))
202  already_at_goal = self.move_base.pose_bl == nav_goal
203 
204  assert navigated_to_goal or already_at_goal, "Robot is not at goal for service"
205 
206  self.move_base.direct_reply(MoveBaseResult())
207 
208  # Using a custom reply, we can override the default reply set on the dock service temporarily
209  with self.dock.custom_reply():
210  self.dock.reply(SetStringResponse(success=False))
211 
212  # the robot notices docking has failed, so it should say something about that
213  say_goal = self.say.await_goal()
214  assert 'fail' in say_goal.text
215  self.say.direct_reply(SayResult(success=True))
216 
217  # Then retry docking, now successfully
218  self.dock.reply(SetStringResponse(success=True))
219 
220  with self.arm.custom_reply():
221  self.arm.reply(FollowJointTrajectoryResult(error_code=FollowJointTrajectoryResult.PATH_TOLERANCE_VIOLATED))
222 
223  # the robot notices docking has failed, so it should say something about that
224  say_goal = self.say.await_goal()
225  assert 'fail' in say_goal.text
226  self.say.direct_reply(SayResult(success=True))
227 
228  self.arm.reply(FollowJointTrajectoryResult(error_code=FollowJointTrajectoryResult.SUCCESSFUL))
229 
230  while not self.say.match_in_received_goals(["Ah, some juice!", "Jummy, fresh juice!"],
231  key=lambda req: req.text) and not rospy.is_shutdown():
232  rospy.sleep(0.5)
233 
234  rospy.loginfo("test_unhappy_flow succeeded")
235 
236  def test_all(self):
237  if not rospy.is_shutdown():
238  with self.say.remember_goals():
239  self.test_happy_flow()
240 
241  if not rospy.is_shutdown():
242  with self.say.remember_goals():
243  self.test_unhappy_flow()
244 
245 
246 if __name__ == "__main__":
247  rospy.init_node("example_test")
248 
250 
251  rospy.on_shutdown(ts.stop)
252 
253  import sys
254  import argparse
255 
256  rospy.myargv(argv=sys.argv) # Remove ROS remappings
257  parser = argparse.ArgumentParser()
258  parser.add_argument("--start_scenario", help="Which test scenario should be started?")
259  parser.add_argument("--keep_alive", action='store_true', help="Let servers run after test finished?")
260  parser.add_argument("--debug", action='store_true', help="Run pdb.post_mortem when a test hits an exception")
261  args = parser.parse_args()
262 
263  if args.debug:
264  ts.debug_exceptions = True
265 
266  if args.start_scenario:
267  start_scenario = args.start_scenario
268  success, message = ts.start_scenario(args.start_scenario)
269 
270  if not args.keep_alive:
271  rospy.signal_shutdown("End of test")
272 
273  ts.stop()
274  exit()
275  else:
276  rospy.logwarn(
277  "Test has finished but node keeps running because --keep_alive was passed")
278  rospy.spin()
def start_scenario(self, scenario)
def format_move_base_goal(mbg)
Definition: example_test.py:59
def __init__(self, debug_exceptions=False)
Definition: example_test.py:70
def _start_scenario(self, request)


scenario_test_tools
Author(s): Loy van Beek
autogenerated on Wed Apr 7 2021 03:03:18