3 Perform a unitest on a node acting as a service server. 5 Verify that, given a service request, the answer provided is the one expected. 7 Test files should set the following parameters: 9 <test test-name="test_service" pkg="rostest_node_interface_validation" type="test_service" > 13 input: input parameter of the message, in dictionary format 14 output: output parameter of the message, in dictionary format 18 Several calls can be added. 19 If the input or the output should be empty, place None. 23 __author__ =
"Anthony Remazeilles" 24 __email__ =
"anthony.remazeilles@tecnalia.com" 25 __copyright__ =
"Copyright (C) 2020 Tecnalia Research and Innovation" 26 __licence__ =
"Apache 2.0" 34 from rospy_message_converter
import message_converter
36 CLASSNAME =
'servicetest' 41 super(ServiceTest, self).
__init__(*args)
42 rospy.init_node(CLASSNAME)
47 calls = rospy.get_param(
'~calls')
51 keys = [
'name',
'input',
'output']
54 self.fail(
"{} field required, but not specified in {}".format(item, call))
58 srv_data[
'name'] = call[
'name']
59 srv_data[
'input'] = call[
'input']
60 srv_data[
'output'] = call[
'output']
62 if srv_data[
'input'] ==
'None':
63 rospy.logwarn(
'None input converted to empty input')
64 srv_data[
'input'] = dict()
65 if srv_data[
'output'] ==
'None':
66 rospy.logwarn(
'None output converted to empty output')
67 srv_data[
'output'] = dict()
69 self.
calls.append(srv_data)
70 except KeyError
as err:
71 msg_err =
"service_test not initialized properly" 72 msg_err +=
" Parameter [%s] not set." % (str(err))
73 msg_err +=
" Caller ID: [%s] Resolved name: [%s]" % (
74 rospy.get_caller_id(),
75 rospy.resolve_name(err.args[0]))
80 use_sim_time = rospy.get_param(
'/use_sim_time',
False)
82 while not rospy.is_shutdown()
and \
83 use_sim_time
and (rospy.Time.now() == rospy.Time(0)):
84 rospy.logwarn_throttle(
85 1,
'/use_sim_time is specified and rostime is 0, /clock is published?')
86 if time.time() - self.
t_start > 10:
87 self.fail(
'Timed out (10s) of /clock publication.')
92 """Test services are advertised""" 94 s_name_set = set([ item[
'name']
for item
in self.
calls])
97 while not rospy.is_shutdown():
99 t_elapsed = t_now - t_start
102 if t_elapsed > t_timeout_max:
105 for s_name
in rosservice.get_service_list():
106 if s_name
in s_name_set:
107 s_name_set.remove(s_name)
111 assert not s_name_set, \
112 'services [%s] not advertized on time' % (s_name_set)
114 rospy.loginfo(
"All services advertized on time")
118 for item
in self.
calls:
119 rospy.loginfo(
"Testing service {} with input parameters {}".format(
122 self.
_test_service(item[
'name'], item[
'input'], item[
'output'])
123 rospy.loginfo(
"So far so good")
126 self.assert_(srv_name)
127 all_services = rosservice.get_service_list()
128 self.assertIn(srv_name, all_services)
129 srv_class = rosservice.get_service_class_by_name(srv_name)
132 srv_proxy = rospy.ServiceProxy(srv_name, srv_class)
133 except KeyError
as err:
134 msg_err =
"Service proxy could not be created" 139 srv_resp = srv_proxy(**srv_input)
141 srv_resp = srv_proxy()
143 except (genpy.SerializationError, rospy.ROSException), err:
144 msg_err =
"Service proxy error: {}".format(err.message)
146 srv_dic = message_converter.convert_ros_message_to_dictionary(srv_resp)
148 self.assertDictEqual(srv_dic, srv_output)
153 rostest.run(
'rostest', CLASSNAME, ServiceTest, sys.argv)
154 except KeyboardInterrupt:
156 print(
"{} exiting".format(CLASSNAME))
def _test_service(self, srv_name, srv_input, srv_output)
def test_advertise_service(self)
def test_service_call(self)