scriptable_service_server.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # Copyright 2020 Mojin Robotics GmbH
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 import rospy
18 
19 from scenario_test_tools.scriptable_base import ScriptableBase
20 from scenario_test_tools.util import countdown_sleep
21 
22 
24  """
25  ScriptableServiceServer allows its users to determine the Response to a service Request
26  """
27  def __init__(self, name, service_type, request_formatter=format, response_formatter=format, default_response=None, default_response_delay=0):
28  """
29  Set up a ScriptableServiceServer based on the name and the type ofService is should implement
30  :param service_type: Service type (e.g. std_srvs/SetBool)
31  """
32  ScriptableBase.__init__(self, name,
33  goal_formatter=request_formatter,
34  reply_formatter=response_formatter,
35  default_reply=default_response,
36  default_reply_delay=default_response_delay)
37 
38  self._srv = rospy.Service(name, service_type, self._execute_cb)
39 
40  def __repr__(self):
41  return "ScriptableServiceServer('{}')".format(self._name)
42 
43  def stop(self):
44  super(ScriptableServiceServer, self).stop()
45 
46  def _execute_cb(self, request):
47  """
48  Called when the underlying service receives a goal.
49  If the default_result is None, it will wait for a custom result to be set via reply* otherwise
50  return the default_result after the given default_result_delay
51 
52  In the reply-case,it then notifies self.reply* (which should be called by a test script outside this class),
53  after which self.reply* determines the result to the goal.
54  Then it notifies _execute_cb that the result has been determined so that _execute_cb can send it
55  """
56 
57  self._current_goal = request
58  try:
59  request_str = self.goal_formatter(self._current_goal)
60  except Exception as e:
61  rospy.logerr("request_formatter of {} raised an exception: {}".format(self._name, e))
62  request_str = self._current_goal
63  print('{}.execute: Request: {}'.format(self._name, request_str))
64 
65  if self.default_reply is not None:
66  result = self.default_reply
67  countdown_sleep(self._default_reply_delay, text="{}.execute: Wait for {}s. ".format(self._name, self._default_reply_delay) + "Remaining {}s...")
68  else:
69  self._request.set()
70  # Now, wait for to be called, which sets the _reply event AND the _next_reply
71  self._reply.wait()
72  self._reply.clear()
73 
74  result = self._next_reply
75 
76  self._next_reply = None
77  self._current_goal = None
78  self._sent.set()
79 
80  self._call_pre_reply_callbacks(request, result)
81  return result
def countdown_sleep(duration, stepsize=1, text="{}")
Definition: util.py:22
def __init__(self, name, service_type, request_formatter=format, response_formatter=format, default_response=None, default_response_delay=0)


scenario_test_tools
Author(s): Loy van Beek
autogenerated on Wed Apr 7 2021 03:03:18