8 from roscpp.srv
import GetLoggers
10 from json
import loads, dumps
12 from std_srvs.srv
import SetBool
22 rospy.init_node(
"test_call_service")
25 proto =
Protocol(
"test_missing_arguments")
28 self.assertRaises(MissingArgumentException, s.call_service, msg)
31 proto =
Protocol(
"test_invalid_arguments")
34 msg =
loads(
dumps({
"op":
"call_service",
"service": 3}))
35 self.assertRaises(InvalidArgumentException, s.call_service, msg)
39 p = rospy.ServiceProxy(rospy.get_name() +
"/get_loggers", GetLoggers)
43 proto =
Protocol(
"test_call_service_works")
45 msg =
loads(
dumps({
"op":
"call_service",
"service": rospy.get_name() +
"/get_loggers"}))
47 received = {
"msg":
None,
"arrived":
False}
49 def cb(msg, cid=None):
51 received[
"arrived"] =
True 58 start = rospy.Time.now()
59 while rospy.Time.now() - start < rospy.Duration(timeout):
60 if received[
"arrived"]:
68 self.assertTrue(received[
"msg"][
"result"])
69 for x, y
in zip(ret.loggers, received[
"msg"][
"values"][
"loggers"]):
70 self.assertEqual(x.name, y[
"name"])
71 self.assertEqual(x.level, y[
"level"])
75 service_server = rospy.Service(
"set_bool_fail", SetBool,
78 proto =
Protocol(
"test_call_service_fail")
80 send_msg =
loads(
dumps({
"op":
"call_service",
"service": rospy.get_name() +
"/set_bool_fail",
"args":
'[ true ]'}))
82 received = {
"msg":
None,
"arrived":
False}
83 def cb(msg, cid=None):
85 received[
"arrived"] =
True 89 s.call_service(send_msg)
92 start = rospy.Time.now()
93 while rospy.Time.now() - start < rospy.Duration(timeout):
94 if received[
"arrived"]:
98 self.assertFalse(received[
"msg"][
"result"])
101 PKG =
'rosbridge_library' 102 NAME =
'test_call_service' 103 if __name__ ==
'__main__':
104 rostest.unitrun(PKG, NAME, TestCallService)
def test_call_service_works(self)
def test_missing_arguments(self)
def dumps(ob, sort_keys=False)
def test_invalid_arguments(self)
def test_call_service_fail(self)