00001
00002 from __future__ import print_function
00003 import sys
00004 import rospy
00005 import rostest
00006 import unittest
00007 import time
00008 import random
00009
00010 from rosbridge_library.internal import services, ros_loader
00011 from rosbridge_library.internal import message_conversion as c
00012 from rosbridge_library.internal.message_conversion import FieldTypeMismatchException
00013
00014 from roscpp.srv import GetLoggers
00015
00016 if sys.version_info >= (3, 0):
00017 string_types = (str,)
00018 else:
00019 string_types = (str, unicode)
00020
00021
00022 def populate_random_args(d):
00023
00024 if isinstance(d, dict):
00025 for x in d:
00026 d[x] = populate_random_args(d[x])
00027 return d
00028 elif isinstance(d, str):
00029 return str(random.random())
00030 elif sys.version_info < (3,0) and isinstance(d, unicode):
00031 return unicode(random.random())
00032 elif isinstance(d, bool):
00033 return True
00034 elif isinstance(d, int):
00035 return random.randint(100, 200)
00036 elif isinstance(d, float):
00037 return 3.5
00038 else:
00039 return d
00040
00041
00042 class ServiceTester:
00043
00044 def __init__(self, name, srv_type):
00045 self.name = name
00046 self.srvClass = ros_loader.get_service_class(srv_type)
00047 self.service = rospy.Service(name, self.srvClass, self.callback)
00048
00049 def start(self):
00050 req = self.srvClass._request_class()
00051 gen = c.extract_values(req)
00052 gen = populate_random_args(gen)
00053 self.input = gen
00054 services.ServiceCaller(self.name, gen, self.success, self.error).start()
00055
00056 def callback(self, req):
00057 self.req = req
00058 time.sleep(0.5)
00059 rsp = self.srvClass._response_class()
00060 gen = c.extract_values(rsp)
00061 gen = populate_random_args(gen)
00062 try:
00063 rsp = c.populate_instance(gen, rsp)
00064 except:
00065 print("populating instance")
00066 print(rsp)
00067 print("populating with")
00068 print(gen)
00069 raise
00070 self.output = gen
00071 return rsp
00072
00073 def success(self, rsp):
00074 self.rsp = rsp
00075
00076 def error(self, exc):
00077 self.exc = exc
00078
00079 def validate(self, equality_function):
00080 if hasattr(self, "exc"):
00081 print(self.exc)
00082 print(self.exc.message)
00083 raise self.exc
00084 equality_function(self.input, c.extract_values(self.req))
00085 equality_function(self.output, self.rsp)
00086
00087
00088 class TestServices(unittest.TestCase):
00089
00090 def setUp(self):
00091 rospy.init_node("test_services")
00092
00093 def msgs_equal(self, msg1, msg2):
00094 if type(msg1) in string_types and type(msg2) in string_types:
00095 pass
00096 else:
00097 self.assertEqual(type(msg1), type(msg2))
00098 if type(msg1) in c.list_types:
00099 for x, y in zip(msg1, msg2):
00100 self.msgs_equal(x, y)
00101 elif type(msg1) in c.primitive_types or type(msg1) in c.string_types:
00102 self.assertEqual(msg1, msg2)
00103 else:
00104 for x in msg1:
00105 self.assertTrue(x in msg2)
00106 for x in msg2:
00107 self.assertTrue(x in msg1)
00108 for x in msg1:
00109 self.msgs_equal(msg1[x], msg2[x])
00110
00111 def test_populate_request_args(self):
00112
00113 for srv_type in ["TestEmpty", "TestResponseOnly"]:
00114 cls = ros_loader.get_service_class("rosbridge_library/" + srv_type)
00115 for args in [[], {}, None]:
00116
00117 services.args_to_service_request_instance("", cls._request_class(), args)
00118
00119
00120 for srv_type in ["TestRequestOnly", "TestRequestAndResponse"]:
00121 cls = ros_loader.get_service_class("rosbridge_library/" + srv_type)
00122 for args in [[3], {"data": 3}]:
00123
00124 services.args_to_service_request_instance("", cls._request_class(), args)
00125 self.assertRaises(FieldTypeMismatchException, services.args_to_service_request_instance, "", cls._request_class(), ["hello"])
00126
00127
00128 cls = ros_loader.get_service_class("rosbridge_library/TestMultipleRequestFields")
00129 for args in [[3, 3.5, "hello", False], {"int": 3, "float": 3.5, "string": "hello", "bool": False}]:
00130
00131 services.args_to_service_request_instance("", cls._request_class(), args)
00132
00133 def test_service_call(self):
00134 """ Test a simple getloggers service call """
00135
00136 p = rospy.ServiceProxy("/rosout/get_loggers", GetLoggers)
00137 ret = p()
00138
00139
00140 json_ret = services.call_service("/rosout/get_loggers")
00141 for x, y in zip(ret.loggers, json_ret["loggers"]):
00142 self.assertEqual(x.name, y["name"])
00143 self.assertEqual(x.level, y["level"])
00144
00145 def test_service_caller(self):
00146 """ Same as test_service_call but via the thread caller """
00147
00148 p = rospy.ServiceProxy("/rosout/get_loggers", GetLoggers)
00149 ret = p()
00150
00151 rcvd = {"json": None}
00152
00153 def success(json):
00154 rcvd["json"] = json
00155
00156 def error():
00157 raise Exception()
00158
00159
00160 services.ServiceCaller("/rosout/get_loggers", None, success, error).start()
00161
00162 time.sleep(0.5)
00163
00164 for x, y in zip(ret.loggers, rcvd["json"]["loggers"]):
00165 self.assertEqual(x.name, y["name"])
00166 self.assertEqual(x.level, y["level"])
00167
00168 def test_service_tester(self):
00169 t = ServiceTester("/test_service_tester", "rosbridge_library/TestRequestAndResponse")
00170 t.start()
00171 time.sleep(1.0)
00172 t.validate(self.msgs_equal)
00173
00174 def test_service_tester_alltypes(self):
00175 ts = []
00176 for srv in ["TestResponseOnly", "TestEmpty", "TestRequestAndResponse", "TestRequestOnly",
00177 "TestMultipleResponseFields", "TestMultipleRequestFields", "TestArrayRequest"]:
00178 t = ServiceTester("/test_service_tester_alltypes_" + srv, "rosbridge_library/" + srv)
00179 t.start()
00180 ts.append(t)
00181
00182 time.sleep(1.0)
00183
00184 for t in ts:
00185 t.validate(self.msgs_equal)
00186
00187 def test_random_service_types(self):
00188 common = ["roscpp/GetLoggers", "roscpp/SetLoggerLevel",
00189 "std_srvs/Empty", "nav_msgs/GetMap", "nav_msgs/GetPlan",
00190 "sensor_msgs/SetCameraInfo", "topic_tools/MuxAdd",
00191 "topic_tools/MuxSelect", "tf2_msgs/FrameGraph",
00192 "rospy_tutorials/BadTwoInts", "rospy_tutorials/AddTwoInts"]
00193 ts = []
00194 for srv in common:
00195 t = ServiceTester("/test_random_service_types/" + srv, srv)
00196 t.start()
00197 ts.append(t)
00198
00199 time.sleep(1.0)
00200
00201 for t in ts:
00202 t.validate(self.msgs_equal)
00203
00204
00205 PKG = 'rosbridge_library'
00206 NAME = 'test_services'
00207 if __name__ == '__main__':
00208 rostest.unitrun(PKG, NAME, TestServices)
00209