scriptable_base.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # Copyright 2020 Mojin Robotics GmbH
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 from threading import Event
18 import contextlib
19 
20 from scenario_test_tools.util import countdown_sleep
21 import collections
22 
23 
class ScriptableBase(object):
    """
    ScriptableBase is a superclass that has common functionality for Scriptable... callbacks.

    It allows to set a default reply that can be overridden if need be, so custom replies can be defined.

    The class only coordinates three `threading.Event`s (`_request`, `_reply`, `_sent`) and the
    `_current_goal` / `_next_reply` / `_received_goals` attributes. Nothing in this class sets
    `_request`, sets `_sent`, assigns `_current_goal` or appends to `_received_goals`;
    presumably the subclass does that from its server callback -- verify against the subclasses.
    """

    def __init__(self, name, goal_formatter=format, reply_formatter=format, default_reply=None, default_reply_delay=0):
        """
        Set up a ScriptableBase. Must be subclassed.

        :param name: the action namespace the server should operate on (e.g. `move_base`)
        :param goal_formatter: a function accepting a goal and returning a nice-looking summary str of it
        :param reply_formatter: a function accepting a result and returning a nice-looking summary str of it
        :param default_reply: optional. If set, this will be returned after default_reply_delay,
            otherwise a .reply* call is needed to send a reply. Overriding the default reply and
            doing something custom is possible with the `custom_reply`-context manager
        :param default_reply_delay: If default_reply is defined, the Scriptable... waits for this delay before returning.
            This delay is also used when no more specific reply_delay is specified in a reply_*-call
        """
        self._name = name

        self._current_goal = None

        # What to reply in the next goal?
        self._next_reply = None

        # Set when a goal (a question) has been received
        self._request = Event()

        # Set when one of the reply methods has determined the result (the answer)
        self._reply = Event()

        # Set when the result has actually been sent.
        # Only so printed output appears in order
        self._sent = Event()

        # Name of the only method allowed to be called next, or None if there is no such restriction
        self._waiting_for = None

        self.goal_formatter = goal_formatter
        self.result_formatter = reply_formatter

        self._default_reply = default_reply
        self._default_reply_delay = default_reply_delay

        self._received_goals = []

        # Callables registered via add_pre_reply_callback, kept in registration order
        self._pre_reply_callbacks = []

    @property
    def name(self):
        """
        The name this Scriptable... was constructed with
        """
        return self._name

    def stop(self):
        """
        If the process is blocked by waiting for some Events to be set, stop sets those Events.
        """
        self._sent.set()
        self._reply.set()

    @staticmethod
    def _marker_suffix(marker):
        """
        Format the optional marker as it appears in all printed output: '(marker)' or an empty str
        """
        return '({})'.format(marker) if marker else ''

    def _wait_for_request(self, method, timeout, suffix):
        """
        Block until a goal has been received, then consume the request Event and print the goal.

        :param method: name of the calling method, only used in the printed output and error message
        :param timeout: seconds to wait for the goal, None to wait indefinitely
        :param suffix: the formatted marker (see `_marker_suffix`)
        :raises AssertionError: when no goal is received within the given timeout
        """
        print("{}.{}{}: Waiting {}for goal..."
              .format(self._name, method, suffix, str(timeout) + 's ' if timeout is not None else ''))

        # Deliberately not an `assert` statement: with `python -O` that would strip the wait itself
        if not self._request.wait(timeout):
            raise AssertionError("{}.{}{} did not get a goal in time".format(self._name, method, suffix))

        self._request.clear()
        print("{}.{}{}: Got goal: {}"
              .format(self._name, method, suffix, self.goal_formatter(self._current_goal)))

    def _send_reply(self, method, reply_delay, suffix):
        """
        Sleep for reply_delay, signal that `_next_reply` is ready and block until it has been sent.

        :param method: name of the calling method, only used in the printed output
        :param reply_delay: seconds to wait before signalling the reply
        :param suffix: the formatted marker (see `_marker_suffix`)
        """
        # The trailing {} is left unformatted here, it will be filled by countdown_sleep
        countdown_sleep(reply_delay,
                        text="{}.{}{}: Think for {}s. ".format(self._name, method, suffix, reply_delay)
                             + "Remaining {}s...")

        self._reply.set()
        self._sent.wait()
        self._sent.clear()

    def reply(self, result, timeout=None, reply_delay=None, marker=None):
        """
        Reply to the next goal with the given result, after `reply_delay` amount of seconds.
        Calling this sets the default reply to None.

        :param result: an ...ActionResult of the type associated with the Action-type of this server
        :param timeout: how long to wait for the goal? Defaults to None to wait indefinitely
        :param reply_delay: how long to 'think'/calculate on this goal before sending the reply.
            Defaults to None to use `default_reply_delay`
        :param marker: A str that is printed in the output for easy reference between different replies
        :return: None
        :raises AssertionError: when called after `await_goal` (use `direct_reply` then),
            or when a goal is not received within the given timeout
        """
        if reply_delay is None:
            reply_delay = self.default_reply_delay

        suffix = self._marker_suffix(marker)
        print('\n######## {}.reply{} ###########'.format(self._name, suffix))

        if self._waiting_for is not None:
            raise AssertionError("reply{} cannot follow an 'await_goal', use direct_reply".format(suffix))
        self.default_reply = None

        self._wait_for_request('reply', timeout, suffix)

        self._next_reply = result
        self._send_reply('reply', reply_delay, suffix)

        print("{}.reply{}: Finished replying".format(self._name, suffix))

    def reply_conditionally(self, condition, true_result, false_result, timeout=None, reply_delay=None, marker=None):
        """
        Reply one of two possibilities, based on a condition. This is a callable that, given a Goal, returns a bool.
        If True, then reply with the true_result and vice versa.
        Calling this sets the default reply to None.

        :param condition: callable(...Goal) -> bool
        :param true_result: a ...Result
        :param false_result: a ...Result
        :param timeout: seconds to wait for the goal. Defaults to None to wait indefinitely
        :param reply_delay: Delay the reply by this amount of seconds.
            Defaults to None to use `default_reply_delay`
        :param marker: A str that is printed in the output for easy reference between different replies
        :return: the value returned by `condition` for the received goal
        :raises AssertionError: when called after `await_goal` (use `direct_reply` then),
            or when a goal is not received within the given timeout
        """
        if reply_delay is None:
            reply_delay = self.default_reply_delay

        suffix = self._marker_suffix(marker)
        print('\n######## {}.reply_conditionally{} ###########'.format(self._name, suffix))

        if self._waiting_for is not None:
            raise AssertionError(
                "reply_conditionally{} cannot follow an 'await_goal', use direct_reply".format(suffix))
        self.default_reply = None

        self._wait_for_request('reply_conditionally', timeout, suffix)

        print("{}: Think for {}s...".format(self._name, reply_delay))
        match = condition(self._current_goal)
        self._next_reply = true_result if match else false_result

        self._send_reply('reply_conditionally', reply_delay, suffix)

        print("{}.reply_conditionally{}: Finished replying".format(self._name, suffix))

        return match

    def await_goal(self, timeout=None, marker=None):
        """
        Await a goal to be sent to this Scriptable... and return that goal for close inspection.
        Based on that, send a reply via `direct_reply`.
        Calling this sets the default reply to None.

        :param timeout: how long to wait for the goal? Defaults to None to wait indefinitely
        :param marker: A str that is printed in the output for easy reference between different replies
        :return: the received goal
        :raises AssertionError: when a goal is not received within the given timeout
        """
        suffix = self._marker_suffix(marker)
        print('\n######## {}.await_goal{} ###########'.format(self._name, suffix))
        self.default_reply = None

        self._wait_for_request('await_goal', timeout, suffix)

        # From now on only direct_reply is accepted, until it has sent its reply
        self._waiting_for = 'direct_reply'

        return self._current_goal

    def direct_reply(self, result, reply_delay=None, marker=None):
        """
        Reply to the current goal with the given result, after `reply_delay` amount of seconds.
        Must be preceded by a call to `await_goal`.

        :param result: a ...Result of the type associated with the type of this server
        :param reply_delay: how long to 'think'/calculate on this goal before sending the reply.
            Defaults to None to use `default_reply_delay`
        :param marker: A str that is printed in the output for easy reference between different replies
        :raises AssertionError: when not preceded by `await_goal`
        """
        suffix = self._marker_suffix(marker)

        if self._waiting_for != 'direct_reply':
            raise AssertionError("direct_reply{} must follow an 'await_goal'".format(suffix))

        if reply_delay is None:
            reply_delay = self.default_reply_delay

        self._next_reply = result
        self._send_reply('direct_reply', reply_delay, suffix)

        self._waiting_for = None

        print("{}.direct_reply{}: Finished replying".format(self._name, suffix))

    @property
    def default_reply(self):
        """
        The Result that is currently set to be returned by default
        """
        return self._default_reply

    @default_reply.setter
    def default_reply(self, result):
        """
        Set the current default reply.
        If this is None, there will not be a default reply, then a reply must be defined via a reply*-call

        :param result: a ...Result of the type associated with the type of this server
        """
        self._default_reply = result

    @property
    def default_reply_delay(self):
        """
        Wait this amount of time before returning the default reply
        """
        return self._default_reply_delay

    @default_reply_delay.setter
    def default_reply_delay(self, delay):
        """
        Set the delay after which the `default_reply` is sent

        :param delay: number of seconds to wait before sending the `default_reply`
        """
        self._default_reply_delay = delay

    @contextlib.contextmanager
    def custom_reply(self):
        """
        Use this context manager to temporarily define your own replies and go back to defaulting outside the context.
        The previous default reply is restored when the context exits, also when it exits with an exception.

        Usage:

        >>> server = ScriptableBase('server')
        >>> server.default_reply = True
        >>> # Do other stuff that does not require special attention of server
        >>> with server.custom_reply():
        ...     server.reply(False)
        >>> # Continue to do other stuff that does not require special attention of server

        """
        default = self.default_reply
        self.default_reply = None
        try:
            yield
        finally:
            # Restore even if the body raised (e.g. a failed assertion in a test),
            # so later code sees the default reply again
            self.default_reply = default

    def clear_goals(self):
        """
        Clear the log of goals that we've received
        """
        # Rebinds to a fresh list: a list obtained earlier via `received_goals` is left untouched
        self._received_goals = []

    @property
    def current_goal(self):
        """
        The goal we've received last, i.e. the last item of `received_goals`

        :raises IndexError: when no goals have been recorded (since the last `clear_goals()` call)
        """
        return self._received_goals[-1]

    @property
    def received_goals(self):
        """
        All goals received (since the last `clear_goals()` call)
        """
        return self._received_goals

    def add_pre_reply_callback(self, callback):
        """
        Add a callback that is called just before a result is being sent back

        :param callback: callable that will receive the goal and result. The return value of the callable is discarded
        :raises AssertionError: when callback is not callable
        """
        # builtin callable() instead of collections.Callable, which no longer exists since Python 3.10
        if not callable(callback):
            raise AssertionError("callback must be a callable")
        self._pre_reply_callbacks.append(callback)

    def _call_pre_reply_callbacks(self, goal, reply):
        """
        Trigger all callbacks in the order they were added, with the current goal and the reply to it

        :param goal: current goal
        :param reply: reply to that goal
        """
        for func in self._pre_reply_callbacks:
            func(goal, reply)

    def match_in_received_goals(self, match_against, key=lambda x: x):
        """
        Find out if this server has any goal in its history that is also in `match_against`

        :param match_against: We're looking for any of the items in `match_against`. Items must be hashable
        :param key: optionally transform the received goals with this callable into the same type as
            `match_against`'s elements. The transformed goals must be hashable
        :return: set of the matching elements
        :raises AssertionError: when key is not callable
        """
        if not callable(key):
            raise AssertionError("key must be a callable")
        processed_previous_goals = set(map(key, self.received_goals))
        # Is there any overlap between the (processed) previous goals and what we're looking for?
        return set(match_against).intersection(processed_previous_goals)

    @contextlib.contextmanager
    def remember_goals(self):
        """
        Remember goals only inside this context.
        Goals before this context are forgotten;
        Goals received during/in the context only are remembered;
        after the context everything is forgotten, also when the context exits with an exception.
        """
        self.clear_goals()
        try:
            yield
        finally:
            self.clear_goals()
def match_in_received_goals(self, match_against, key=lambda x:x)
def direct_reply(self, result, reply_delay=None, marker=None)
def __init__(self, name, goal_formatter=format, reply_formatter=format, default_reply=None, default_reply_delay=0)
def await_goal(self, timeout=None, marker=None)
def countdown_sleep(duration, stepsize=1, text="{}")
Definition: util.py:22
def reply_conditionally(self, condition, true_result, false_result, timeout=None, reply_delay=None, marker=None)
def reply(self, result, timeout=None, reply_delay=None, marker=None)


scenario_test_tools
Author(s): Loy van Beek
autogenerated on Wed Apr 7 2021 03:03:18