test_services.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 from __future__ import print_function
3 import sys
4 import rospy
5 import rostest
6 import unittest
7 import time
8 import random
9 
10 from rosbridge_library.internal import services, ros_loader
11 from rosbridge_library.internal import message_conversion as c
12 from rosbridge_library.internal.message_conversion import FieldTypeMismatchException
13 
14 from roscpp.srv import GetLoggers
15 
# Text types treated as interchangeable when comparing message values:
# Python 3 only has ``str``; Python 2 additionally has ``unicode``.
if sys.version_info >= (3, 0):
    string_types = (str,)
else:
    string_types = (str, unicode)  # noqa: F821 -- only evaluated on Python 2
20 
21 
def populate_random_args(d):
    """Replace the primitive values in ``d`` with random/placeholder values.

    Dictionaries are walked recursively and updated in place (the same dict
    object is returned).  Any other value is replaced according to its type:

    * ``str`` (and ``unicode`` on Python 2) -> string form of a random float
    * ``bool`` -> ``True``
    * ``int`` -> random integer in ``[100, 200]``
    * ``float`` -> ``3.5``

    Values of any other type (lists, ``None``, ...) are returned unchanged.
    """
    if isinstance(d, dict):
        # Only values are rebound, so iterating the keys while assigning is safe.
        for key in d:
            d[key] = populate_random_args(d[key])
        return d
    if isinstance(d, str):
        return str(random.random())
    if sys.version_info < (3, 0) and isinstance(d, unicode):  # noqa: F821
        return unicode(random.random())  # noqa: F821
    # bool has to be tested before int because bool is a subclass of int.
    if isinstance(d, bool):
        return True
    if isinstance(d, int):
        return random.randint(100, 200)
    if isinstance(d, float):
        return 3.5
    return d
40 
41 
43 
class ServiceTester(object):
    """Advertise a service and call it through ``services.ServiceCaller``.

    The service handler answers every request with a randomly populated
    response.  Both the values sent/returned and the values observed on the
    other side are recorded on the instance so ``validate`` can compare them:

    * ``input``  -- request values passed to the ``ServiceCaller``
    * ``req``    -- request object received by the service handler
    * ``output`` -- response values generated by the service handler
    * ``rsp``    -- response passed to the success callback
    * ``exc``    -- exception passed to the error callback (only set on failure)
    """

    def __init__(self, name, srv_type):
        """Load the service class for ``srv_type`` and register the service as ``name``."""
        self.name = name
        self.srvClass = ros_loader.get_service_class(srv_type)
        self.service = rospy.Service(name, self.srvClass, self.callback)

    def start(self):
        """Build a randomly populated request and start a ``ServiceCaller`` for it."""
        req = self.srvClass._request_class()
        gen = c.extract_values(req)
        gen = populate_random_args(gen)
        self.input = gen
        services.ServiceCaller(self.name, gen, self.success, self.error).start()

    def callback(self, req):
        """Service handler: record the request and return a randomly populated response."""
        self.req = req
        time.sleep(0.5)
        rsp = self.srvClass._response_class()
        gen = c.extract_values(rsp)
        gen = populate_random_args(gen)
        try:
            rsp = c.populate_instance(gen, rsp)
        except Exception:
            # Dump both sides of the failed conversion before propagating it.
            print("populating instance")
            print(rsp)
            print("populating with")
            print(gen)
            raise
        self.output = gen
        return rsp

    def success(self, rsp):
        """Success callback handed to the ``ServiceCaller``: record the response."""
        self.rsp = rsp

    def error(self, exc):
        """Error callback handed to the ``ServiceCaller``: record the exception."""
        self.exc = exc

    def validate(self, equality_function):
        """Check the recorded call.

        Re-raises the exception recorded by ``error`` if there is one.
        Otherwise calls ``equality_function(expected, actual)`` twice: once
        for the request (``input`` vs. the values extracted from ``req``) and
        once for the response (``output`` vs. ``rsp``).
        """
        if hasattr(self, "exc"):
            print(self.exc)
            # ``.message`` does not exist on Python 3 exceptions; reading it
            # unconditionally would raise AttributeError and hide the real error.
            message = getattr(self.exc, "message", None)
            if message is not None:
                print(message)
            raise self.exc
        equality_function(self.input, c.extract_values(self.req))
        equality_function(self.output, self.rsp)
86 
87 
class TestServices(unittest.TestCase):
    """Tests for ``rosbridge_library.internal.services``."""

    # NOTE(review): the ``def`` lines of test_populate_request_args,
    # test_service_caller, test_service_tester, test_service_tester_alltypes
    # and test_random_service_types were missing from the listing this class
    # was recovered from; their names are inferred from the docstrings and
    # service names used in the bodies -- confirm against the upstream file.

    def setUp(self):
        rospy.init_node("test_services")

    def msgs_equal(self, msg1, msg2):
        """Recursively assert that two extracted message values are equal.

        Two values of (possibly different) string types are compared by
        value only; otherwise the types must match exactly.  Sequences are
        compared element-wise, primitives and strings by value, and anything
        else is treated as a mapping whose key sets and values must match.
        """
        both_strings = type(msg1) in string_types and type(msg2) in string_types
        if not both_strings:
            self.assertEqual(type(msg1), type(msg2))

        if type(msg1) in c.list_types:
            # zip() stops at the shorter sequence, so check the lengths first.
            self.assertEqual(len(msg1), len(msg2))
            for x, y in zip(msg1, msg2):
                self.msgs_equal(x, y)
        elif type(msg1) in c.primitive_types or type(msg1) in c.string_types:
            self.assertEqual(msg1, msg2)
        else:
            for key in msg1:
                self.assertIn(key, msg2)
            for key in msg2:
                self.assertIn(key, msg1)
            for key in msg1:
                self.msgs_equal(msg1[key], msg2[key])

    def test_populate_request_args(self):
        """args_to_service_request_instance accepts list, dict and empty arguments."""
        # Test empty messages
        for srv_type in ["TestEmpty", "TestResponseOnly"]:
            cls = ros_loader.get_service_class("rosbridge_library/" + srv_type)
            for args in [[], {}, None]:
                # Should throw no exceptions
                services.args_to_service_request_instance("", cls._request_class(), args)

        # Test msgs with data message
        for srv_type in ["TestRequestOnly", "TestRequestAndResponse"]:
            cls = ros_loader.get_service_class("rosbridge_library/" + srv_type)
            for args in [[3], {"data": 3}]:
                # Should throw no exceptions
                services.args_to_service_request_instance("", cls._request_class(), args)
            self.assertRaises(
                FieldTypeMismatchException,
                services.args_to_service_request_instance,
                "", cls._request_class(), ["hello"])

        # Test message with multiple fields
        cls = ros_loader.get_service_class("rosbridge_library/TestMultipleRequestFields")
        for args in [[3, 3.5, "hello", False],
                     {"int": 3, "float": 3.5, "string": "hello", "bool": False}]:
            # Should throw no exceptions
            services.args_to_service_request_instance("", cls._request_class(), args)

    def test_service_call(self):
        """ Test a simple getloggers service call """
        # First, call the service the 'proper' way
        p = rospy.ServiceProxy(rospy.get_name() + "/get_loggers", GetLoggers)
        p.wait_for_service(0.5)
        ret = p()

        # Now, call using the services
        json_ret = services.call_service(rospy.get_name() + "/get_loggers")
        # zip() stops at the shorter sequence, so check the lengths first.
        self.assertEqual(len(ret.loggers), len(json_ret["loggers"]))
        for x, y in zip(ret.loggers, json_ret["loggers"]):
            self.assertEqual(x.name, y["name"])
            self.assertEqual(x.level, y["level"])

    def test_service_caller(self):
        """ Same as test_service_call but via the thread caller """
        # First, call the service the 'proper' way
        p = rospy.ServiceProxy(rospy.get_name() + "/get_loggers", GetLoggers)
        p.wait_for_service(0.5)
        ret = p()

        rcvd = {"json": None, "error": None}

        def success(json):
            rcvd["json"] = json

        def error(exc):
            # The error callback receives the exception.  Record it so the
            # failure is reported by the assertions below; raising here
            # would presumably happen on the caller's thread and go unnoticed.
            rcvd["error"] = exc

        # Now, call using the services
        services.ServiceCaller(rospy.get_name() + "/get_loggers", None, success, error).start()

        time.sleep(0.5)

        self.assertIsNone(rcvd["error"])
        self.assertIsNotNone(rcvd["json"])
        self.assertEqual(len(ret.loggers), len(rcvd["json"]["loggers"]))
        for x, y in zip(ret.loggers, rcvd["json"]["loggers"]):
            self.assertEqual(x.name, y["name"])
            self.assertEqual(x.level, y["level"])

    def test_service_tester(self):
        """Round-trip one request/response service through a ServiceTester."""
        t = ServiceTester("/test_service_tester", "rosbridge_library/TestRequestAndResponse")
        t.start()
        time.sleep(1.0)
        t.validate(self.msgs_equal)

    def test_service_tester_alltypes(self):
        """Round-trip every rosbridge_library test service type."""
        ts = []
        for srv in ["TestResponseOnly", "TestEmpty", "TestRequestAndResponse", "TestRequestOnly",
                    "TestMultipleResponseFields", "TestMultipleRequestFields", "TestArrayRequest"]:
            t = ServiceTester("/test_service_tester_alltypes_" + srv, "rosbridge_library/" + srv)
            t.start()
            ts.append(t)

        # Give all the callers time to finish before validating.
        time.sleep(1.0)

        for t in ts:
            t.validate(self.msgs_equal)

    def test_random_service_types(self):
        """Round-trip a selection of service types from other packages."""
        common = ["roscpp/GetLoggers", "roscpp/SetLoggerLevel",
                  "std_srvs/Empty", "nav_msgs/GetMap", "nav_msgs/GetPlan",
                  "sensor_msgs/SetCameraInfo", "topic_tools/MuxAdd",
                  "topic_tools/MuxSelect", "tf2_msgs/FrameGraph",
                  "rospy_tutorials/BadTwoInts", "rospy_tutorials/AddTwoInts"]
        ts = []
        for srv in common:
            t = ServiceTester("/test_random_service_types/" + srv, srv)
            t.start()
            ts.append(t)

        # Give all the callers time to finish before validating.
        time.sleep(1.0)

        for t in ts:
            t.validate(self.msgs_equal)
205 
206 
# Package and test name passed to rostest when this file is run as a script.
PKG = 'rosbridge_library'
NAME = 'test_services'
if __name__ == '__main__':
    rostest.unitrun(PKG, NAME, TestServices)
211 
Referenced definitions:
ServiceTester.validate(self, equality_function)
ServiceTester.__init__(self, name, srv_type)


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Wed Jun 3 2020 03:55:14