test_call_service.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import sys
3 import rospy
4 import rostest
5 import unittest
6 import time
7 
8 from roscpp.srv import GetLoggers
9 
10 from json import loads, dumps
11 from std_msgs.msg import String
12 from std_srvs.srv import SetBool
13 
14 from rosbridge_library.capabilities.call_service import CallService
15 from rosbridge_library.protocol import Protocol
16 from rosbridge_library.protocol import InvalidArgumentException, MissingArgumentException
17 
18 
19 class TestCallService(unittest.TestCase):
20 
21  def setUp(self):
22  rospy.init_node("test_call_service")
23 
25  proto = Protocol("test_missing_arguments")
26  s = CallService(proto)
27  msg = loads(dumps({"op": "call_service"}))
28  self.assertRaises(MissingArgumentException, s.call_service, msg)
29 
31  proto = Protocol("test_invalid_arguments")
32  s = CallService(proto)
33 
34  msg = loads(dumps({"op": "call_service", "service": 3}))
35  self.assertRaises(InvalidArgumentException, s.call_service, msg)
36 
38  # Prepare to call the service the 'proper' way
39  p = rospy.ServiceProxy(rospy.get_name() + "/get_loggers", GetLoggers)
40  p.wait_for_service()
41  time.sleep(1.0)
42 
43  proto = Protocol("test_call_service_works")
44  s = CallService(proto)
45  msg = loads(dumps({"op": "call_service", "service": rospy.get_name() + "/get_loggers"}))
46 
47  received = {"msg": None, "arrived": False}
48 
49  def cb(msg, cid=None):
50  received["msg"] = msg
51  received["arrived"] = True
52 
53  proto.send = cb
54 
55  s.call_service(msg)
56 
57  timeout = 5.0
58  start = rospy.Time.now()
59  while rospy.Time.now() - start < rospy.Duration(timeout):
60  if received["arrived"]:
61  break
62  time.sleep(0.1)
63 
64  # The rosbridge service call actually causes another logger to appear,
65  # so do the "regular" service call after that.
66  ret = p()
67 
68  self.assertTrue(received["msg"]["result"])
69  for x, y in zip(ret.loggers, received["msg"]["values"]["loggers"]):
70  self.assertEqual(x.name, y["name"])
71  self.assertEqual(x.level, y["level"])
72 
74  # Dummy service that instantly fails
75  service_server = rospy.Service("set_bool_fail", SetBool,
76  lambda req: None)
77 
78  proto = Protocol("test_call_service_fail")
79  s = CallService(proto)
80  send_msg = loads(dumps({"op": "call_service", "service": rospy.get_name() + "/set_bool_fail", "args": '[ true ]'}))
81 
82  received = {"msg": None, "arrived": False}
83  def cb(msg, cid=None):
84  received["msg"] = msg
85  received["arrived"] = True
86 
87  proto.send = cb
88 
89  s.call_service(send_msg)
90 
91  timeout = 5.0
92  start = rospy.Time.now()
93  while rospy.Time.now() - start < rospy.Duration(timeout):
94  if received["arrived"]:
95  break
96  time.sleep(0.1)
97 
98  self.assertFalse(received["msg"]["result"])
99 
100 
101 PKG = 'rosbridge_library'
102 NAME = 'test_call_service'
103 if __name__ == '__main__':
104  rostest.unitrun(PKG, NAME, TestCallService)
105 
def dumps(ob, sort_keys=False)
Definition: cbor.py:223


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Wed Jun 3 2020 03:55:14