6 from json
import loads, dumps
34 self.log_entries.append((loglevel, message))
37 advertise_msg =
loads(
dumps({
"op":
"advertise_service"}))
38 self.assertRaises(MissingArgumentException,
39 self.advertise.advertise_service, advertise_msg)
42 advertise_msg =
loads(
dumps({
"op":
"advertise_service",
45 self.assertRaises(InvalidArgumentException,
46 self.advertise.advertise_service, advertise_msg)
49 response_msg =
loads(
dumps({
"op":
"service_response"}))
50 self.assertRaises(MissingArgumentException,
51 self.response.service_response, response_msg)
55 response_msg =
loads(
dumps({
"op":
"service_response",
56 "id":
"dummy_service",
58 self.assertRaises(MissingArgumentException,
59 self.response.service_response, response_msg)
62 response_msg =
loads(
dumps({
"op":
"service_response",
65 self.assertRaises(InvalidArgumentException,
66 self.response.service_response, response_msg)
69 service_path =
"/set_bool_1" 70 advertise_msg =
loads(
dumps({
"op":
"advertise_service",
71 "type":
"std_srvs/SetBool",
72 "service": service_path}))
73 self.advertise.advertise_service(advertise_msg)
77 rospy.wait_for_service(service_path, 1.0)
80 service_path =
"/set_bool_2" 81 advertise_msg =
loads(
dumps({
"op":
"advertise_service",
82 "type":
"std_srvs/SetBool",
83 "service": service_path}))
84 self.advertise.advertise_service(advertise_msg)
89 call_service.call_service(
loads(
dumps({
"op":
"call_service",
91 "service": service_path,
96 rospy.sleep(rospy.Duration(0.5))
98 if loop_iterations > 3:
99 self.fail(
"did not receive service call rosbridge message " 100 "after waiting 2 seconds")
108 response_msg =
loads(
dumps({
"op":
"service_response",
109 "service": service_path,
111 "values": {
"success":
True,
115 self.response.service_response(response_msg)
119 rospy.sleep(rospy.Duration(0.5))
121 if loop_iterations > 3:
122 self.fail(
"did not receive service response rosbridge message " 123 "after waiting 2 seconds")
132 service_path =
"/set_bool_3" 133 advertise_msg =
loads(
dumps({
"op":
"advertise_service",
134 "type":
"std_srvs/SetBool",
135 "service": service_path}))
136 self.advertise.advertise_service(advertise_msg)
141 call_service.call_service(
loads(
dumps({
"op":
"call_service",
143 "service": service_path,
148 rospy.sleep(rospy.Duration(0.5))
150 if loop_iterations > 3:
151 self.fail(
"did not receive service call rosbridge message " 152 "after waiting 2 seconds")
160 response_msg =
loads(
dumps({
"op":
"unadvertise_service",
161 "service": service_path}))
163 self.unadvertise.unadvertise_service(response_msg)
167 rospy.sleep(rospy.Duration(0.5))
169 if loop_iterations > 3:
170 self.fail(
"did not receive service response rosbridge message " 171 "after waiting 2 seconds")
180 PKG =
'rosbridge_library' 181 NAME =
'test_service_capabilities' 182 if __name__ ==
'__main__':
183 rospy.init_node(NAME)
184 rostest.rosrun(PKG, NAME, TestServiceCapabilities)
def dumps(ob, sort_keys=False)
def test_response_missing_arguments(self)
def test_call_advertised_service(self)
def test_advertise_service(self)
def test_advertise_missing_arguments(self)
def local_send_cb(self, msg)
def test_response_invalid_arguments(self)
def mock_log(self, loglevel, message, _=None)
def test_unadvertise_with_live_request(self)
def test_advertise_invalid_arguments(self)