8 from roscpp.srv
import GetLoggers
10 from json
import loads, dumps
12 from std_srvs.srv
import SetBool
19 PKG =
'rosbridge_library' 20 NAME =
'test_call_service' 29 proto =
Protocol(
"test_missing_arguments")
32 self.assertRaises(MissingArgumentException, s.call_service, msg)
35 proto =
Protocol(
"test_invalid_arguments")
38 msg =
loads(
dumps({
"op":
"call_service",
"service": 3}))
39 self.assertRaises(InvalidArgumentException, s.call_service, msg)
43 p = rospy.ServiceProxy(rospy.get_name() +
"/get_loggers", GetLoggers)
47 proto =
Protocol(
"test_call_service_works")
49 msg =
loads(
dumps({
"op":
"call_service",
"service": rospy.get_name() +
"/get_loggers"}))
51 received = {
"msg":
None,
"arrived":
False}
53 def cb(msg, cid=None):
55 received[
"arrived"] =
True 62 start = rospy.Time.now()
63 while rospy.Time.now() - start < rospy.Duration(timeout):
64 if received[
"arrived"]:
72 self.assertTrue(received[
"msg"][
"result"])
73 for x, y
in zip(ret.loggers, received[
"msg"][
"values"][
"loggers"]):
74 self.assertEqual(x.name, y[
"name"])
75 self.assertEqual(x.level, y[
"level"])
79 service_server = rospy.Service(
"set_bool_fail", SetBool,
82 proto =
Protocol(
"test_call_service_fail")
84 send_msg =
loads(
dumps({
"op":
"call_service",
"service": rospy.get_name() +
"/set_bool_fail",
"args":
'[ true ]'}))
86 received = {
"msg":
None,
"arrived":
False}
87 def cb(msg, cid=None):
89 received[
"arrived"] =
True 93 s.call_service(send_msg)
96 start = rospy.Time.now()
97 while rospy.Time.now() - start < rospy.Duration(timeout):
98 if received[
"arrived"]:
102 self.assertFalse(received[
"msg"][
"result"])
105 if __name__ ==
'__main__':
106 rosunit.unitrun(PKG, NAME, TestCallService)
def test_call_service_works(self)
def test_missing_arguments(self)
def dumps(ob, sort_keys=False)
def test_invalid_arguments(self)
def test_call_service_fail(self)