test_service_capabilities.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import rospy
3 import rosunit
4 import unittest
5 
6 from json import loads, dumps
7 
8 from rosbridge_library.capabilities.advertise_service import AdvertiseService
9 from rosbridge_library.capabilities.unadvertise_service import UnadvertiseService
10 from rosbridge_library.capabilities.call_service import CallService
11 from rosbridge_library.capabilities.service_response import ServiceResponse
12 from rosbridge_library.protocol import Protocol
13 from rosbridge_library.protocol import InvalidArgumentException, MissingArgumentException
14 
15 
16 PKG = 'rosbridge_library'
17 NAME = 'test_service_capabilities'
18 
19 
20 class TestServiceCapabilities(unittest.TestCase):
21 
22  def setUp(self):
23  rospy.init_node(NAME)
24 
25  self.proto = Protocol(self._testMethodName)
26  # change the log function so we can verify errors are logged
27  self.proto.log = self.mock_log
28  # change the send callback so we can access the rosbridge messages
29  # being sent
30  self.proto.send = self.local_send_cb
34  self.received_message = None
35  self.log_entries = []
36 
37  def local_send_cb(self, msg):
38  self.received_message = msg
39 
40  def mock_log(self, loglevel, message, _=None):
41  self.log_entries.append((loglevel, message))
42 
44  advertise_msg = loads(dumps({"op": "advertise_service"}))
45  self.assertRaises(MissingArgumentException,
46  self.advertise.advertise_service, advertise_msg)
47 
49  advertise_msg = loads(dumps({"op": "advertise_service",
50  "type": 42,
51  "service": None}))
52  self.assertRaises(InvalidArgumentException,
53  self.advertise.advertise_service, advertise_msg)
54 
56  response_msg = loads(dumps({"op": "service_response"}))
57  self.assertRaises(MissingArgumentException,
58  self.response.service_response, response_msg)
59 
60  # this message has the optional fields, with correct types, but not the
61  # required ones
62  response_msg = loads(dumps({"op": "service_response",
63  "id": "dummy_service",
64  "values": "none"}))
65  self.assertRaises(MissingArgumentException,
66  self.response.service_response, response_msg)
67 
69  response_msg = loads(dumps({"op": "service_response",
70  "service": 5,
71  "result": "error"}))
72  self.assertRaises(InvalidArgumentException,
73  self.response.service_response, response_msg)
74 
76  service_path = "/set_bool_1"
77  advertise_msg = loads(dumps({"op": "advertise_service",
78  "type": "std_srvs/SetBool",
79  "service": service_path}))
80  self.advertise.advertise_service(advertise_msg)
81 
82  # This throws an exception if the timeout is exceeded (i.e. the service
83  # is not properly advertised)
84  rospy.wait_for_service(service_path, 1.0)
85 
87  service_path = "/set_bool_2"
88  advertise_msg = loads(dumps({"op": "advertise_service",
89  "type": "std_srvs/SetBool",
90  "service": service_path}))
91  self.advertise.advertise_service(advertise_msg)
92 
93  # Call the service via rosbridge because rospy.ServiceProxy.call() is
94  # blocking
95  call_service = CallService(self.proto)
96  call_service.call_service(loads(dumps({"op": "call_service",
97  "id": "foo",
98  "service": service_path,
99  "args": [True]})))
100 
101  loop_iterations = 0
102  while self.received_message is None:
103  rospy.sleep(rospy.Duration(0.5))
104  loop_iterations += 1
105  if loop_iterations > 3:
106  self.fail("did not receive service call rosbridge message "
107  "after waiting 2 seconds")
108 
109  self.assertFalse(self.received_message is None)
110  self.assertTrue("op" in self.received_message)
111  self.assertTrue(self.received_message["op"] == "call_service")
112  self.assertTrue("id" in self.received_message)
113 
114  # Now send the response
115  response_msg = loads(dumps({"op": "service_response",
116  "service": service_path,
117  "id": self.received_message["id"],
118  "values": {"success": True,
119  "message": ""},
120  "result": True}))
121  self.received_message = None
122  self.response.service_response(response_msg)
123 
124  loop_iterations = 0
125  while self.received_message is None:
126  rospy.sleep(rospy.Duration(0.5))
127  loop_iterations += 1
128  if loop_iterations > 3:
129  self.fail("did not receive service response rosbridge message "
130  "after waiting 2 seconds")
131 
132  self.assertFalse(self.received_message is None)
133  # Rosbridge should forward the response message to the "client"
134  # (i.e. our custom send function, see setUp())
135  self.assertEqual(self.received_message["op"], "service_response")
136  self.assertTrue(self.received_message["result"])
137 
139  service_path = "/set_bool_3"
140  advertise_msg = loads(dumps({"op": "advertise_service",
141  "type": "std_srvs/SetBool",
142  "service": service_path}))
143  self.advertise.advertise_service(advertise_msg)
144 
145  # Call the service via rosbridge because rospy.ServiceProxy.call() is
146  # blocking
147  call_service = CallService(self.proto)
148  call_service.call_service(loads(dumps({"op": "call_service",
149  "id": "foo",
150  "service": service_path,
151  "args": [True]})))
152 
153  loop_iterations = 0
154  while self.received_message is None:
155  rospy.sleep(rospy.Duration(0.5))
156  loop_iterations += 1
157  if loop_iterations > 3:
158  self.fail("did not receive service call rosbridge message "
159  "after waiting 2 seconds")
160 
161  self.assertFalse(self.received_message is None)
162  self.assertTrue("op" in self.received_message)
163  self.assertTrue(self.received_message["op"] == "call_service")
164  self.assertTrue("id" in self.received_message)
165 
166  # Now send the response
167  response_msg = loads(dumps({"op": "unadvertise_service",
168  "service": service_path}))
169  self.received_message = None
170  self.unadvertise.unadvertise_service(response_msg)
171 
172  loop_iterations = 0
173  while self.received_message is None:
174  rospy.sleep(rospy.Duration(0.5))
175  loop_iterations += 1
176  if loop_iterations > 3:
177  self.fail("did not receive service response rosbridge message "
178  "after waiting 2 seconds")
179 
180  self.assertFalse(self.received_message is None)
181  # Rosbridge should abort the existing service call with an error
182  # (i.e. "result" should be False)
183  self.assertEqual(self.received_message["op"], "service_response")
184  self.assertFalse(self.received_message["result"])
185 
186 
187 if __name__ == '__main__':
188  rosunit.unitrun(PKG, NAME, TestServiceCapabilities)
def dumps(ob, sort_keys=False)
Definition: cbor.py:223


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Fri Oct 21 2022 02:45:18