scriptable_move_base.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # Copyright 2020 Mojin Robotics GmbH
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 import math
18 
19 import rospy
20 import tf
21 
22 from scenario_test_tools.scriptable_action_server import ScriptableActionServer
23 from scenario_test_tools.util import countdown_sleep, round_tuple
24 
25 
27  roll, pitch, yaw = tf.transformations.euler_from_quaternion(
28  [mbg.target_pose.pose.orientation.x, mbg.target_pose.pose.orientation.y, mbg.target_pose.pose.orientation.z,
29  mbg.target_pose.pose.orientation.w])
30  return "MoveBase to: (x={x}, y={y}, yaw={yaw})".format(
31  x=mbg.target_pose.pose.position.x,
32  y=mbg.target_pose.pose.position.y,
33  yaw=yaw)
34 
35 
37  def __init__(self, name, action_type, goal_formatter=format, result_formatter=format, default_result=None, default_result_delay=0, pub_transform=True):
38  ScriptableActionServer.__init__(self,
39  name=name,
40  action_type=action_type,
41  goal_formatter=goal_formatter,
42  result_formatter=result_formatter,
43  default_result=default_result,
44  default_result_delay=default_result_delay)
45 
47  self.pose_bl = [0, 0, 0]
48  if pub_transform:
49  self._base_link_timer = rospy.Timer(rospy.Duration(0.1), self._pub_transform_bl)
50 
51  def stop(self):
52  self._base_link_timer.shutdown()
53  super(ScriptableMoveBase, self).stop()
54 
55  def _execute_cb(self):
56  """
57  Called when the underlying action server receives a goal.
58  If the default_result is None, it will wait for a custom result to be set via reply* otherwise
59  return the default_result after the given default_result_delay
60 
61  In the reply-case,it then notifies self.reply* (which should be called by a test script outside this class),
62  after which self.reply* determines the result to the goal.
63  Then it notifies _execute_cb that the result has been determined so that _execute_cb can send it
64  """
65  print('\n######## {}.execute ###########'.format(self._name))
66 
67  self._current_goal = self._as.accept_new_goal()
68  self._received_goals.append(self._current_goal)
69 
70  try:
71  goal_str = self.goal_formatter(self._current_goal)
72  except Exception as e:
73  rospy.logerr("goal_formatter of {} raised an exception: {}".format(self._name, e))
74  goal_str = self._current_goal
75  print('{}.execute: From {} to Goal: {}'.format(self._name, self.pose_bl, goal_str))
76 
77  if self.default_reply is not None:
78  countdown_sleep(self._default_reply_delay, text="{}.execute: Wait for {}s. ".format(self._name, self._default_reply_delay) + "Remaining {}s...")
80  self._send_result(self.default_reply)
81  else:
82  self._request.set()
83  # Now, wait for to be called, which sets the _reply event AND the _next_reply
84  self._reply.wait()
85  self._reply.clear()
86 
88  self._send_result(self._next_reply)
89 
90  self._next_reply = None
91  self._current_goal = None
92  self._sent.set()
93 
94  def _send_result(self, result):
95  if result is not self.IGNORE_GOAL and result is not self.ABORT_GOAL:
96  try:
97  result_str = self.result_formatter(result)
98  except Exception as e:
99  result_str = result
100  print("{}.execute: Result: {}".format(self._name, result_str))
101  self.pose_bl = self.goal_to_xy_theta(self.current_goal)
102  self._as.set_succeeded(result)
103  elif result == self.ABORT_GOAL:
104  print("{}.execute: Result: {}".format(self._name, result))
105  self._as.set_aborted()
106  elif result == self.IGNORE_GOAL:
107  print("{}.execute: Result: {}".format(self._name, result))
108  # Do not send a reply at all, to see how the client deals with it
109  pass
110 
111  def _pub_transform_bl(self, event):
112  """
113  Publish the transform from /map to /base_link
114  :param event: timer event
115  """
116  self.br.sendTransform((self.pose_bl[0], self.pose_bl[1], 0),
117  tf.transformations.quaternion_from_euler(0, 0, self.pose_bl[2]),
118  rospy.Time.now(),
119  "/base_link",
120  "/map")
121 
122  def distance(self, pose1, pose2):
123  """
124  Calculate the distance between two x, y, theta arrays
125 
126  :param pose1: Position to calculate distance from
127  :type pose1: [x, y, theta]
128  :param pose2: Position to calculate distance to
129  :type pose2: [x, y, theta]
130  :return: cartesian distance between the positions
131  :rtype float
132  """
133  if (len(pose1) == len(pose2) and len(pose1) == 3):
134  dist = math.sqrt(pow(pose2[0] - pose1[0], 2) + pow(pose2[1] - pose1[1], 2))
135  else:
136  dist = 0.0
137  return dist
138 
139  @staticmethod
140  def goal_to_xy_theta(goal):
141  """
142  Convert a MoveBaseGoal to a much condensed representation: a list of [x, y, theta]
143 
144  :param goal: A moveBaseGoal
145  :return: a compact representation of the goal
146  :rtype List[float]
147  """
148  roll, pitch, yaw = tf.transformations.euler_from_quaternion(
149  [goal.target_pose.pose.orientation.x,
150  goal.target_pose.pose.orientation.y,
151  goal.target_pose.pose.orientation.z,
152  goal.target_pose.pose.orientation.w])
153  return round_tuple((goal.target_pose.pose.position.x,
154  goal.target_pose.pose.position.y,
155  yaw), 1)
156 
def __init__(self, name, action_type, goal_formatter=format, result_formatter=format, default_result=None, default_result_delay=0, pub_transform=True)
def round_tuple(tup, decimals)
Definition: util.py:41
def countdown_sleep(duration, stepsize=1, text="{}")
Definition: util.py:22


scenario_test_tools
Author(s): Loy van Beek
autogenerated on Wed Apr 7 2021 03:03:18