6 from json
import loads, dumps
16 PKG =
'rosbridge_library' 17 NAME =
'test_service_capabilities' 44 advertise_msg =
loads(
dumps({
"op":
"advertise_service"}))
45 self.assertRaises(MissingArgumentException,
46 self.
advertise.advertise_service, advertise_msg)
49 advertise_msg =
loads(
dumps({
"op":
"advertise_service",
52 self.assertRaises(InvalidArgumentException,
53 self.
advertise.advertise_service, advertise_msg)
56 response_msg =
loads(
dumps({
"op":
"service_response"}))
57 self.assertRaises(MissingArgumentException,
58 self.
response.service_response, response_msg)
62 response_msg =
loads(
dumps({
"op":
"service_response",
63 "id":
"dummy_service",
65 self.assertRaises(MissingArgumentException,
66 self.
response.service_response, response_msg)
69 response_msg =
loads(
dumps({
"op":
"service_response",
72 self.assertRaises(InvalidArgumentException,
73 self.
response.service_response, response_msg)
76 service_path =
"/set_bool_1" 77 advertise_msg =
loads(
dumps({
"op":
"advertise_service",
78 "type":
"std_srvs/SetBool",
79 "service": service_path}))
84 rospy.wait_for_service(service_path, 1.0)
87 service_path =
"/set_bool_2" 88 advertise_msg =
loads(
dumps({
"op":
"advertise_service",
89 "type":
"std_srvs/SetBool",
90 "service": service_path}))
96 call_service.call_service(
loads(
dumps({
"op":
"call_service",
98 "service": service_path,
103 rospy.sleep(rospy.Duration(0.5))
105 if loop_iterations > 3:
106 self.fail(
"did not receive service call rosbridge message " 107 "after waiting 2 seconds")
115 response_msg =
loads(
dumps({
"op":
"service_response",
116 "service": service_path,
118 "values": {
"success":
True,
122 self.
response.service_response(response_msg)
126 rospy.sleep(rospy.Duration(0.5))
128 if loop_iterations > 3:
129 self.fail(
"did not receive service response rosbridge message " 130 "after waiting 2 seconds")
139 service_path =
"/set_bool_3" 140 advertise_msg =
loads(
dumps({
"op":
"advertise_service",
141 "type":
"std_srvs/SetBool",
142 "service": service_path}))
148 call_service.call_service(
loads(
dumps({
"op":
"call_service",
150 "service": service_path,
155 rospy.sleep(rospy.Duration(0.5))
157 if loop_iterations > 3:
158 self.fail(
"did not receive service call rosbridge message " 159 "after waiting 2 seconds")
167 response_msg =
loads(
dumps({
"op":
"unadvertise_service",
168 "service": service_path}))
174 rospy.sleep(rospy.Duration(0.5))
176 if loop_iterations > 3:
177 self.fail(
"did not receive service response rosbridge message " 178 "after waiting 2 seconds")
187 if __name__ ==
'__main__':
188 rosunit.unitrun(PKG, NAME, TestServiceCapabilities)
def dumps(ob, sort_keys=False)
def test_response_missing_arguments(self)
def test_call_advertised_service(self)
def unadvertise_service()
def test_advertise_service(self)
def test_advertise_missing_arguments(self)
def local_send_cb(self, msg)
def test_response_invalid_arguments(self)
def mock_log(self, loglevel, message, _=None)
def test_unadvertise_with_live_request(self)
def test_advertise_invalid_arguments(self)