2 from __future__
import print_function
14 from roscpp.srv
import GetLoggers
16 if sys.version_info >= (3, 0):
19 string_types = (str, unicode)
22 PKG =
'rosbridge_library' 23 NAME =
'test_services' 28 if isinstance(d, dict):
32 elif isinstance(d, str):
33 return str(random.random())
34 elif sys.version_info < (3,0)
and isinstance(d, unicode):
35 return unicode(random.random())
36 elif isinstance(d, bool):
38 elif isinstance(d, int):
39 return random.randint(100, 200)
40 elif isinstance(d, float):
50 self.
srvClass = ros_loader.get_service_class(srv_type)
55 gen = c.extract_values(req)
63 rsp = self.
srvClass._response_class()
64 gen = c.extract_values(rsp)
67 rsp = c.populate_instance(gen, rsp)
69 print(
"populating instance")
71 print(
"populating with")
84 if hasattr(self,
"exc"):
86 print(self.
exc.message)
88 equality_function(self.
input, c.extract_values(self.
req))
98 if type(msg1)
in string_types
and type(msg2)
in string_types:
101 self.assertEqual(type(msg1), type(msg2))
102 if type(msg1)
in c.list_types:
103 for x, y
in zip(msg1, msg2):
105 elif type(msg1)
in c.primitive_types
or type(msg1)
in c.string_types:
106 self.assertEqual(msg1, msg2)
109 self.assertTrue(x
in msg2)
111 self.assertTrue(x
in msg1)
117 for srv_type
in [
"TestEmpty",
"TestResponseOnly"]:
118 cls = ros_loader.get_service_class(
"rosbridge_library/" + srv_type)
119 for args
in [[], {},
None]:
121 services.args_to_service_request_instance(
"", cls._request_class(), args)
124 for srv_type
in [
"TestRequestOnly",
"TestRequestAndResponse"]:
125 cls = ros_loader.get_service_class(
"rosbridge_library/" + srv_type)
126 for args
in [[3], {
"data": 3}]:
128 services.args_to_service_request_instance(
"", cls._request_class(), args)
129 self.assertRaises(FieldTypeMismatchException, services.args_to_service_request_instance,
"", cls._request_class(), [
"hello"])
132 cls = ros_loader.get_service_class(
"rosbridge_library/TestMultipleRequestFields")
133 for args
in [[3, 3.5,
"hello",
False], {
"int": 3,
"float": 3.5,
"string":
"hello",
"bool":
False}]:
135 services.args_to_service_request_instance(
"", cls._request_class(), args)
138 """ Test a simple getloggers service call """ 140 p = rospy.ServiceProxy(rospy.get_name() +
"/get_loggers", GetLoggers)
141 p.wait_for_service(0.5)
145 json_ret = services.call_service(rospy.get_name() +
"/get_loggers")
146 for x, y
in zip(ret.loggers, json_ret[
"loggers"]):
147 self.assertEqual(x.name, y[
"name"])
148 self.assertEqual(x.level, y[
"level"])
151 """ Same as test_service_call but via the thread caller """ 153 p = rospy.ServiceProxy(rospy.get_name() +
"/get_loggers", GetLoggers)
154 p.wait_for_service(0.5)
157 rcvd = {
"json":
None}
166 services.ServiceCaller(rospy.get_name() +
"/get_loggers",
None, success, error).start()
170 for x, y
in zip(ret.loggers, rcvd[
"json"][
"loggers"]):
171 self.assertEqual(x.name, y[
"name"])
172 self.assertEqual(x.level, y[
"level"])
175 t =
ServiceTester(
"/test_service_tester",
"rosbridge_library/TestRequestAndResponse")
182 for srv
in [
"TestResponseOnly",
"TestEmpty",
"TestRequestAndResponse",
"TestRequestOnly",
183 "TestMultipleResponseFields",
"TestMultipleRequestFields",
"TestArrayRequest"]:
184 t =
ServiceTester(
"/test_service_tester_alltypes_" + srv,
"rosbridge_library/" + srv)
194 common = [
"roscpp/GetLoggers",
"roscpp/SetLoggerLevel",
195 "std_srvs/Empty",
"nav_msgs/GetMap",
"nav_msgs/GetPlan",
196 "sensor_msgs/SetCameraInfo",
"topic_tools/MuxAdd",
197 "topic_tools/MuxSelect",
"tf2_msgs/FrameGraph",
198 "rospy_tutorials/BadTwoInts",
"rospy_tutorials/AddTwoInts"]
211 if __name__ ==
'__main__':
212 rosunit.unitrun(PKG, NAME, TestServices)
def validate(self, equality_function)
def test_random_service_types(self)
def test_service_tester_alltypes(self)
def test_service_call(self)
def populate_random_args(d)
def __init__(self, name, srv_type)
def test_service_tester(self)
def test_populate_request_args(self)
def msgs_equal(self, msg1, msg2)
def test_service_caller(self)