test_service.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 """
3 Perform a unitest on a node acting as a service server.
4 
5 Verify that, given a service request, the answer provided is the one expected.
6 
7 Test files should set the following parameters:
8 
9 <test test-name="test_service" pkg="rostest_node_interface_validation" type="test_service" >
10  <rosparam>
11  calls:
12  - name: service name
13  input: input parameter of the message, in dictionary format
14  output: output parameter of the message, in dictionary format
15  </rosparam>
16 </test>
17 
18 Several calls can be added.
19 If the input or the output should be empty, place None.
20 
21 """
22 
23 __author__ = "Anthony Remazeilles"
24 __email__ = "anthony.remazeilles@tecnalia.com"
25 __copyright__ = "Copyright (C) 2020 Tecnalia Research and Innovation"
26 __licence__ = "Apache 2.0"
27 
28 import sys
29 import time
30 import unittest
31 import rospy
32 import rosservice
33 import rostest
34 from rospy_message_converter import message_converter
35 
36 CLASSNAME = 'servicetest'
37 
38 
39 class ServiceTest(unittest.TestCase):
40  def __init__(self, *args):
41  super(ServiceTest, self).__init__(*args)
42  rospy.init_node(CLASSNAME)
43 
44  self.calls = list()
45 
46  try:
47  calls = rospy.get_param('~calls')
48 
49  for call in calls:
50 
51  keys = ['name', 'input', 'output']
52  for item in keys:
53  if item not in call:
54  self.fail("{} field required, but not specified in {}".format(item, call))
55 
56  srv_data = dict()
57 
58  srv_data['name'] = call['name']
59  srv_data['input'] = call['input']
60  srv_data['output'] = call['output']
61 
62  if srv_data['input'] == 'None':
63  rospy.logwarn('None input converted to empty input')
64  srv_data['input'] = dict()
65  if srv_data['output'] == 'None':
66  rospy.logwarn('None output converted to empty output')
67  srv_data['output'] = dict()
68 
69  self.calls.append(srv_data)
70  except KeyError as err:
71  msg_err = "service_test not initialized properly"
72  msg_err += " Parameter [%s] not set." % (str(err))
73  msg_err += " Caller ID: [%s] Resolved name: [%s]" % (
74  rospy.get_caller_id(),
75  rospy.resolve_name(err.args[0]))
76  self.fail(msg_err)
77 
78  def setUp(self):
79  # warn on /use_sim_time is true
80  use_sim_time = rospy.get_param('/use_sim_time', False)
81  self.t_start = time.time()
82  while not rospy.is_shutdown() and \
83  use_sim_time and (rospy.Time.now() == rospy.Time(0)):
84  rospy.logwarn_throttle(
85  1, '/use_sim_time is specified and rostime is 0, /clock is published?')
86  if time.time() - self.t_start > 10:
87  self.fail('Timed out (10s) of /clock publication.')
88  # must use time.sleep because /clock isn't yet published, so rospy.sleep hangs.
89  time.sleep(0.1)
90 
92  """Test services are advertised"""
93  t_start = self.t_start
94  s_name_set = set([ item['name'] for item in self.calls])
95  t_timeout_max = 10.0
96 
97  while not rospy.is_shutdown():
98  t_now = time.time()
99  t_elapsed = t_now - t_start
100  if not s_name_set:
101  break
102  if t_elapsed > t_timeout_max:
103  break
104 
105  for s_name in rosservice.get_service_list():
106  if s_name in s_name_set:
107  s_name_set.remove(s_name)
108  time.sleep(0.05)
109 
110  # All services should have been detected
111  assert not s_name_set, \
112  'services [%s] not advertized on time' % (s_name_set)
113 
114  rospy.loginfo("All services advertized on time")
115 
116  def test_service_call(self):
117 
118  for item in self.calls:
119  rospy.loginfo("Testing service {} with input parameters {}".format(
120  item['name'],
121  item['input']))
122  self._test_service(item['name'], item['input'], item['output'])
123  rospy.loginfo("So far so good")
124 
125  def _test_service(self, srv_name, srv_input, srv_output):
126  self.assert_(srv_name)
127  all_services = rosservice.get_service_list()
128  self.assertIn(srv_name, all_services)
129  srv_class = rosservice.get_service_class_by_name(srv_name)
130 
131  try:
132  srv_proxy = rospy.ServiceProxy(srv_name, srv_class)
133  except KeyError as err:
134  msg_err = "Service proxy could not be created"
135  self.fail(msg_err)
136 
137  try:
138  if srv_input:
139  srv_resp = srv_proxy(**srv_input)
140  else:
141  srv_resp = srv_proxy()
142 
143  except (genpy.SerializationError, rospy.ROSException), err:
144  msg_err = "Service proxy error: {}".format(err.message)
145  self.fail(msg_err)
146  srv_dic = message_converter.convert_ros_message_to_dictionary(srv_resp)
147 
148  self.assertDictEqual(srv_dic, srv_output)
149 
150 
151 def main():
152  try:
153  rostest.run('rostest', CLASSNAME, ServiceTest, sys.argv)
154  except KeyboardInterrupt:
155  pass
156  print("{} exiting".format(CLASSNAME))
def _test_service(self, srv_name, srv_input, srv_output)


rostest_node_interface_validation
Author(s): Anthony Remazeilles
autogenerated on Mon Feb 28 2022 23:33:37