test_call_service.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import sys
3 import rospy
4 import rosunit
5 import unittest
6 import time
7 
8 from roscpp.srv import GetLoggers
9 
10 from json import loads, dumps
11 from std_msgs.msg import String
12 from std_srvs.srv import SetBool
13 
14 from rosbridge_library.capabilities.call_service import CallService
15 from rosbridge_library.protocol import Protocol
16 from rosbridge_library.protocol import InvalidArgumentException, MissingArgumentException
17 
18 
19 PKG = 'rosbridge_library'
20 NAME = 'test_call_service'
21 
22 
23 class TestCallService(unittest.TestCase):
24 
25  def setUp(self):
26  rospy.init_node(NAME)
27 
29  proto = Protocol("test_missing_arguments")
30  s = CallService(proto)
31  msg = loads(dumps({"op": "call_service"}))
32  self.assertRaises(MissingArgumentException, s.call_service, msg)
33 
35  proto = Protocol("test_invalid_arguments")
36  s = CallService(proto)
37 
38  msg = loads(dumps({"op": "call_service", "service": 3}))
39  self.assertRaises(InvalidArgumentException, s.call_service, msg)
40 
42  # Prepare to call the service the 'proper' way
43  p = rospy.ServiceProxy(rospy.get_name() + "/get_loggers", GetLoggers)
44  p.wait_for_service()
45  time.sleep(1.0)
46 
47  proto = Protocol("test_call_service_works")
48  s = CallService(proto)
49  msg = loads(dumps({"op": "call_service", "service": rospy.get_name() + "/get_loggers"}))
50 
51  received = {"msg": None, "arrived": False}
52 
53  def cb(msg, cid=None):
54  received["msg"] = msg
55  received["arrived"] = True
56 
57  proto.send = cb
58 
59  s.call_service(msg)
60 
61  timeout = 5.0
62  start = rospy.Time.now()
63  while rospy.Time.now() - start < rospy.Duration(timeout):
64  if received["arrived"]:
65  break
66  time.sleep(0.1)
67 
68  # The rosbridge service call actually causes another logger to appear,
69  # so do the "regular" service call after that.
70  ret = p()
71 
72  self.assertTrue(received["msg"]["result"])
73  for x, y in zip(ret.loggers, received["msg"]["values"]["loggers"]):
74  self.assertEqual(x.name, y["name"])
75  self.assertEqual(x.level, y["level"])
76 
78  # Dummy service that instantly fails
79  service_server = rospy.Service("set_bool_fail", SetBool,
80  lambda req: None)
81 
82  proto = Protocol("test_call_service_fail")
83  s = CallService(proto)
84  send_msg = loads(dumps({"op": "call_service", "service": rospy.get_name() + "/set_bool_fail", "args": '[ true ]'}))
85 
86  received = {"msg": None, "arrived": False}
87  def cb(msg, cid=None):
88  received["msg"] = msg
89  received["arrived"] = True
90 
91  proto.send = cb
92 
93  s.call_service(send_msg)
94 
95  timeout = 5.0
96  start = rospy.Time.now()
97  while rospy.Time.now() - start < rospy.Duration(timeout):
98  if received["arrived"]:
99  break
100  time.sleep(0.1)
101 
102  self.assertFalse(received["msg"]["result"])
103 
104 
105 if __name__ == '__main__':
106  rosunit.unitrun(PKG, NAME, TestCallService)
107 
def dumps(ob, sort_keys=False)
Definition: cbor.py:223


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Fri Oct 21 2022 02:45:18