test_services.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 from __future__ import print_function
3 import sys
4 import rospy
5 import rosunit
6 import unittest
7 import time
8 import random
9 
10 from rosbridge_library.internal import services, ros_loader
11 from rosbridge_library.internal import message_conversion as c
12 from rosbridge_library.internal.message_conversion import FieldTypeMismatchException
13 
14 from roscpp.srv import GetLoggers
15 
# Tuple of the interpreter's text types: Python 3 only has ``str``, while
# Python 2 additionally has ``unicode``.
if sys.version_info >= (3, 0):
    string_types = (str,)
else:
    string_types = (str, unicode)  # noqa: F821


# Package name and test name handed to rosunit.unitrun(); NAME is also the
# node name passed to rospy.init_node().
PKG = 'rosbridge_library'
NAME = 'test_services'
24 
25 
def populate_random_args(d):
    """Replace the primitive values found in *d* with test values.

    Given a dictionary, every value is replaced recursively; the dictionary
    is modified in place and the same object is returned. Any other input is
    mapped according to its type:

    * ``str`` (and ``unicode`` on Python 2) -> the text form of
      ``random.random()``
    * ``bool`` -> ``True``
    * ``int`` -> a random integer between 100 and 200 inclusive
    * ``float`` -> ``3.5``
    * anything else (lists, ``None``, ...) -> returned unchanged

    NOTE(review): the ``def`` line of this function was absent from the
    listing it was recovered from; the signature is reconstructed from the
    recursive call below.
    """
    if isinstance(d, dict):
        # Only values of existing keys are reassigned, so iterating the
        # dictionary while updating it is safe.
        for x in d:
            d[x] = populate_random_args(d[x])
        return d
    elif isinstance(d, str):
        return str(random.random())
    elif sys.version_info < (3, 0) and isinstance(d, unicode):  # noqa: F821
        return unicode(random.random())  # noqa: F821
    elif isinstance(d, bool):
        # Must be tested before int: bool is a subclass of int.
        return True
    elif isinstance(d, int):
        return random.randint(100, 200)
    elif isinstance(d, float):
        return 3.5
    else:
        return d
44 
45 
47 
class ServiceTester:
    """Round-trip harness for a single service type.

    Advertises a rospy service of the given type, calls it through
    ``services.ServiceCaller`` with a randomly populated request, answers
    with a randomly populated response, and records both sides so that
    ``validate`` can compare what was sent with what was received.

    NOTE(review): the ``class`` line was absent from the listing this block
    was recovered from; the name is taken from the ``ServiceTester(...)``
    call sites in the test case, and no base class is assumed.
    """

    def __init__(self, name, srv_type):
        """Load the service class for *srv_type* and advertise it as *name*."""
        self.name = name
        self.srvClass = ros_loader.get_service_class(srv_type)
        self.service = rospy.Service(name, self.srvClass, self.callback)

    def start(self):
        """Build a randomised request and start a ServiceCaller for it.

        The request values are kept in ``self.input``; the outcome arrives
        later through ``success`` or ``error``.
        """
        req = self.srvClass._request_class()
        gen = c.extract_values(req)
        gen = populate_random_args(gen)
        self.input = gen
        services.ServiceCaller(self.name, gen, self.success, self.error).start()

    def callback(self, req):
        """Service handler: record the request and return a randomised response.

        The received request is stored in ``self.req`` and the values used
        to fill the response in ``self.output``.
        """
        self.req = req
        # NOTE(review): presumably simulates a slow service handler - confirm.
        time.sleep(0.5)
        rsp = self.srvClass._response_class()
        gen = c.extract_values(rsp)
        gen = populate_random_args(gen)
        try:
            rsp = c.populate_instance(gen, rsp)
        except Exception:
            # Dump both sides of the failed conversion, then propagate.
            print("populating instance")
            print(rsp)
            print("populating with")
            print(gen)
            raise
        self.output = gen
        return rsp

    def success(self, rsp):
        """ServiceCaller success callback: keep the response."""
        self.rsp = rsp

    def error(self, exc):
        """ServiceCaller error callback: keep the exception for validate()."""
        self.exc = exc

    def validate(self, equality_function):
        """Check the recorded round trip with *equality_function*.

        *equality_function* is called as ``equality_function(expected,
        actual)``, first for the request and then for the response.

        Raises:
            The exception recorded by ``error``, if the call failed.
            AssertionError: if the request or the response was never
                recorded, i.e. the call has not completed.
        """
        if hasattr(self, "exc"):
            print(self.exc)
            # Exceptions only carry ``.message`` on Python 2; reading it
            # unconditionally would hide the real error on Python 3.
            message = getattr(self.exc, "message", None)
            if message is not None:
                print(message)
            raise self.exc
        missing = [attr for attr in ("req", "rsp") if not hasattr(self, attr)]
        if missing:
            raise AssertionError(
                "service call to %s did not complete (missing: %s)"
                % (self.name, ", ".join(missing)))
        equality_function(self.input, c.extract_values(self.req))
        equality_function(self.output, self.rsp)
90 
91 
class TestServices(unittest.TestCase):
    """Tests for rosbridge_library.internal.services.

    NOTE(review): the ``def`` lines of several test methods were absent from
    the listing this class was recovered from. Their names below are
    reconstructed (from docstrings and service names where possible) and
    should be checked against the upstream file; any ``test_*`` name keeps
    them discoverable by unittest. Loop nesting was likewise reconstructed
    from the statements themselves.
    """

    def setUp(self):
        rospy.init_node(NAME)

    def msgs_equal(self, msg1, msg2):
        """Recursively assert that two extracted message values are equal.

        Two values whose types are both string types are allowed to differ
        in type; otherwise the types must match exactly. Lists are compared
        element by element, primitives and strings by value, and anything
        else is treated as a mapping whose key sets and values must match.
        """
        if not (type(msg1) in string_types and type(msg2) in string_types):
            self.assertEqual(type(msg1), type(msg2))
        if type(msg1) in c.list_types:
            # zip() stops at the shorter sequence, so lengths are checked
            # first; otherwise a truncated list would compare equal.
            self.assertEqual(len(msg1), len(msg2))
            for x, y in zip(msg1, msg2):
                self.msgs_equal(x, y)
        elif type(msg1) in c.primitive_types or type(msg1) in c.string_types:
            self.assertEqual(msg1, msg2)
        else:
            for x in msg1:
                self.assertTrue(x in msg2)
            for x in msg2:
                self.assertTrue(x in msg1)
            for x in msg1:
                self.msgs_equal(msg1[x], msg2[x])

    # NOTE(review): method name reconstructed - original def line missing.
    def test_populate_request_args(self):
        """ Test args_to_service_request_instance with list/dict/None args """
        # Test empty messages
        for srv_type in ["TestEmpty", "TestResponseOnly"]:
            cls = ros_loader.get_service_class("rosbridge_library/" + srv_type)
            for args in [[], {}, None]:
                # Should throw no exceptions
                services.args_to_service_request_instance("", cls._request_class(), args)

        # Test msgs with data message
        for srv_type in ["TestRequestOnly", "TestRequestAndResponse"]:
            cls = ros_loader.get_service_class("rosbridge_library/" + srv_type)
            for args in [[3], {"data": 3}]:
                # Should throw no exceptions
                services.args_to_service_request_instance("", cls._request_class(), args)
            # NOTE(review): assumed to run once per service type (inside the
            # outer loop); the listing does not preserve the nesting.
            self.assertRaises(FieldTypeMismatchException, services.args_to_service_request_instance, "", cls._request_class(), ["hello"])

        # Test message with multiple fields
        cls = ros_loader.get_service_class("rosbridge_library/TestMultipleRequestFields")
        for args in [[3, 3.5, "hello", False], {"int": 3, "float": 3.5, "string": "hello", "bool": False}]:
            # Should throw no exceptions
            services.args_to_service_request_instance("", cls._request_class(), args)

    def test_service_call(self):
        """ Test a simple getloggers service call """
        # First, call the service the 'proper' way
        p = rospy.ServiceProxy(rospy.get_name() + "/get_loggers", GetLoggers)
        p.wait_for_service(0.5)
        ret = p()

        # Now, call using the services
        json_ret = services.call_service(rospy.get_name() + "/get_loggers")
        for x, y in zip(ret.loggers, json_ret["loggers"]):
            self.assertEqual(x.name, y["name"])
            self.assertEqual(x.level, y["level"])

    # NOTE(review): method name reconstructed - original def line missing.
    def test_service_caller(self):
        """ Same as test_service_call but via the thread caller """
        # First, call the service the 'proper' way
        p = rospy.ServiceProxy(rospy.get_name() + "/get_loggers", GetLoggers)
        p.wait_for_service(0.5)
        ret = p()

        rcvd = {"json": None, "exc": None}

        def success(json):
            rcvd["json"] = json

        def error(exc=None):
            # The error callback is handed the exception (compare
            # ServiceTester.error). Raising here would only terminate the
            # caller thread unnoticed, so the failure is recorded and
            # asserted on in the test thread instead.
            rcvd["exc"] = exc if exc is not None else Exception()

        # Now, call using the services
        services.ServiceCaller(rospy.get_name() + "/get_loggers", None, success, error).start()

        time.sleep(0.5)

        self.assertIsNone(rcvd["exc"])
        self.assertIsNotNone(rcvd["json"])
        for x, y in zip(ret.loggers, rcvd["json"]["loggers"]):
            self.assertEqual(x.name, y["name"])
            self.assertEqual(x.level, y["level"])

    # NOTE(review): method name reconstructed - original def line missing.
    def test_service_tester(self):
        """ Round-trip a single service type through ServiceTester """
        t = ServiceTester("/test_service_tester", "rosbridge_library/TestRequestAndResponse")
        t.start()
        time.sleep(1.0)
        t.validate(self.msgs_equal)

    # NOTE(review): method name reconstructed - original def line missing.
    def test_service_tester_alltypes(self):
        """ Round-trip each of the rosbridge_library test service types """
        ts = []
        for srv in ["TestResponseOnly", "TestEmpty", "TestRequestAndResponse", "TestRequestOnly",
                    "TestMultipleResponseFields", "TestMultipleRequestFields", "TestArrayRequest"]:
            t = ServiceTester("/test_service_tester_alltypes_" + srv, "rosbridge_library/" + srv)
            t.start()
            ts.append(t)

        # All calls run concurrently; give them time to finish.
        time.sleep(1.0)

        for t in ts:
            t.validate(self.msgs_equal)

    # NOTE(review): method name reconstructed - original def line missing.
    def test_random_service_types(self):
        """ Round-trip a selection of service types from other packages """
        common = ["roscpp/GetLoggers", "roscpp/SetLoggerLevel",
                  "std_srvs/Empty", "nav_msgs/GetMap", "nav_msgs/GetPlan",
                  "sensor_msgs/SetCameraInfo", "topic_tools/MuxAdd",
                  "topic_tools/MuxSelect", "tf2_msgs/FrameGraph",
                  "rospy_tutorials/BadTwoInts", "rospy_tutorials/AddTwoInts"]
        ts = []
        for srv in common:
            t = ServiceTester("/test_random_service_types/" + srv, srv)
            t.start()
            ts.append(t)

        # All calls run concurrently; give them time to finish.
        time.sleep(1.0)

        for t in ts:
            t.validate(self.msgs_equal)
209 
210 
# Script entry point: hand the test case to rosunit under this package/test
# name.
if __name__ == '__main__':
    rosunit.unitrun(PKG, NAME, TestServices)
213 
def validate(self, equality_function)
def __init__(self, name, srv_type)


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Fri Oct 21 2022 02:45:18