00001
00002 import sys
00003 import rospy
00004 import rostest
00005 import unittest
00006 import time
00007 import random
00008
00009 from rosbridge_library.internal import services, ros_loader
00010 from rosbridge_library.internal import message_conversion as c
00011 from rosbridge_library.internal.message_conversion import FieldTypeMismatchException
00012
00013 from roscpp.srv import GetLoggers
00014
00015
00016 def populate_random_args(d):
00017
00018 if isinstance(d, dict):
00019 for x in d:
00020 d[x] = populate_random_args(d[x])
00021 return d
00022 elif isinstance(d, str):
00023 return str(random.random())
00024 elif isinstance(d, unicode):
00025 return unicode(random.random())
00026 elif isinstance(d, bool):
00027 return True
00028 elif isinstance(d, int):
00029 return random.randint(100, 200)
00030 elif isinstance(d, float):
00031 return 3.5
00032 else:
00033 return d
00034
00035
00036 class ServiceTester:
00037
00038 def __init__(self, name, srv_type):
00039 self.name = name
00040 self.srvClass = ros_loader.get_service_class(srv_type)
00041 self.service = rospy.Service(name, self.srvClass, self.callback)
00042
00043 def start(self):
00044 req = self.srvClass._request_class()
00045 gen = c.extract_values(req)
00046 gen = populate_random_args(gen)
00047 self.input = gen
00048 services.ServiceCaller(self.name, gen, self.success, self.error).start()
00049
00050 def callback(self, req):
00051 self.req = req
00052 time.sleep(0.5)
00053 rsp = self.srvClass._response_class()
00054 gen = c.extract_values(rsp)
00055 gen = populate_random_args(gen)
00056 try:
00057 rsp = c.populate_instance(gen, rsp)
00058 except:
00059 print "populating instance"
00060 print rsp
00061 print "populating with"
00062 print gen
00063 raise
00064 self.output = gen
00065 return rsp
00066
00067 def success(self, rsp):
00068 self.rsp = rsp
00069
00070 def error(self, exc):
00071 self.exc = exc
00072
00073 def validate(self, equality_function):
00074 if hasattr(self, "exc"):
00075 print self.exc
00076 print self.exc.message
00077 raise self.exc
00078 equality_function(self.input, c.extract_values(self.req))
00079 equality_function(self.output, self.rsp)
00080
00081
00082 class TestServices(unittest.TestCase):
00083
00084 def setUp(self):
00085 rospy.init_node("test_services")
00086
00087 def msgs_equal(self, msg1, msg2):
00088 if type(msg1) in [str, unicode] and type(msg2) in [str, unicode]:
00089 pass
00090 else:
00091 self.assertEqual(type(msg1), type(msg2))
00092 if type(msg1) in c.list_types:
00093 for x, y in zip(msg1, msg2):
00094 self.msgs_equal(x, y)
00095 elif type(msg1) in c.primitive_types or type(msg1) in c.string_types:
00096 self.assertEqual(msg1, msg2)
00097 else:
00098 for x in msg1:
00099 self.assertTrue(x in msg2)
00100 for x in msg2:
00101 self.assertTrue(x in msg1)
00102 for x in msg1:
00103 self.msgs_equal(msg1[x], msg2[x])
00104
00105 def test_populate_request_args(self):
00106
00107 for srv_type in ["TestEmpty", "TestResponseOnly"]:
00108 cls = ros_loader.get_service_class("rosbridge_library/" + srv_type)
00109 for args in [[], {}, None]:
00110
00111 services.args_to_service_request_instance("", cls._request_class(), args)
00112
00113
00114 for srv_type in ["TestRequestOnly", "TestRequestAndResponse"]:
00115 cls = ros_loader.get_service_class("rosbridge_library/" + srv_type)
00116 for args in [[3], {"data": 3}]:
00117
00118 services.args_to_service_request_instance("", cls._request_class(), args)
00119 self.assertRaises(FieldTypeMismatchException, services.args_to_service_request_instance, "", cls._request_class(), ["hello"])
00120
00121
00122 cls = ros_loader.get_service_class("rosbridge_library/TestMultipleRequestFields")
00123 for args in [[3, 3.5, "hello", False], {"int": 3, "float": 3.5, "string": "hello", "bool": False}]:
00124
00125 services.args_to_service_request_instance("", cls._request_class(), args)
00126
00127 def test_service_call(self):
00128 """ Test a simple getloggers service call """
00129
00130 p = rospy.ServiceProxy("/rosout/get_loggers", GetLoggers)
00131 ret = p()
00132
00133
00134 json_ret = services.call_service("/rosout/get_loggers")
00135 for x, y in zip(ret.loggers, json_ret["loggers"]):
00136 self.assertEqual(x.name, y["name"])
00137 self.assertEqual(x.level, y["level"])
00138
00139 def test_service_caller(self):
00140 """ Same as test_service_call but via the thread caller """
00141
00142 p = rospy.ServiceProxy("/rosout/get_loggers", GetLoggers)
00143 ret = p()
00144
00145 rcvd = {"json": None}
00146
00147 def success(json):
00148 rcvd["json"] = json
00149
00150 def error():
00151 raise Exception()
00152
00153
00154 services.ServiceCaller("/rosout/get_loggers", None, success, error).start()
00155
00156 time.sleep(0.5)
00157
00158 for x, y in zip(ret.loggers, rcvd["json"]["loggers"]):
00159 self.assertEqual(x.name, y["name"])
00160 self.assertEqual(x.level, y["level"])
00161
00162 def test_service_tester(self):
00163 t = ServiceTester("/test_service_tester", "rosbridge_library/TestRequestAndResponse")
00164 t.start()
00165 time.sleep(1.0)
00166 t.validate(self.msgs_equal)
00167
00168 def test_service_tester_alltypes(self):
00169 ts = []
00170 for srv in ["TestResponseOnly", "TestEmpty", "TestRequestAndResponse", "TestRequestOnly",
00171 "TestMultipleResponseFields", "TestMultipleRequestFields", "TestArrayRequest"]:
00172 t = ServiceTester("/test_service_tester_alltypes_" + srv, "rosbridge_library/" + srv)
00173 t.start()
00174 ts.append(t)
00175
00176 time.sleep(1.0)
00177
00178 for t in ts:
00179 t.validate(self.msgs_equal)
00180
00181 def test_random_service_types(self):
00182 common = ["roscpp/GetLoggers", "roscpp/SetLoggerLevel",
00183 "std_srvs/Empty", "nav_msgs/GetMap", "nav_msgs/GetPlan",
00184 "sensor_msgs/SetCameraInfo", "topic_tools/MuxAdd",
00185 "topic_tools/MuxSelect", "tf2_msgs/FrameGraph",
00186 "rospy_tutorials/BadTwoInts", "rospy_tutorials/AddTwoInts"]
00187 ts = []
00188 for srv in common:
00189 t = ServiceTester("/test_random_service_types/" + srv, srv)
00190 t.start()
00191 ts.append(t)
00192
00193 time.sleep(1.0)
00194
00195 for t in ts:
00196 t.validate(self.msgs_equal)
00197
00198
00199 PKG = 'rosbridge_library'
00200 NAME = 'test_services'
00201 if __name__ == '__main__':
00202 rostest.unitrun(PKG, NAME, TestServices)
00203