test_service_capabilities.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import rospy
3 import rostest
4 import unittest
5 
6 from json import loads, dumps
7 
8 from rosbridge_library.capabilities.advertise_service import AdvertiseService
9 from rosbridge_library.capabilities.unadvertise_service import UnadvertiseService
10 from rosbridge_library.capabilities.call_service import CallService
11 from rosbridge_library.capabilities.service_response import ServiceResponse
12 from rosbridge_library.protocol import Protocol
13 from rosbridge_library.protocol import InvalidArgumentException, MissingArgumentException
14 
15 
16 class TestServiceCapabilities(unittest.TestCase):
17  def setUp(self):
18  self.proto = Protocol(self._testMethodName)
19  # change the log function so we can verify errors are logged
20  self.proto.log = self.mock_log
21  # change the send callback so we can access the rosbridge messages
22  # being sent
23  self.proto.send = self.local_send_cb
27  self.received_message = None
28  self.log_entries = []
29 
30  def local_send_cb(self, msg):
31  self.received_message = msg
32 
33  def mock_log(self, loglevel, message, _=None):
34  self.log_entries.append((loglevel, message))
35 
37  advertise_msg = loads(dumps({"op": "advertise_service"}))
38  self.assertRaises(MissingArgumentException,
39  self.advertise.advertise_service, advertise_msg)
40 
42  advertise_msg = loads(dumps({"op": "advertise_service",
43  "type": 42,
44  "service": None}))
45  self.assertRaises(InvalidArgumentException,
46  self.advertise.advertise_service, advertise_msg)
47 
49  response_msg = loads(dumps({"op": "service_response"}))
50  self.assertRaises(MissingArgumentException,
51  self.response.service_response, response_msg)
52 
53  # this message has the optional fields, with correct types, but not the
54  # required ones
55  response_msg = loads(dumps({"op": "service_response",
56  "id": "dummy_service",
57  "values": "none"}))
58  self.assertRaises(MissingArgumentException,
59  self.response.service_response, response_msg)
60 
62  response_msg = loads(dumps({"op": "service_response",
63  "service": 5,
64  "result": "error"}))
65  self.assertRaises(InvalidArgumentException,
66  self.response.service_response, response_msg)
67 
69  service_path = "/set_bool_1"
70  advertise_msg = loads(dumps({"op": "advertise_service",
71  "type": "std_srvs/SetBool",
72  "service": service_path}))
73  self.advertise.advertise_service(advertise_msg)
74 
75  # This throws an exception if the timeout is exceeded (i.e. the service
76  # is not properly advertised)
77  rospy.wait_for_service(service_path, 1.0)
78 
80  service_path = "/set_bool_2"
81  advertise_msg = loads(dumps({"op": "advertise_service",
82  "type": "std_srvs/SetBool",
83  "service": service_path}))
84  self.advertise.advertise_service(advertise_msg)
85 
86  # Call the service via rosbridge because rospy.ServiceProxy.call() is
87  # blocking
88  call_service = CallService(self.proto)
89  call_service.call_service(loads(dumps({"op": "call_service",
90  "id": "foo",
91  "service": service_path,
92  "args": [True]})))
93 
94  loop_iterations = 0
95  while self.received_message is None:
96  rospy.sleep(rospy.Duration(0.5))
97  loop_iterations += 1
98  if loop_iterations > 3:
99  self.fail("did not receive service call rosbridge message "
100  "after waiting 2 seconds")
101 
102  self.assertFalse(self.received_message is None)
103  self.assertTrue("op" in self.received_message)
104  self.assertTrue(self.received_message["op"] == "call_service")
105  self.assertTrue("id" in self.received_message)
106 
107  # Now send the response
108  response_msg = loads(dumps({"op": "service_response",
109  "service": service_path,
110  "id": self.received_message["id"],
111  "values": {"success": True,
112  "message": ""},
113  "result": True}))
114  self.received_message = None
115  self.response.service_response(response_msg)
116 
117  loop_iterations = 0
118  while self.received_message is None:
119  rospy.sleep(rospy.Duration(0.5))
120  loop_iterations += 1
121  if loop_iterations > 3:
122  self.fail("did not receive service response rosbridge message "
123  "after waiting 2 seconds")
124 
125  self.assertFalse(self.received_message is None)
126  # Rosbridge should forward the response message to the "client"
127  # (i.e. our custom send function, see setUp())
128  self.assertEqual(self.received_message["op"], "service_response")
129  self.assertTrue(self.received_message["result"])
130 
132  service_path = "/set_bool_3"
133  advertise_msg = loads(dumps({"op": "advertise_service",
134  "type": "std_srvs/SetBool",
135  "service": service_path}))
136  self.advertise.advertise_service(advertise_msg)
137 
138  # Call the service via rosbridge because rospy.ServiceProxy.call() is
139  # blocking
140  call_service = CallService(self.proto)
141  call_service.call_service(loads(dumps({"op": "call_service",
142  "id": "foo",
143  "service": service_path,
144  "args": [True]})))
145 
146  loop_iterations = 0
147  while self.received_message is None:
148  rospy.sleep(rospy.Duration(0.5))
149  loop_iterations += 1
150  if loop_iterations > 3:
151  self.fail("did not receive service call rosbridge message "
152  "after waiting 2 seconds")
153 
154  self.assertFalse(self.received_message is None)
155  self.assertTrue("op" in self.received_message)
156  self.assertTrue(self.received_message["op"] == "call_service")
157  self.assertTrue("id" in self.received_message)
158 
159  # Now send the response
160  response_msg = loads(dumps({"op": "unadvertise_service",
161  "service": service_path}))
162  self.received_message = None
163  self.unadvertise.unadvertise_service(response_msg)
164 
165  loop_iterations = 0
166  while self.received_message is None:
167  rospy.sleep(rospy.Duration(0.5))
168  loop_iterations += 1
169  if loop_iterations > 3:
170  self.fail("did not receive service response rosbridge message "
171  "after waiting 2 seconds")
172 
173  self.assertFalse(self.received_message is None)
174  # Rosbridge should abort the existing service call with an error
175  # (i.e. "result" should be False)
176  self.assertEqual(self.received_message["op"], "service_response")
177  self.assertFalse(self.received_message["result"])
178 
179 
180 PKG = 'rosbridge_library'
181 NAME = 'test_service_capabilities'
182 if __name__ == '__main__':
183  rospy.init_node(NAME)
184  rostest.rosrun(PKG, NAME, TestServiceCapabilities)
def dumps(ob, sort_keys=False)
Definition: cbor.py:223


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Wed Jun 3 2020 03:55:14