2 from __future__
import print_function
14 from roscpp.srv
import GetLoggers
16 if sys.version_info >= (3, 0):
19 string_types = (str, unicode)
24 if isinstance(d, dict):
28 elif isinstance(d, str):
29 return str(random.random())
30 elif sys.version_info < (3,0)
and isinstance(d, unicode):
31 return unicode(random.random())
32 elif isinstance(d, bool):
34 elif isinstance(d, int):
35 return random.randint(100, 200)
36 elif isinstance(d, float):
46 self.
srvClass = ros_loader.get_service_class(srv_type)
50 req = self.srvClass._request_class()
51 gen = c.extract_values(req)
59 rsp = self.srvClass._response_class()
60 gen = c.extract_values(rsp)
63 rsp = c.populate_instance(gen, rsp)
65 print(
"populating instance")
67 print(
"populating with")
80 if hasattr(self,
"exc"):
82 print(self.exc.message)
84 equality_function(self.
input, c.extract_values(self.
req))
91 rospy.init_node(
"test_services")
94 if type(msg1)
in string_types
and type(msg2)
in string_types:
97 self.assertEqual(type(msg1), type(msg2))
98 if type(msg1)
in c.list_types:
99 for x, y
in zip(msg1, msg2):
101 elif type(msg1)
in c.primitive_types
or type(msg1)
in c.string_types:
102 self.assertEqual(msg1, msg2)
105 self.assertTrue(x
in msg2)
107 self.assertTrue(x
in msg1)
113 for srv_type
in [
"TestEmpty",
"TestResponseOnly"]:
114 cls = ros_loader.get_service_class(
"rosbridge_library/" + srv_type)
115 for args
in [[], {},
None]:
117 services.args_to_service_request_instance(
"", cls._request_class(), args)
120 for srv_type
in [
"TestRequestOnly",
"TestRequestAndResponse"]:
121 cls = ros_loader.get_service_class(
"rosbridge_library/" + srv_type)
122 for args
in [[3], {
"data": 3}]:
124 services.args_to_service_request_instance(
"", cls._request_class(), args)
125 self.assertRaises(FieldTypeMismatchException, services.args_to_service_request_instance,
"", cls._request_class(), [
"hello"])
128 cls = ros_loader.get_service_class(
"rosbridge_library/TestMultipleRequestFields")
129 for args
in [[3, 3.5,
"hello",
False], {
"int": 3,
"float": 3.5,
"string":
"hello",
"bool":
False}]:
131 services.args_to_service_request_instance(
"", cls._request_class(), args)
134 """ Test a simple getloggers service call """ 136 p = rospy.ServiceProxy(rospy.get_name() +
"/get_loggers", GetLoggers)
137 p.wait_for_service(0.5)
141 json_ret = services.call_service(rospy.get_name() +
"/get_loggers")
142 for x, y
in zip(ret.loggers, json_ret[
"loggers"]):
143 self.assertEqual(x.name, y[
"name"])
144 self.assertEqual(x.level, y[
"level"])
147 """ Same as test_service_call but via the thread caller """ 149 p = rospy.ServiceProxy(rospy.get_name() +
"/get_loggers", GetLoggers)
150 p.wait_for_service(0.5)
153 rcvd = {
"json":
None}
162 services.ServiceCaller(rospy.get_name() +
"/get_loggers",
None, success, error).start()
166 for x, y
in zip(ret.loggers, rcvd[
"json"][
"loggers"]):
167 self.assertEqual(x.name, y[
"name"])
168 self.assertEqual(x.level, y[
"level"])
171 t =
ServiceTester(
"/test_service_tester",
"rosbridge_library/TestRequestAndResponse")
178 for srv
in [
"TestResponseOnly",
"TestEmpty",
"TestRequestAndResponse",
"TestRequestOnly",
179 "TestMultipleResponseFields",
"TestMultipleRequestFields",
"TestArrayRequest"]:
180 t =
ServiceTester(
"/test_service_tester_alltypes_" + srv,
"rosbridge_library/" + srv)
190 common = [
"roscpp/GetLoggers",
"roscpp/SetLoggerLevel",
191 "std_srvs/Empty",
"nav_msgs/GetMap",
"nav_msgs/GetPlan",
192 "sensor_msgs/SetCameraInfo",
"topic_tools/MuxAdd",
193 "topic_tools/MuxSelect",
"tf2_msgs/FrameGraph",
194 "rospy_tutorials/BadTwoInts",
"rospy_tutorials/AddTwoInts"]
207 PKG =
'rosbridge_library' 208 NAME =
'test_services' 209 if __name__ ==
'__main__':
210 rostest.unitrun(PKG, NAME, TestServices)
def validate(self, equality_function)
def test_random_service_types(self)
def test_service_tester_alltypes(self)
def test_service_call(self)
def populate_random_args(d)
def __init__(self, name, srv_type)
def test_service_tester(self)
def test_populate_request_args(self)
def msgs_equal(self, msg1, msg2)
def test_service_caller(self)