scriptable_action_server.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # Copyright 2020 Mojin Robotics GmbH
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 import actionlib
18 import rospy
19 
20 from scenario_test_tools.scriptable_base import ScriptableBase
21 from scenario_test_tools.util import countdown_sleep
22 
23 
25  """
26  ScriptableActionServer allows its users to determine the ActionResult to an ActionGoal.
27 
28  The result-type for e.g. FooAction should be FooResult, not FooActionResult!
29 
30  Goals can be aborted by setting `ABORT_GOAL` as the result.
31 
32  Goals can be ignored by setting `IGNORE_GOAL` as the result. This makes the action client never get a result
33 
34  Note that Actionlib uses the term 'result' for its, well, results, whereas ROS services use 'reply'.
35  The base class `ScriptableBase` uses the 'reply' terminology.
36  `ScriptableActionServer` should be consistent with the action terminology,
37  so it's constructor does some translation between terms and passes 'default_reply=default_result'
38  to the constructor of `ScriptableBase`.
39  """
40 
41  SUCCEED_GOAL = "SUCCEED_GOAL"
42  IGNORE_GOAL = "IGNORE_GOAL"
43  ABORT_GOAL = "ABORT_GOAL"
44 
45  def __init__(self, name, action_type, goal_formatter=format, result_formatter=format, default_result=None, default_result_delay=0):
46  """
47  Set up a ScriptableActionServer based on the name and the type of Action it should implement
48 
49  :param action_type: action type (e.g. MoveBaseAction)
50  """
51  ScriptableBase.__init__(self, name,
52  goal_formatter=goal_formatter,
53  reply_formatter=result_formatter,
54  default_reply=default_result,
55  default_reply_delay=default_result_delay)
56 
57  self._as = actionlib.SimpleActionServer(name, action_type, auto_start=False)
58  self._as.register_goal_callback(self._execute_cb)
59  self._as.register_preempt_callback(self._preempt_cb)
60 
61  def __repr__(self):
62  return "ScriptableActionServer('{}')".format(self._name)
63 
64  @property
65  def connected(self):
66  return self._as.action_server.goal_sub.get_num_connections() >= 1
67 
68  def start(self):
69  """
70  Start the action server and thus listen to goals
71  """
72  self._as.start()
73 
74  def stop(self):
75  self._as.action_server.started = False
76  super(ScriptableActionServer, self).stop()
77 
78  def _execute_cb(self):
79  """
80  Called when the underlying action server receives a goal.
81  If the default_result is None, it will wait for a custom result to be set via reply* otherwise
82  return the default_result after the given default_result_delay
83 
84  In the reply-case,it then notifies self.reply* (which should be called by a test script outside this class),
85  after which self.reply* determines the result to the goal.
86  Then it notifies _execute_cb that the result has been determined so that _execute_cb can send it
87  """
88 
89  self._current_goal = self._as.accept_new_goal()
90  self._received_goals += [self._current_goal]
91  try:
92  goal_str = self.goal_formatter(self._current_goal)
93  except Exception as e:
94  rospy.logerr("goal_formatter of {} raised an exception: {}".format(self._name, e))
95  goal_str = self._current_goal
96  print('{}.execute: Goal: {}'.format(self._name, goal_str))
97 
98  if self.default_reply is not None:
99  countdown_sleep(self._default_reply_delay, text="{}.execute: Wait for {}s. ".format(self._name, self._default_reply_delay) + "Remaining {}s...")
101  self._send_result(self.default_reply)
102  else:
103  self._request.set()
104  # Now, wait for action to be called, which sets the _reply event AND the _next_reply
105  self._reply.wait()
106  self._reply.clear()
107 
109  self._send_result(self._next_reply)
110 
111  self._next_reply = None
112  self._current_goal = None
113  self._sent.set()
114 
115  def _send_result(self, result_tuple):
116  """
117  Send the result and deal with ignored and aborted goals
118 
119  :param result: a Result associated with the Action-type of this Server
120  """
121 
122  if isinstance(result_tuple, tuple):
123  # In this case, result[0] is the actual result and result[1] is an success/abort/ignore action
124  result, action = result_tuple
125  else:
126  # result_tuple is not really a tuple
127  if result_tuple == self.ABORT_GOAL:
128  result, action = None, self.ABORT_GOAL
129  elif result_tuple == self.IGNORE_GOAL:
130  result, action = None, self.IGNORE_GOAL
131  else:
132  result, action = result_tuple, self.SUCCEED_GOAL
133 
134  if action == self.SUCCEED_GOAL:
135  try:
136  result_str = self.result_formatter(result)
137  except Exception as e:
138  rospy.logerr("result_formatter of {} raised an exception: {}".format(self._name, e))
139  result_str = self._current_goal
140  print("{}.execute: Result: {}".format(self._name, result_str))
141  self._as.set_succeeded(result)
142  elif action == self.ABORT_GOAL:
143  print("{}.execute: Result: {}, action: {}".format(self._name, result, action))
144  self._as.set_aborted(result)
145  elif action == self.IGNORE_GOAL:
146  print("{}.execute: Result: {}, action: {}".format(self._name, result, action))
147  # Do not send a reply at all, to see how the client deals with it
148  pass
149 
150  def _preempt_cb(self):
151  """
152  Handle action preemption by the action client
153  """
154  if self._as.is_active():
155  self.preemption = True
156  self._as.set_preempted()
def __init__(self, name, action_type, goal_formatter=format, result_formatter=format, default_result=None, default_result_delay=0)
def countdown_sleep(duration, stepsize=1, text="{}")
Definition: util.py:22


scenario_test_tools
Author(s): Loy van Beek
autogenerated on Wed Apr 7 2021 03:03:18