00001
00002 PKG = 'rosbridge_library'
00003 import roslib; roslib.load_manifest(PKG)
00004 import rospy
00005
00006 import time
00007 import random
00008
00009 from rosbridge_library.internal import services, ros_loader
00010 from rosbridge_library.internal import message_conversion as c
00011 from rosbridge_library.internal.message_conversion import FieldTypeMismatchException
00012
00013 roslib.load_manifest("roscpp")
00014 from roscpp.srv import GetLoggers
00015
00016 import unittest
00017
00018
00019 def populate_random_args(d):
00020
00021 if isinstance(d, dict):
00022 for x in d:
00023 d[x] = populate_random_args(d[x])
00024 return d
00025 elif isinstance(d, str):
00026 return str(random.random())
00027 elif isinstance(d, unicode):
00028 return unicode(random.random())
00029 elif isinstance(d, bool):
00030 return True
00031 elif isinstance(d, int):
00032 return random.randint(100, 200)
00033 elif isinstance(d, float):
00034 return 3.5
00035 else:
00036 return d
00037
00038
00039 class ServiceTester:
00040
00041 def __init__(self, name, srv_type):
00042 self.name = name
00043 self.srvClass = ros_loader.get_service_class(srv_type)
00044 self.service = rospy.Service(name, self.srvClass, self.callback)
00045
00046 def start(self):
00047 req = self.srvClass._request_class()
00048 gen = c.extract_values(req)
00049 gen = populate_random_args(gen)
00050 self.input = gen
00051 services.ServiceCaller(self.name, gen, self.success, self.error).start()
00052
00053 def callback(self, req):
00054 self.req = req
00055 time.sleep(0.5)
00056 rsp = self.srvClass._response_class()
00057 gen = c.extract_values(rsp)
00058 gen = populate_random_args(gen)
00059 try:
00060 rsp = c.populate_instance(gen, rsp)
00061 except:
00062 print "populating instance"
00063 print rsp
00064 print "populating with"
00065 print gen
00066 raise
00067 self.output = gen
00068 return rsp
00069
00070 def success(self, rsp):
00071 self.rsp = rsp
00072
00073 def error(self, exc):
00074 self.exc = exc
00075
00076 def validate(self, equality_function):
00077 if hasattr(self, "exc"):
00078 print self.exc
00079 print self.exc.message
00080 raise self.exc
00081 equality_function(self.input, c.extract_values(self.req))
00082 equality_function(self.output, self.rsp)
00083
00084
00085 class TestServices(unittest.TestCase):
00086
00087 def setUp(self):
00088 rospy.init_node("test_services")
00089
00090 def msgs_equal(self, msg1, msg2):
00091 if type(msg1) in [str, unicode] and type(msg2) in [str, unicode]:
00092 pass
00093 else:
00094 self.assertEqual(type(msg1), type(msg2))
00095 if type(msg1) in c.list_types:
00096 for x, y in zip(msg1, msg2):
00097 self.msgs_equal(x, y)
00098 elif type(msg1) in c.primitive_types or type(msg1) in c.string_types:
00099 self.assertEqual(msg1, msg2)
00100 else:
00101 for x in msg1:
00102 self.assertTrue(x in msg2)
00103 for x in msg2:
00104 self.assertTrue(x in msg1)
00105 for x in msg1:
00106 self.msgs_equal(msg1[x], msg2[x])
00107
00108 def test_populate_request_args(self):
00109
00110 for srv_type in ["Empty", "ResponseOnly"]:
00111 cls = ros_loader.get_service_class("rosbridge_test/" + srv_type)
00112 for args in [[], {}, None]:
00113
00114 services.args_to_service_request_instance("", cls._request_class(), args)
00115
00116
00117 for srv_type in ["RequestOnly", "RequestAndResponse"]:
00118 cls = ros_loader.get_service_class("rosbridge_test/" + srv_type)
00119 for args in [[3], {"data": 3}]:
00120
00121 services.args_to_service_request_instance("", cls._request_class(), args)
00122 self.assertRaises(FieldTypeMismatchException, services.args_to_service_request_instance, "", cls._request_class(), ["hello"])
00123
00124
00125 cls = ros_loader.get_service_class("rosbridge_test/MultipleRequestFields")
00126 for args in [[3, 3.5, "hello", False], {"int": 3, "float": 3.5, "string": "hello", "bool": False}]:
00127
00128 services.args_to_service_request_instance("", cls._request_class(), args)
00129
00130 def test_service_call(self):
00131 """ Test a simple getloggers service call """
00132
00133 p = rospy.ServiceProxy("/rosout/get_loggers", GetLoggers)
00134 ret = p()
00135
00136
00137 json_ret = services.call_service("/rosout/get_loggers")
00138 for x, y in zip(ret.loggers, json_ret["loggers"]):
00139 self.assertEqual(x.name, y["name"])
00140 self.assertEqual(x.level, y["level"])
00141
00142 def test_service_caller(self):
00143 """ Same as test_service_call but via the thread caller """
00144
00145 p = rospy.ServiceProxy("/rosout/get_loggers", GetLoggers)
00146 ret = p()
00147
00148 rcvd = {"json": None}
00149
00150 def success(json):
00151 rcvd["json"] = json
00152
00153 def error():
00154 raise Exception()
00155
00156
00157 services.ServiceCaller("/rosout/get_loggers", None, success, error).start()
00158
00159 time.sleep(0.5)
00160
00161 for x, y in zip(ret.loggers, rcvd["json"]["loggers"]):
00162 self.assertEqual(x.name, y["name"])
00163 self.assertEqual(x.level, y["level"])
00164
00165 def test_service_tester(self):
00166 t = ServiceTester("/test_service_tester", "rosbridge_test/RequestAndResponse")
00167 t.start()
00168 time.sleep(1.0)
00169 t.validate(self.msgs_equal)
00170
00171 def test_service_tester_alltypes(self):
00172 ts = []
00173 for srv in ["ResponseOnly", "Empty", "RequestAndResponse", "RequestOnly",
00174 "MultipleResponseFields", "MultipleRequestFields", "ArrayRequest"]:
00175 t = ServiceTester("/test_service_tester_alltypes_" + srv, "rosbridge_test/" + srv)
00176 t.start()
00177 ts.append(t)
00178
00179 time.sleep(1.0)
00180
00181 for t in ts:
00182 t.validate(self.msgs_equal)
00183
00184 def test_random_service_types(self):
00185 common = ["roscpp/GetLoggers", "roscpp/SetLoggerLevel",
00186 "std_srvs/Empty", "nav_msgs/GetMap", "nav_msgs/GetPlan",
00187 "sensor_msgs/SetCameraInfo", "topic_tools/MuxAdd",
00188 "topic_tools/MuxSelect", "tf2_msgs/FrameGraph",
00189 "rospy_tutorials/BadTwoInts", "rospy_tutorials/AddTwoInts"]
00190 ts = []
00191 for srv in common:
00192 t = ServiceTester("/test_random_service_types/" + srv, srv)
00193 t.start()
00194 ts.append(t)
00195
00196 time.sleep(1.0)
00197
00198 for t in ts:
00199 t.validate(self.msgs_equal)
00200
00201
00202 if __name__ == '__main__':
00203 import rostest
00204 rostest.rosrun(PKG, 'test_services', TestServices)