test_services.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 from __future__ import print_function
00003 import sys
00004 import rospy
00005 import rostest
00006 import unittest
00007 import time
00008 import random
00009 
00010 from rosbridge_library.internal import services, ros_loader
00011 from rosbridge_library.internal import message_conversion as c
00012 from rosbridge_library.internal.message_conversion import FieldTypeMismatchException
00013 
00014 from roscpp.srv import GetLoggers
00015 
00016 if sys.version_info >= (3, 0):
00017     string_types = (str,)
00018 else:
00019     string_types = (str, unicode)
00020 
00021 
00022 def populate_random_args(d):
00023     # Given a dictionary d, replaces primitives with random values
00024     if isinstance(d, dict):
00025         for x in d:
00026             d[x] = populate_random_args(d[x])
00027         return d
00028     elif isinstance(d, str):
00029         return str(random.random())
00030     elif sys.version_info < (3,0) and isinstance(d, unicode):
00031         return unicode(random.random())
00032     elif isinstance(d, bool):
00033         return True
00034     elif isinstance(d, int):
00035         return random.randint(100, 200)
00036     elif isinstance(d, float):
00037         return 3.5
00038     else:
00039         return d
00040 
00041 
00042 class ServiceTester:
00043 
00044     def __init__(self, name, srv_type):
00045         self.name = name
00046         self.srvClass = ros_loader.get_service_class(srv_type)
00047         self.service = rospy.Service(name, self.srvClass, self.callback)
00048 
00049     def start(self):
00050         req = self.srvClass._request_class()
00051         gen = c.extract_values(req)
00052         gen = populate_random_args(gen)
00053         self.input = gen
00054         services.ServiceCaller(self.name, gen, self.success, self.error).start()
00055 
00056     def callback(self, req):
00057         self.req = req
00058         time.sleep(0.5)
00059         rsp = self.srvClass._response_class()
00060         gen = c.extract_values(rsp)
00061         gen = populate_random_args(gen)
00062         try:
00063             rsp = c.populate_instance(gen, rsp)
00064         except:
00065             print("populating instance")
00066             print(rsp)
00067             print("populating with")
00068             print(gen)
00069             raise
00070         self.output = gen
00071         return rsp
00072 
00073     def success(self, rsp):
00074         self.rsp = rsp
00075 
00076     def error(self, exc):
00077         self.exc = exc
00078 
00079     def validate(self, equality_function):
00080         if hasattr(self, "exc"):
00081             print(self.exc)
00082             print(self.exc.message)
00083             raise self.exc
00084         equality_function(self.input, c.extract_values(self.req))
00085         equality_function(self.output, self.rsp)
00086 
00087 
00088 class TestServices(unittest.TestCase):
00089 
00090     def setUp(self):
00091         rospy.init_node("test_services")
00092 
00093     def msgs_equal(self, msg1, msg2):
00094         if type(msg1) in string_types and type(msg2) in string_types:
00095             pass
00096         else:
00097             self.assertEqual(type(msg1), type(msg2))
00098         if type(msg1) in c.list_types:
00099             for x, y in zip(msg1, msg2):
00100                 self.msgs_equal(x, y)
00101         elif type(msg1) in c.primitive_types or type(msg1) in c.string_types:
00102             self.assertEqual(msg1, msg2)
00103         else:
00104             for x in msg1:
00105                 self.assertTrue(x in msg2)
00106             for x in msg2:
00107                 self.assertTrue(x in msg1)
00108             for x in msg1:
00109                 self.msgs_equal(msg1[x], msg2[x])
00110 
00111     def test_populate_request_args(self):
00112         # Test empty messages
00113         for srv_type in ["TestEmpty", "TestResponseOnly"]:
00114             cls = ros_loader.get_service_class("rosbridge_library/" + srv_type)
00115             for args in [[], {}, None]:
00116                 # Should throw no exceptions
00117                 services.args_to_service_request_instance("", cls._request_class(), args)
00118 
00119         # Test msgs with data message
00120         for srv_type in ["TestRequestOnly", "TestRequestAndResponse"]:
00121             cls = ros_loader.get_service_class("rosbridge_library/" + srv_type)
00122             for args in [[3], {"data": 3}]:
00123                 # Should throw no exceptions
00124                 services.args_to_service_request_instance("", cls._request_class(), args)
00125             self.assertRaises(FieldTypeMismatchException, services.args_to_service_request_instance, "", cls._request_class(), ["hello"])
00126 
00127         # Test message with multiple fields
00128         cls = ros_loader.get_service_class("rosbridge_library/TestMultipleRequestFields")
00129         for args in [[3, 3.5, "hello", False], {"int": 3, "float": 3.5, "string": "hello", "bool": False}]:
00130             # Should throw no exceptions
00131             services.args_to_service_request_instance("", cls._request_class(), args)
00132 
00133     def test_service_call(self):
00134         """ Test a simple getloggers service call """
00135         # First, call the service the 'proper' way
00136         p = rospy.ServiceProxy("/rosout/get_loggers", GetLoggers)
00137         ret = p()
00138 
00139         # Now, call using the services
00140         json_ret = services.call_service("/rosout/get_loggers")
00141         for x, y in zip(ret.loggers, json_ret["loggers"]):
00142             self.assertEqual(x.name, y["name"])
00143             self.assertEqual(x.level, y["level"])
00144 
00145     def test_service_caller(self):
00146         """ Same as test_service_call but via the thread caller """
00147         # First, call the service the 'proper' way
00148         p = rospy.ServiceProxy("/rosout/get_loggers", GetLoggers)
00149         ret = p()
00150 
00151         rcvd = {"json": None}
00152 
00153         def success(json):
00154             rcvd["json"] = json
00155 
00156         def error():
00157             raise Exception()
00158 
00159         # Now, call using the services
00160         services.ServiceCaller("/rosout/get_loggers", None, success, error).start()
00161 
00162         time.sleep(0.5)
00163 
00164         for x, y in zip(ret.loggers, rcvd["json"]["loggers"]):
00165             self.assertEqual(x.name, y["name"])
00166             self.assertEqual(x.level, y["level"])
00167 
00168     def test_service_tester(self):
00169         t = ServiceTester("/test_service_tester", "rosbridge_library/TestRequestAndResponse")
00170         t.start()
00171         time.sleep(1.0)
00172         t.validate(self.msgs_equal)
00173 
00174     def test_service_tester_alltypes(self):
00175         ts = []
00176         for srv in ["TestResponseOnly", "TestEmpty", "TestRequestAndResponse", "TestRequestOnly",
00177             "TestMultipleResponseFields", "TestMultipleRequestFields", "TestArrayRequest"]:
00178             t = ServiceTester("/test_service_tester_alltypes_" + srv, "rosbridge_library/" + srv)
00179             t.start()
00180             ts.append(t)
00181 
00182         time.sleep(1.0)
00183 
00184         for t in ts:
00185             t.validate(self.msgs_equal)
00186 
00187     def test_random_service_types(self):
00188         common = ["roscpp/GetLoggers", "roscpp/SetLoggerLevel",
00189         "std_srvs/Empty", "nav_msgs/GetMap", "nav_msgs/GetPlan",
00190         "sensor_msgs/SetCameraInfo", "topic_tools/MuxAdd",
00191         "topic_tools/MuxSelect", "tf2_msgs/FrameGraph",
00192         "rospy_tutorials/BadTwoInts", "rospy_tutorials/AddTwoInts"]
00193         ts = []
00194         for srv in common:
00195             t = ServiceTester("/test_random_service_types/" + srv, srv)
00196             t.start()
00197             ts.append(t)
00198 
00199         time.sleep(1.0)
00200 
00201         for t in ts:
00202             t.validate(self.msgs_equal)
00203 
00204 
00205 PKG = 'rosbridge_library'
00206 NAME = 'test_services'
00207 if __name__ == '__main__':
00208     rostest.unitrun(PKG, NAME, TestServices)
00209 


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Thu Jun 6 2019 21:51:43